Manual:API Python3: Difference between revisions
Jump to navigation
Jump to search
(→code) |
m (→file) |
||
(One intermediate revision by the same user not shown) | |||
Line 8: | Line 8: | ||
#!/usr/bin/python3 | #!/usr/bin/python3 | ||
import sys, posix, time, binascii, socket, select | import sys, posix, time, hashlib, binascii, socket, select | ||
class ApiRos: | class ApiRos: | ||
Line 19: | Line 18: | ||
def login(self, username, pwd): | def login(self, username, pwd): | ||
for repl, attrs in self.talk(["/login"]): | for repl, attrs in self.talk(["/login"]): | ||
chal = binascii.unhexlify | chal = binascii.unhexlify(attrs['=ret']) | ||
md = hashlib.md5() | md = hashlib.md5() | ||
md.update(b'\x00') | md.update(b'\x00') | ||
md.update(pwd | md.update(bytes(pwd, 'utf-8')) | ||
md.update(chal) | md.update(chal) | ||
self.talk(["/login", "=name=" + username, | self.talk(["/login", "=name=" + username, | ||
"=response=00" + binascii.hexlify(md.digest()).decode(' | "=response=00" + binascii.hexlify(md.digest()).decode('utf-8')]) | ||
def talk(self, words): | def talk(self, words): | ||
Line 60: | Line 59: | ||
def writeWord(self, w): | def writeWord(self, w): | ||
print | print("<<< " + w) | ||
self.writeLen(len( | b = bytes(w, "utf-8") | ||
self. | self.writeLen(len(b)) | ||
self.writeBytes(b) | |||
def readWord(self): | def readWord(self): | ||
ret = self. | ret = self.readBytes(self.readLen()).decode('utf-8') | ||
print | print(">>> " + ret) | ||
return ret | return ret | ||
def writeLen(self, l): | def writeLen(self, l): | ||
if l < 0x80: | if l < 0x80: | ||
self. | self.writeBytes(bytes([l])) | ||
elif l < 0x4000: | elif l < 0x4000: | ||
l |= 0x8000 | l |= 0x8000 | ||
self. | self.writeBytes(bytes([(l >> 8) & 0xff, l & 0xff])) | ||
elif l < 0x200000: | elif l < 0x200000: | ||
l |= 0xC00000 | l |= 0xC00000 | ||
self. | self.writeBytes(bytes([(l >> 16) & 0xff, (l >> 8) & 0xff, l & 0xff])) | ||
elif l < 0x10000000: | elif l < 0x10000000: | ||
l |= 0xE0000000 | l |= 0xE0000000 | ||
self. | self.writeBytes(bytes([(l >> 24) & 0xff, (l >> 16) & 0xff, (l >> 8) & 0xff, l & 0xff])) | ||
else: | else: | ||
self. | self.writeBytes(bytes([0xf0, (l >> 24) & 0xff, (l >> 16) & 0xff, (l >> 8) & 0xff, l & 0xff])) | ||
def readLen(self): | def readLen(self): | ||
c = | c = self.readBytes(1)[0] | ||
if (c & 0x80) == 0x00: | if (c & 0x80) == 0x00: | ||
pass | pass | ||
Line 101: | Line 91: | ||
c &= ~0xC0 | c &= ~0xC0 | ||
c <<= 8 | c <<= 8 | ||
c += | c += self.readBytes(1)[0] | ||
elif (c & 0xE0) == 0xC0: | elif (c & 0xE0) == 0xC0: | ||
c &= ~0xE0 | c &= ~0xE0 | ||
c <<= 8 | c <<= 8 | ||
c += | c += self.readBytes(1)[0] | ||
c <<= 8 | c <<= 8 | ||
c += | c += self.readBytes(1)[0] | ||
elif (c & 0xF0) == 0xE0: | elif (c & 0xF0) == 0xE0: | ||
c &= ~0xF0 | c &= ~0xF0 | ||
c <<= 8 | c <<= 8 | ||
c += | c += self.readBytes(1)[0] | ||
c <<= 8 | c <<= 8 | ||
c += | c += self.readBytes(1)[0] | ||
c <<= 8 | c <<= 8 | ||
c += | c += self.readBytes(1)[0] | ||
elif (c & 0xF8) == 0xF0: | elif (c & 0xF8) == 0xF0: | ||
c = | c = self.readBytes(1)[0] | ||
c <<= 8 | c <<= 8 | ||
c += | c += self.readBytes(1)[0] | ||
c <<= 8 | c <<= 8 | ||
c += | c += self.readBytes(1)[0] | ||
c <<= 8 | c <<= 8 | ||
c += | c += self.readBytes(1)[0] | ||
return c | return c | ||
def | def writeBytes(self, str): | ||
n = 0; | n = 0; | ||
while n < len(str): | while n < len(str): | ||
r = self.sk.send | r = self.sk.send(str[n:]) | ||
if r == 0: raise RuntimeError("connection closed by remote end") | if r == 0: raise RuntimeError("connection closed by remote end") | ||
n += r | n += r | ||
def | def readBytes(self, length): | ||
ret = '' | ret = b'' | ||
while len(ret) < length: | while len(ret) < length: | ||
s = self.sk.recv(length - len(ret)) | s = self.sk.recv(length - len(ret)) | ||
if s == | if len(s) == 0: raise RuntimeError("connection closed by remote end") | ||
ret += s | ret += s | ||
return ret | return ret | ||
def main(): | def main(): | ||
s = | s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
s.connect((sys.argv[1], 8728)) | |||
apiros = ApiRos(s); | |||
apiros = ApiRos(s); | |||
apiros.login(sys.argv[2], sys.argv[3]); | apiros.login(sys.argv[2], sys.argv[3]); | ||
Line 191: | Line 164: | ||
====file==== | ====file==== | ||
[http://wiki.mikrotik.com/images/6/6b/Api.txt | [http://wiki.mikrotik.com/images/6/6b/Api.txt api client in python3] | ||
==See also== | ==See also== |
Revision as of 10:01, 30 October 2015
Summary
Since python language have introduced changes to syntax when going from 2.x to 3.x some adjustments had to be made for old code from API.
Code for Python3
code
#!/usr/bin/python3 import sys, posix, time, hashlib, binascii, socket, select class ApiRos: "Routeros api" def __init__(self, sk): self.sk = sk self.currenttag = 0 def login(self, username, pwd): for repl, attrs in self.talk(["/login"]): chal = binascii.unhexlify(attrs['=ret']) md = hashlib.md5() md.update(b'\x00') md.update(bytes(pwd, 'utf-8')) md.update(chal) self.talk(["/login", "=name=" + username, "=response=00" + binascii.hexlify(md.digest()).decode('utf-8')]) def talk(self, words): if self.writeSentence(words) == 0: return r = [] while 1: i = self.readSentence(); if len(i) == 0: continue reply = i[0] attrs = {} for w in i[1:]: j = w.find('=', 1) if (j == -1): attrs[w] = '' else: attrs[w[:j]] = w[j+1:] r.append((reply, attrs)) if reply == '!done': return r def writeSentence(self, words): ret = 0 for w in words: self.writeWord(w) ret += 1 self.writeWord('') return ret def readSentence(self): r = [] while 1: w = self.readWord() if w == '': return r r.append(w) def writeWord(self, w): print("<<< " + w) b = bytes(w, "utf-8") self.writeLen(len(b)) self.writeBytes(b) def readWord(self): ret = self.readBytes(self.readLen()).decode('utf-8') print(">>> " + ret) return ret def writeLen(self, l): if l < 0x80: self.writeBytes(bytes([l])) elif l < 0x4000: l |= 0x8000 self.writeBytes(bytes([(l >> 8) & 0xff, l & 0xff])) elif l < 0x200000: l |= 0xC00000 self.writeBytes(bytes([(l >> 16) & 0xff, (l >> 8) & 0xff, l & 0xff])) elif l < 0x10000000: l |= 0xE0000000 self.writeBytes(bytes([(l >> 24) & 0xff, (l >> 16) & 0xff, (l >> 8) & 0xff, l & 0xff])) else: self.writeBytes(bytes([0xf0, (l >> 24) & 0xff, (l >> 16) & 0xff, (l >> 8) & 0xff, l & 0xff])) def readLen(self): c = self.readBytes(1)[0] if (c & 0x80) == 0x00: pass elif (c & 0xC0) == 0x80: c &= ~0xC0 c <<= 8 c += self.readBytes(1)[0] elif (c & 0xE0) == 0xC0: c &= ~0xE0 c <<= 8 c += self.readBytes(1)[0] c <<= 8 c += self.readBytes(1)[0] elif (c & 0xF0) == 0xE0: c &= ~0xF0 c <<= 8 c += self.readBytes(1)[0] c <<= 8 c += self.readBytes(1)[0] c <<= 8 c += self.readBytes(1)[0] elif (c & 0xF8) == 0xF0: c = self.readBytes(1)[0] c <<= 8 c += self.readBytes(1)[0] c <<= 8 c += self.readBytes(1)[0] c <<= 8 c += self.readBytes(1)[0] return c def writeBytes(self, str): n = 0; while n < len(str): r = self.sk.send(str[n:]) if r == 0: raise RuntimeError("connection closed by remote end") n += r def readBytes(self, length): ret = b'' while len(ret) < length: s = self.sk.recv(length - len(ret)) if len(s) == 0: raise RuntimeError("connection closed by remote end") ret += s return ret def main(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((sys.argv[1], 8728)) apiros = ApiRos(s); apiros.login(sys.argv[2], sys.argv[3]); inputsentence = [] while 1: r = select.select([s, sys.stdin], [], [], None) if s in r[0]: # something to read in socket, read sentence x = apiros.readSentence() if sys.stdin in r[0]: # read line from input and strip off newline l = sys.stdin.readline() l = l[:-1] # if empty line, send sentence and start with new # otherwise append to input sentence if l == '': apiros.writeSentence(inputsentence) inputsentence = [] else: inputsentence.append(l) if __name__ == '__main__': main()