Manual:API Python3: Difference between revisions
Jump to navigation
Jump to search
(→code) |
|||
(15 intermediate revisions by 3 users not shown) | |||
Line 1: | Line 1: | ||
__TOC__ | |||
==Summary== | ==Summary== | ||
Since python language have introduced changes to syntax when going from 2.x to 3.x some adjustments had to be made for old code from [[Manual:API|API]]. | Since python language have introduced changes to syntax when going from 2.x to 3.x some adjustments had to be made for old code from [[Manual:API|API]]. | ||
==Code for Python3== | ==Code for Python3== | ||
====code==== | |||
<pre> | |||
#!/usr/bin/python3 | |||
# -*- coding: latin-1 -*- | |||
import sys, posix, time, binascii, socket, select, ssl | |||
import hashlib | |||
class ApiRos: | |||
"Routeros api" | "Routeros api" | ||
def __init__(self, sk): | def __init__(self, sk): | ||
self.sk = sk | self.sk = sk | ||
self.currenttag = 0 | self.currenttag = 0 | ||
def login(self, username, pwd): | def login(self, username, pwd): | ||
for repl, attrs in self.talk(["/login"]): | for repl, attrs in self.talk(["/login", "=name=" + username, | ||
chal = binascii.unhexlify((attrs['=ret']).encode( | "=password=" + pwd]): | ||
if repl == '!trap': | |||
return False | |||
elif '=ret' in attrs.keys(): | |||
#for repl, attrs in self.talk(["/login"]): | |||
chal = binascii.unhexlify((attrs['=ret']).encode(sys.stdout.encoding)) | |||
"=response=00" + binascii.hexlify(md.digest()).decode( | md = hashlib.md5() | ||
md.update(b'\x00') | |||
md.update(pwd.encode(sys.stdout.encoding)) | |||
md.update(chal) | |||
for repl2, attrs2 in self.talk(["/login", "=name=" + username, | |||
"=response=00" + binascii.hexlify(md.digest()).decode(sys.stdout.encoding) ]): | |||
if repl2 == '!trap': | |||
return False | |||
return True | |||
def talk(self, words): | def talk(self, words): | ||
if self.writeSentence(words) == 0: return | if self.writeSentence(words) == 0: return | ||
Line 40: | Line 51: | ||
r.append((reply, attrs)) | r.append((reply, attrs)) | ||
if reply == '!done': return r | if reply == '!done': return r | ||
def writeSentence(self, words): | def writeSentence(self, words): | ||
ret = 0 | ret = 0 | ||
Line 48: | Line 59: | ||
self.writeWord('') | self.writeWord('') | ||
return ret | return ret | ||
def readSentence(self): | def readSentence(self): | ||
r = [] | r = [] | ||
Line 55: | Line 66: | ||
if w == '': return r | if w == '': return r | ||
r.append(w) | r.append(w) | ||
def writeWord(self, w): | def writeWord(self, w): | ||
print(("<<< " + w)) | print(("<<< " + w)) | ||
self.writeLen(len(w)) | self.writeLen(len(w)) | ||
self.writeStr(w) | self.writeStr(w) | ||
def readWord(self): | def readWord(self): | ||
ret = self.readStr(self.readLen()) | ret = self.readStr(self.readLen()) | ||
print((">>> " + ret)) | print((">>> " + ret)) | ||
return ret | return ret | ||
def writeLen(self, l): | def writeLen(self, l): | ||
if l < 0x80: | if l < 0x80: | ||
self. | self.writeByte((l).to_bytes(1, sys.byteorder)) | ||
elif l < 0x4000: | elif l < 0x4000: | ||
l |= 0x8000 | l |= 0x8000 | ||
self. | tmp = (l >> 8) & 0xFF | ||
self. | self.writeByte(((l >> 8) & 0xFF).to_bytes(1, sys.byteorder)) | ||
self.writeByte((l & 0xFF).to_bytes(1, sys.byteorder)) | |||
elif l < 0x200000: | elif l < 0x200000: | ||
l |= 0xC00000 | l |= 0xC00000 | ||
self. | self.writeByte(((l >> 16) & 0xFF).to_bytes(1, sys.byteorder)) | ||
self. | self.writeByte(((l >> 8) & 0xFF).to_bytes(1, sys.byteorder)) | ||
self. | self.writeByte((l & 0xFF).to_bytes(1, sys.byteorder)) | ||
elif l < 0x10000000: | elif l < 0x10000000: | ||
l |= 0xE0000000 | l |= 0xE0000000 | ||
self. | self.writeByte(((l >> 24) & 0xFF).to_bytes(1, sys.byteorder)) | ||
self. | self.writeByte(((l >> 16) & 0xFF).to_bytes(1, sys.byteorder)) | ||
self. | self.writeByte(((l >> 8) & 0xFF).to_bytes(1, sys.byteorder)) | ||
self. | self.writeByte((l & 0xFF).to_bytes(1, sys.byteorder)) | ||
else: | else: | ||
self. | self.writeByte((0xF0).to_bytes(1, sys.byteorder)) | ||
self. | self.writeByte(((l >> 24) & 0xFF).to_bytes(1, sys.byteorder)) | ||
self. | self.writeByte(((l >> 16) & 0xFF).to_bytes(1, sys.byteorder)) | ||
self. | self.writeByte(((l >> 8) & 0xFF).to_bytes(1, sys.byteorder)) | ||
self. | self.writeByte((l & 0xFF).to_bytes(1, sys.byteorder)) | ||
def readLen(self): | def readLen(self): | ||
c = ord(self.readStr(1)) | c = ord(self.readStr(1)) | ||
if (c & 0x80) == 0x00: | # print (">rl> %i" % c) | ||
pass | if (c & 0x80) == 0x00: | ||
elif (c & 0xC0) == 0x80: | pass | ||
c &= ~0xC0 | elif (c & 0xC0) == 0x80: | ||
c <<= 8 | c &= ~0xC0 | ||
c += ord(self.readStr(1)) | c <<= 8 | ||
elif (c & 0xE0) == 0xC0: | c += ord(self.readStr(1)) | ||
c &= ~0xE0 | elif (c & 0xE0) == 0xC0: | ||
c <<= 8 | c &= ~0xE0 | ||
c += ord(self.readStr(1)) | c <<= 8 | ||
c <<= 8 | c += ord(self.readStr(1)) | ||
c += ord(self.readStr(1)) | c <<= 8 | ||
elif (c & 0xF0) == 0xE0: | c += ord(self.readStr(1)) | ||
c &= ~0xF0 | elif (c & 0xF0) == 0xE0: | ||
c <<= 8 | c &= ~0xF0 | ||
c += ord(self.readStr(1)) | c <<= 8 | ||
c <<= 8 | c += ord(self.readStr(1)) | ||
c += ord(self.readStr(1)) | c <<= 8 | ||
c <<= 8 | c += ord(self.readStr(1)) | ||
c += ord(self.readStr(1)) | c <<= 8 | ||
elif (c & 0xF8) == 0xF0: | c += ord(self.readStr(1)) | ||
c = ord(self.readStr(1)) | elif (c & 0xF8) == 0xF0: | ||
c <<= 8 | c = ord(self.readStr(1)) | ||
c += ord(self.readStr(1)) | c <<= 8 | ||
c <<= 8 | c += ord(self.readStr(1)) | ||
c += ord(self.readStr(1)) | c <<= 8 | ||
c <<= 8 | c += ord(self.readStr(1)) | ||
c += ord(self.readStr(1)) | c <<= 8 | ||
return c | c += ord(self.readStr(1)) | ||
return c | |||
def writeStr(self, str): | |||
n = 0; | def writeStr(self, str): | ||
while n < len(str): | n = 0; | ||
while n < len(str): | |||
r = self.sk.send(bytes(str[n:], 'UTF-8')) | r = self.sk.send(bytes(str[n:], 'UTF-8')) | ||
if r == 0: raise RuntimeError("connection closed by remote end") | if r == 0: raise RuntimeError("connection closed by remote end") | ||
n += r | n += r | ||
def readStr(self, length): | def writeByte(self, str): | ||
ret = '' | n = 0; | ||
while len(ret) < length: | while n < len(str): | ||
s = self.sk.recv(length - len(ret) | r = self.sk.send(str[n:]) | ||
if s == '': raise RuntimeError("connection closed by remote end") | if r == 0: raise RuntimeError("connection closed by remote end") | ||
ret += s | n += r | ||
def readStr(self, length): | |||
ret = '' | |||
# print ("length: %i" % length) | |||
while len(ret) < length: | |||
s = self.sk.recv(length - len(ret)) | |||
if s == b'': raise RuntimeError("connection closed by remote end") | |||
# print (b">>>" + s) | |||
# atgriezt kaa byte ja nav ascii chars | |||
if s >= (128).to_bytes(1, "big") : | |||
return s | |||
# print((">>> " + s.decode(sys.stdout.encoding, 'ignore'))) | |||
ret += s.decode(sys.stdout.encoding, "replace") | |||
return ret | return ret | ||
def open_socket(dst, port, secure=False): | |||
s = None | |||
s.connect((sys.argv[1], | res = socket.getaddrinfo(dst, port, socket.AF_UNSPEC, socket.SOCK_STREAM) | ||
apiros = ApiRos(s); | af, socktype, proto, canonname, sockaddr = res[0] | ||
apiros.login( | skt = socket.socket(af, socktype, proto) | ||
if secure: | |||
s = ssl.wrap_socket(skt, ssl_version=ssl.PROTOCOL_TLSv1_2, ciphers="ADH-AES128-SHA256") #ADH-AES128-SHA256 | |||
else: | |||
s = skt | |||
s.connect(sockaddr) | |||
return s | |||
def main(): | |||
s = None | |||
dst = sys.argv[1] | |||
user = "admin" | |||
passw = "" | |||
secure = True | |||
port = 0 | |||
# use default username and pasword if not specified | |||
if len(sys.argv) == 4: | |||
user = sys.argv[2] | |||
passw = sys.argv[3] | |||
elif len(sys.argv) == 3: | |||
user = sys.argv[2] | |||
if (port==0): | |||
port = 8729 if secure else 8728 | |||
s = open_socket(dst, port, secure) | |||
if s is None: | |||
print ('could not open socket') | |||
sys.exit(1) | |||
apiros = ApiRos(s); | |||
if not apiros.login(user, passw): | |||
return | |||
inputsentence = [] | inputsentence = [] | ||
while 1: | while 1: | ||
r = select.select([s, sys.stdin], [], [], None) | r = select.select([s, sys.stdin], [], [], None) | ||
Line 151: | Line 210: | ||
# something to read in socket, read sentence | # something to read in socket, read sentence | ||
x = apiros.readSentence() | x = apiros.readSentence() | ||
if sys.stdin in r[0]: | if sys.stdin in r[0]: | ||
# read line from input and strip off newline | # read line from input and strip off newline | ||
l = sys.stdin.readline() | l = sys.stdin.readline() | ||
l = l[:-1] | l = l[:-1] | ||
# if empty line, send sentence and start with new | # if empty line, send sentence and start with new | ||
# otherwise append to input sentence | # otherwise append to input sentence | ||
Line 164: | Line 223: | ||
else: | else: | ||
inputsentence.append(l) | inputsentence.append(l) | ||
if __name__ == '__main__': | |||
main() | main() | ||
</pre> | |||
====file==== | |||
[http://wiki.mikrotik.com/images/6/6b/Api.txt api client in python3] | |||
==See also== | ==See also== | ||
[[Manual:API|API]] | [[Manual:API|API]] |
Latest revision as of 14:45, 22 February 2019
Summary
Since python language have introduced changes to syntax when going from 2.x to 3.x some adjustments had to be made for old code from API.
Code for Python3
code
#!/usr/bin/python3 # -*- coding: latin-1 -*- import sys, posix, time, binascii, socket, select, ssl import hashlib class ApiRos: "Routeros api" def __init__(self, sk): self.sk = sk self.currenttag = 0 def login(self, username, pwd): for repl, attrs in self.talk(["/login", "=name=" + username, "=password=" + pwd]): if repl == '!trap': return False elif '=ret' in attrs.keys(): #for repl, attrs in self.talk(["/login"]): chal = binascii.unhexlify((attrs['=ret']).encode(sys.stdout.encoding)) md = hashlib.md5() md.update(b'\x00') md.update(pwd.encode(sys.stdout.encoding)) md.update(chal) for repl2, attrs2 in self.talk(["/login", "=name=" + username, "=response=00" + binascii.hexlify(md.digest()).decode(sys.stdout.encoding) ]): if repl2 == '!trap': return False return True def talk(self, words): if self.writeSentence(words) == 0: return r = [] while 1: i = self.readSentence(); if len(i) == 0: continue reply = i[0] attrs = {} for w in i[1:]: j = w.find('=', 1) if (j == -1): attrs[w] = '' else: attrs[w[:j]] = w[j+1:] r.append((reply, attrs)) if reply == '!done': return r def writeSentence(self, words): ret = 0 for w in words: self.writeWord(w) ret += 1 self.writeWord('') return ret def readSentence(self): r = [] while 1: w = self.readWord() if w == '': return r r.append(w) def writeWord(self, w): print(("<<< " + w)) self.writeLen(len(w)) self.writeStr(w) def readWord(self): ret = self.readStr(self.readLen()) print((">>> " + ret)) return ret def writeLen(self, l): if l < 0x80: self.writeByte((l).to_bytes(1, sys.byteorder)) elif l < 0x4000: l |= 0x8000 tmp = (l >> 8) & 0xFF self.writeByte(((l >> 8) & 0xFF).to_bytes(1, sys.byteorder)) self.writeByte((l & 0xFF).to_bytes(1, sys.byteorder)) elif l < 0x200000: l |= 0xC00000 self.writeByte(((l >> 16) & 0xFF).to_bytes(1, sys.byteorder)) self.writeByte(((l >> 8) & 0xFF).to_bytes(1, sys.byteorder)) self.writeByte((l & 0xFF).to_bytes(1, sys.byteorder)) elif l < 0x10000000: l |= 0xE0000000 self.writeByte(((l >> 24) & 0xFF).to_bytes(1, sys.byteorder)) self.writeByte(((l >> 16) & 0xFF).to_bytes(1, sys.byteorder)) self.writeByte(((l >> 8) & 0xFF).to_bytes(1, sys.byteorder)) self.writeByte((l & 0xFF).to_bytes(1, sys.byteorder)) else: self.writeByte((0xF0).to_bytes(1, sys.byteorder)) self.writeByte(((l >> 24) & 0xFF).to_bytes(1, sys.byteorder)) self.writeByte(((l >> 16) & 0xFF).to_bytes(1, sys.byteorder)) self.writeByte(((l >> 8) & 0xFF).to_bytes(1, sys.byteorder)) self.writeByte((l & 0xFF).to_bytes(1, sys.byteorder)) def readLen(self): c = ord(self.readStr(1)) # print (">rl> %i" % c) if (c & 0x80) == 0x00: pass elif (c & 0xC0) == 0x80: c &= ~0xC0 c <<= 8 c += ord(self.readStr(1)) elif (c & 0xE0) == 0xC0: c &= ~0xE0 c <<= 8 c += ord(self.readStr(1)) c <<= 8 c += ord(self.readStr(1)) elif (c & 0xF0) == 0xE0: c &= ~0xF0 c <<= 8 c += ord(self.readStr(1)) c <<= 8 c += ord(self.readStr(1)) c <<= 8 c += ord(self.readStr(1)) elif (c & 0xF8) == 0xF0: c = ord(self.readStr(1)) c <<= 8 c += ord(self.readStr(1)) c <<= 8 c += ord(self.readStr(1)) c <<= 8 c += ord(self.readStr(1)) return c def writeStr(self, str): n = 0; while n < len(str): r = self.sk.send(bytes(str[n:], 'UTF-8')) if r == 0: raise RuntimeError("connection closed by remote end") n += r def writeByte(self, str): n = 0; while n < len(str): r = self.sk.send(str[n:]) if r == 0: raise RuntimeError("connection closed by remote end") n += r def readStr(self, length): ret = '' # print ("length: %i" % length) while len(ret) < length: s = self.sk.recv(length - len(ret)) if s == b'': raise RuntimeError("connection closed by remote end") # print (b">>>" + s) # atgriezt kaa byte ja nav ascii chars if s >= (128).to_bytes(1, "big") : return s # print((">>> " + s.decode(sys.stdout.encoding, 'ignore'))) ret += s.decode(sys.stdout.encoding, "replace") return ret def open_socket(dst, port, secure=False): s = None res = socket.getaddrinfo(dst, port, socket.AF_UNSPEC, socket.SOCK_STREAM) af, socktype, proto, canonname, sockaddr = res[0] skt = socket.socket(af, socktype, proto) if secure: s = ssl.wrap_socket(skt, ssl_version=ssl.PROTOCOL_TLSv1_2, ciphers="ADH-AES128-SHA256") #ADH-AES128-SHA256 else: s = skt s.connect(sockaddr) return s def main(): s = None dst = sys.argv[1] user = "admin" passw = "" secure = True port = 0 # use default username and pasword if not specified if len(sys.argv) == 4: user = sys.argv[2] passw = sys.argv[3] elif len(sys.argv) == 3: user = sys.argv[2] if (port==0): port = 8729 if secure else 8728 s = open_socket(dst, port, secure) if s is None: print ('could not open socket') sys.exit(1) apiros = ApiRos(s); if not apiros.login(user, passw): return inputsentence = [] while 1: r = select.select([s, sys.stdin], [], [], None) if s in r[0]: # something to read in socket, read sentence x = apiros.readSentence() if sys.stdin in r[0]: # read line from input and strip off newline l = sys.stdin.readline() l = l[:-1] # if empty line, send sentence and start with new # otherwise append to input sentence if l == '': apiros.writeSentence(inputsentence) inputsentence = [] else: inputsentence.append(l) if __name__ == '__main__': main()