#!/usr/bin/python
"""HashDown: HTTP-driven file sync daemon (Python 2).

A tornado HTTP endpoint (``/synclist/<cmd>``) accepts download/delete task
lists and pushes them onto a multiprocessing queue.  ``FORKNUM`` worker
processes consume the queue, download each file into a directory chosen by a
consistent-hash ring, verify its MD5 and report the outcome to ``rpurl``.
A separate process drains ``LOGQ`` into a rotating log file and an sqlite
table of downloaded files.
"""
import bisect
import cookielib
import datetime
import getopt
import hashlib
import logging
import logging.handlers
import os
import re
import signal
import sqlite3
import sys
import time
import urllib2
from ctypes import *
from multiprocessing import Value, Pipe, Queue

import tornado.httpserver
import tornado.ioloop
import tornado.web

#version: 2.00 ----- 317
DEBUG = 1
BE_WEIGHT_SCALE = 32
SRV_EWGHT_RANGE = 8192
rootdir = "/data"
Q = Queue(20280)        # task queue: HTTP handler -> download workers
LOGQ = Queue(10240)     # log/sdb records: every process -> log process
FORKNUM = 20            # number of download worker processes
CookieTTL = 1800        # seconds a signed cookie stays valid
dwpid = []              # pids of the download workers (filled by the parent)
ppid = 0                # pid of the parent (HTTP) process
logpid = 0
httppid = 0
http_server = ''
p_dwnum = Value('d', 0)     # number of workers that have exited
p_lognum = Value('d', 0)    # set to 1 once the log process has exited
# NOTE(review): cookie_secret and hdown_secret are hard-coded in source;
# consider loading them from a protected config file instead.
g_opt = dict(
    port = 42233,
    ip = '0.0.0.0',
    cookie = 'cliuser',
    cookie_secret = '~t0.f/b?B=#7E%9Rdy%RXP&8N/Myx"1i',
    hdown_secret = 'tUKxLUENxuTJTB8WNeOZxTMXW3jI8OK6U6qpceZECeowL8YXlKZEnqUnPjHvaLzF21D2ZqaIpEW8lMWBXt2m8Ukcn1NhAmfAXH0U',
    ldb = '/data/logs/hashdown/hashdown.sdb',
    logfile = '/data/logs/hashdown/hashdown.log',
    rpurl = 'http://cdncenter.cibntv.net/syncreport',
    login_expired = 1200,
    pidfile = '/tmp/hashdown.pid',
)

# Splits a download URL into (scheme, host, port, path, name, extension).
_URL_RE = re.compile(r'(http[s]?):\/\/([^\/|:]+)([^\/]*)(.*)\/(.+)\.(\w+)\??.*$')

_HASH_SPACE = 4294967296    # size of the 32-bit hash ring (2**32)
_READ_CHUNK = 1048576       # bytes read per I/O call
_MAX_INMEM_BODY = 5242880   # in-memory bodies stop growing past this size


def daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null', pidfile=None):
    '''
    Fork the current process as a daemon, redirecting standard file
    descriptors (by default, redirects them to /dev/null).

    The daemon's pid is written to ``pidfile``; when it is None the file is
    ``/var/run/<script name without extension>.pid``.  Exits the process
    with status 1 if a fork or the pidfile write fails.
    '''
    # Perform first fork.
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)  # Exit first parent.
    except OSError as e:
        sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)
    # Decouple from parent environment.
    os.chdir("/")
    os.umask(0)
    os.setsid()
    # Perform second fork.
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)  # Exit second parent.
    except OSError as e:
        sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)
    # The process is now daemonized, redirect standard file descriptors.
    for f in sys.stdout, sys.stderr:
        f.flush()
    si = open(stdin, 'r')
    so = open(stdout, 'a+')
    se = open(stderr, 'a+', 0)
    os.dup2(si.fileno(), sys.stdin.fileno())
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())
    if pidfile is None:
        # Use only the script's base name: splitting argv[0] on '.' yields an
        # empty name for "./x.py" and a full path for absolute invocations.
        prog = os.path.splitext(os.path.basename(sys.argv[0]))[0]
        pidfile = "/var/run/%s.pid" % prog
    try:
        pidf = open(pidfile, 'w')
        try:
            pidf.write('%d' % os.getpid())
        finally:
            pidf.close()
    except IOError as msg:
        print(msg.strerror)
        sys.exit(1)


def exit_handler(signum, frame):
    """SIGTERM/SIGQUIT handler of the parent: drain workers, then stop.

    Sends one "9|exit|<pid>" message per worker, waits until the log process
    has flagged its exit through ``p_lognum``, then stops the tornado loop
    and removes the pidfile.
    """
    pid = os.getpid()
    log_exit_sent = False
    if pid == ppid:
        for cpid in dwpid:
            Q.put("9|exit|%i" % cpid)
        while True:
            if p_lognum.value == 1:
                break
            LOGQ.put("info| dwnum: %i" % p_dwnum.value)
            # Tell the log process to exit only after every worker is gone,
            # and only once.
            if p_dwnum.value == FORKNUM and not log_exit_sent:
                LOGQ.put("9|exit")
                log_exit_sent = True
            time.sleep(1)
        tornado.ioloop.IOLoop.instance().stop()
        try:
            os.remove(g_opt['pidfile'])
        except OSError:
            # An already-missing pidfile must not break shutdown.
            pass


def usr1_handler(signum, frame):
    """SIGUSR1 handler of the parent: push a "1|start" message onto Q."""
    pid = os.getpid()
    if pid == ppid:
        Q.put("1|start")


class CHash:
    """Consistent-hash ring mapping a name to one of ``hlists``.

    Hashing is delegated to ``libdiskdisp.so`` (``hashStr`` and
    ``chash_hash``, both declared to return ``c_uint``).
    """

    def __init__(self, hlists, t='s'):
        """Build the ring.

        hlists -- iterable of host/disk names placed on the ring.
        t      -- when 'disk', ``rootdir/<name>`` is created for each entry.
        """
        self.ring = {}
        self.points = []
        self.points_len = 0     # index of the LAST point, not the count
        self.weight = BE_WEIGHT_SCALE * 200
        self.ipzone = CDLL('libdiskdisp.so')
        self.hashStr = self.ipzone.hashStr
        self.chash_hash = self.ipzone.chash_hash
        self.chash_hash.restype = c_uint
        self.hashStr.restype = c_uint
        tag = 1
        for hlist in hlists:
            if t == 'disk':
                diskdir = "%s/%s" % (rootdir, hlist)
                if not os.path.isdir(diskdir):
                    os.makedirs(diskdir)
            for i in xrange(0, self.weight):
                point = self.chash_hash(tag * SRV_EWGHT_RANGE + (i + 1) * 33284123)
                self.ring[point] = hlist
            tag += 1
        if self.ring:
            self.points = sorted(self.ring)
            self.points_len = len(self.points) - 1

    def getring(self, str):
        """Return ``[host, point_index]`` for the ring point nearest to ``str``."""
        key_hash = self.hashStr(str)
        point = self.search(key_hash, 0, self.points_len)
        return [self.ring[self.points[point]], point]

    def getbakring(self, point, key_hash=None):
        """Return a backup host for the ring position ``point``.

        Walks the ring in both directions to the nearest points owned by a
        different host and returns whichever is closer to ``key_hash``
        (ties go to the predecessor).  ``key_hash`` defaults to the hash of
        ``point`` itself.  If the ring holds a single host, that host is
        returned.
        """
        total = self.points_len + 1
        primary = self.ring[self.points[point]]
        if key_hash is None:
            key_hash = self.points[point]
        pointa = (point - 1) % total
        pointb = (point + 1) % total
        steps = 0
        while self.ring[self.points[pointa]] == primary:
            steps += 1
            if steps >= total:
                # Every point belongs to the primary host: no backup exists.
                return primary
            pointa = (pointa - 1) % total
        # A different host exists (found above), so this loop terminates.
        while self.ring[self.points[pointb]] == primary:
            pointb = (pointb + 1) % total
        # Distances are measured around the 32-bit ring, so wrap-around
        # between the last and first point needs no special case.
        dist_a = (key_hash - self.points[pointa]) % _HASH_SPACE
        dist_b = (self.points[pointb] - key_hash) % _HASH_SPACE
        if dist_a <= dist_b:
            return self.ring[self.points[pointa]]
        return self.ring[self.points[pointb]]

    def search(self, p, start=0, end=0):
        """Return the index of the ring point nearest to hash ``p``.

        ``end`` is the index of the last point to consider.  Ties go to the
        lower neighbour.  Hashes outside ``[points[0], points[end]]`` are
        compared across the wrap-around between ``points[end]`` and
        ``points[0]``.
        """
        first = self.points[0]
        last = self.points[end]
        if p <= first:
            if (_HASH_SPACE - last + p) <= (first - p):
                return end
            return 0
        if last < p:
            if (p - last) <= (_HASH_SPACE - p + first):
                return end
            return 0
        # first < p <= last: locate the first point strictly greater than p.
        upper = bisect.bisect_right(self.points, p, start, end + 1)
        if upper > end:
            # p equals the last point exactly.
            return end
        lower = upper - 1
        if (p - self.points[lower]) <= (self.points[upper] - p):
            return lower
        return upper


def requrl(url, post=None, headers=None, timeout=120, cookiefile=None, outfile=None, op=None):
    """Perform an HTTP request and return a result dict.

    url        -- request URL.
    post       -- request body, or None.
    headers    -- dict of request headers (None means no extra headers).
    timeout    -- timeout passed to the opener, in seconds.
    cookiefile -- if given, an LWPCookieJar on this file is attached.
    outfile    -- if given, the body is written to this path instead of
                  being returned.
    op         -- 's'/'st': skip the download when ``outfile`` already has
                  the size announced by Content-Length; 'range': append to
                  ``outfile`` instead of truncating it.

    Result keys: 'code' (HTTP status; 10 for a URLError, 0 for any other
    exception, 260 for a local file error), 'content' (body, or 'OK' when
    written to ``outfile``), 'header', 'content_length' (bytes read) and,
    on local file errors, 'error'.
    """
    if headers is None:
        headers = {}
    if cookiefile is None:
        opener = urllib2.build_opener()
    else:
        cookiejar = cookielib.LWPCookieJar(cookiefile)
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookiejar))
    req = urllib2.Request(url, data=post, headers=headers)
    result = {}
    try:
        geturl = opener.open(req, timeout=timeout)
    except Exception as e:
        result['content_length'] = 0
        # Test 'code' first: HTTPError may expose 'reason' as well, which
        # would otherwise turn every HTTP error into code 10.
        if hasattr(e, 'code'):
            result['code'] = e.code
            result['content'] = e.read()
            result['header'] = e.headers
        elif hasattr(e, 'reason'):
            result['code'] = 10
            result['content'] = e.reason
        else:
            result['content'] = e
            result['code'] = 0
        return result

    result['code'] = geturl.getcode()
    result['content'] = ''
    result['header'] = geturl.info()
    result['content_length'] = 0
    tofile = None
    try:
        if outfile is not None and op in ('s', 'st'):
            nsize = int(geturl.info().get("Content-Length", 0))
            if nsize == 0:
                outfile = None
            else:
                try:
                    fsize = os.stat(outfile).st_size
                except OSError:
                    fsize = 0
                if nsize == fsize:
                    # Local copy already has the announced size.
                    result['content'] = 'OK'
                    return result
        if outfile is not None:
            try:
                tofile = open(outfile, 'ab' if op == 'range' else 'wb')
            except IOError as err:
                # Nothing can be stored; do not download the body.
                result['error'] = err.strerror
                result['code'] = 260
                return result
        getlen = 0
        chunks = []
        write_failed = False
        while True:
            tmp = geturl.read(_READ_CHUNK)
            if not tmp:
                break
            getlen += len(tmp)
            if tofile is not None:
                try:
                    tofile.write(tmp)
                except IOError as err:
                    result['error'] = err.strerror
                    result['code'] = 260
                    write_failed = True
                    break
            elif getlen < _MAX_INMEM_BODY:
                chunks.append(tmp)
        if tofile is not None:
            if not write_failed:
                result['content'] = 'OK'
        else:
            result['content'] = ''.join(chunks)
        result['content_length'] = getlen
        return result
    finally:
        if tofile is not None:
            tofile.close()
        geturl.close()


def getfilemd5(f):
    """Return ``(md5 hexdigest, size in bytes)`` of file ``f``.

    Returns ``('0', 0)`` when the file cannot be opened.
    """
    digest = hashlib.md5()
    size = 0
    try:
        fd = open(f, 'rb')
    except (IOError, OSError):
        return '0', 0
    try:
        while True:
            data = fd.read(_READ_CHUNK)
            if not data:
                break
            digest.update(data)
            size += len(data)
    finally:
        fd.close()
    return digest.hexdigest(), size


def cookie_auth():
    """Return a signed cookie value ``"<md5(secret|time)>|<time>"``."""
    cookietime = int(time.time())
    cookievalue = hashlib.md5("%s|%s" % (g_opt["cookie_secret"], cookietime)).hexdigest()
    return "%s|%s" % (cookievalue, cookietime)


class Application(tornado.web.Application):
    """Tornado application exposing ``/synclist/<cmd>``."""

    def __init__(self):
        handlers = [
            (r"/synclist/(.+)", AdminHandler),
        ]
        settings = dict()
        tornado.web.Application.__init__(self, handlers, **settings)


class AdminHandler(tornado.web.RequestHandler):
    """Handles task submission and queue inspection requests."""

    def get_secure_cookie(self, name, include_name=True, value=None):
        """Validate the signed cookie ``name``.

        Returns "localhost" for requests from 127.0.0.1, "admin" when the
        cookie signature matches and is younger than ``CookieTTL`` seconds,
        otherwise None (a warning is queued on signature/age failure).
        """
        if self.request.remote_ip == "127.0.0.1":
            return "localhost"
        if value is None:
            value = self.get_cookie(name)
        if not value:
            return None
        v = value.split('|')
        if len(v) != 2:
            return None
        try:
            issued = int(v[1])
        except ValueError:
            # Malformed timestamp: treat as an invalid cookie, not a 500.
            return None
        checkvalue = hashlib.md5("%s|%s" % (g_opt["cookie_secret"], v[1])).hexdigest()
        if v[0] == checkvalue and time.time() - issued < CookieTTL:
            return "admin"
        LOGQ.put("warn|%s [signature] %s %s" % (self.request.remote_ip,
                                                self.request.headers.get('Cookie', ''),
                                                self.request.uri))
        return None

    def get_current_user(self):
        """Return the authenticated user name, or None."""
        return self.get_secure_cookie(g_opt['cookie'])

    def post(self, sk):
        """Dispatch on the URL command ``sk``.

        'post'     -- queue one download task per body line (deduplicated on
                      the third '|'-separated field).
        'delete'   -- queue one delete task per non-empty body line.
        'queue'    -- report the task queue size.
        'getqueue' -- peek at one queued task (it is put back).

        Responds 403 when unauthenticated and 502 when the queue is full.
        """
        if not self.get_current_user():
            self.set_status(403)
            return
        status = 200
        prilist = self.request.headers.get("X-Pri-Queue", "mlist")
        msg = "OK"
        if sk == 'post':
            synclists = self.request.body.split('\n')
            seen_md5 = set()
            for listurl in synclists:
                if len(listurl) < 10:
                    continue
                fields = listurl.split('|')
                if len(fields) < 3:
                    LOGQ.put("warn|malformed task line skipped: %s" % listurl)
                    continue
                md5 = fields[2]
                if md5 in seen_md5:
                    continue
                seen_md5.add(md5)
                try:
                    # Non-blocking: a blocking put would stall the IOLoop
                    # forever once the queue is full.
                    Q.put_nowait("1|%s|%s" % (prilist, listurl))
                except Exception:
                    status = 502
                    LOGQ.put("info|Queue full.")
                    msg = "Queue full."
                    break
            self.set_status(status)
            LOGQ.put("info|Recv down task: %i" % len(synclists))
            self.write(msg)
        elif sk == 'delete':
            synclists = self.request.body.split('\n')
            for slist in synclists:
                if not slist:
                    continue
                try:
                    Q.put_nowait("2|%s|%s" % (prilist, slist))
                    LOGQ.put("info|delete file: %s" % slist)
                except Exception:
                    status = 502
                    LOGQ.put("info|Queue full.")
                    msg = "Queue full."
                    break
            self.set_status(status)
            LOGQ.put("info|Recv del task: %i" % len(synclists))
            self.write(msg)
        elif sk == 'queue':
            qsize = Q.qsize()
            self.write("Queue size: %s" % qsize)
        elif sk == 'getqueue':
            try:
                qq = Q.get_nowait()
            except Exception:
                qq = "NO"
            else:
                Q.put(qq)
            self.write(qq)


def httpreport(md5, s, content_length, prilist):
    """Report a task outcome to ``g_opt['rpurl']``, retrying up to 3 times.

    md5, s, content_length and prilist are sent as query parameters; the
    request carries a freshly signed auth cookie.
    """
    headers = {}
    headers["User-Agent"] = 'HashDown'
    rpurl = "%s?md5=%s&contentlen=%d&plist=%s&s=%i" % (g_opt['rpurl'], md5, content_length, prilist, s)
    headers['Cookie'] = '%s=%s' % (g_opt["cookie"], cookie_auth())
    rps = ""
    dtime = 0
    for i in [1, 2, 3]:
        now1 = time.time()
        rps = requrl(rpurl, headers=headers)
        now2 = time.time()
        dtime = now2 - now1
        if rps['code'] == 200:
            break
        time.sleep(1)
    if rps['code'] != 200:
        LOGQ.put("error|[report] %s; code: %s; error: %s" % (rpurl, rps['code'], rps['content']))
    LOGQ.put("info|[report] url: %s; code: %s; time: %.03f" % (rpurl, rps['code'], dtime))


def _fetch_with_range(url, headers, fname):
    """Download ``url`` into ``fname`` using two ranged requests.

    Returns the requrl() result of the last request issued.
    """
    headers["Range"] = "bytes=0-1024"
    dresult = requrl(url, headers=headers, outfile=fname)
    # Only ask for the tail when the server honoured the first range (206);
    # a 200 response already delivered the entire file.
    if dresult['code'] == 206 and dresult['content_length'] > 1024:
        headers["Range"] = "bytes=1025-"
        dresult = requrl(url, headers=headers, outfile=fname, op="range")
        if dresult['code'] == 416:
            # Nothing beyond byte 1024: the first request fetched it all.
            dresult['code'] = 206
            dresult['content'] = 'OK'
    return dresult


def httpdown(url, md5, bips, prilist):
    """Download ``url`` into the hash-selected directory and report the result.

    url     -- source URL; its host is tried first.
    md5     -- expected MD5 hex digest of the file.
    bips    -- ':'-separated list of alternative hosts to try.
    prilist -- priority-queue name forwarded to the report.

    Status ``s`` sent to httpreport(): 20 when the local file matches
    ``md5``, 21 on MD5 mismatch, 11 on download failure; local disk errors
    (920 internally) are reported as 20.  Returns without reporting when a
    fresh lock file (< 600 s old) shows the download is already running or
    when ``url`` cannot be parsed.
    """
    matched = _URL_RE.findall(url)
    if not matched:
        LOGQ.put("error|[down] unparsable url: %s" % url)
        return
    scheme, dname, port, spath, sname, sext = matched[0]
    s = 0
    headers = {}
    headers["User-Agent"] = 'HashDown'
    headers["Host"] = 'hashdown.cibntv.net'
    hdir = ch.getring(sname)[0]
    rdir = "%s/%s%s" % (rootdir, hdir, spath)
    msg = ''
    dtime = 0
    code = 999
    fname = "%s/%s.%s" % (rdir, sname, sext)
    lockst = "%s/%s.lock" % (rdir, sname)
    ip = ""
    lksize = -1     # -1 means "no lock file of ours to remove"
    content_length = 0
    try:
        if not os.path.exists(rdir):
            os.makedirs(rdir)
    except OSError as e:
        if e.errno != 17:   # 17 == EEXIST: another worker created it
            msg = "mkdir %s failed: %s %i" % (rdir, e.strerror, e.errno)
            s = 920
    if s != 920:
        try:
            lkstat = os.stat(lockst)
            lksize = lkstat.st_size
            lkmtime = lkstat.st_mtime
        except OSError:
            lksize = 0
        if lksize > 0 and (time.time() - lkmtime) < 600:
            return
        try:
            lkfile = open(lockst, 'w')
        except IOError:
            lksize = -1
        else:
            lkfile.write("lock")
            lkfile.close()
        lmd5, size = getfilemd5(fname)
        if lmd5 == md5:
            s = 20
            msg = "File is already exist"
        else:
            size = 0
            t = int(time.time())
            k = hashlib.md5("%s|hashdown|%i|2592000" % (g_opt["hdown_secret"], t)).hexdigest()
            # Try the URL's own host first, then each non-empty backup host.
            iplist = [dname] + [bip for bip in bips.split(':') if bip]
            for ip in iplist:
                url = "%s://%s%s%s/%s.%s?k=%s&channel=hashdown&t=%i&ttl=2592000" % (
                    scheme, ip, port, spath, sname, sext, k, t)
                for i in [1, 2]:
                    now1 = time.time()
                    try:
                        dresult = _fetch_with_range(url, headers, fname)
                    except Exception:
                        s = 11
                        continue
                    now2 = time.time()
                    dtime = now2 - now1
                    code = dresult['code']
                    if code in (200, 206) and dresult['content'] == 'OK':
                        s = 10
                        lmd5, size = getfilemd5(fname)
                        content_length = size
                        if lmd5 == md5:
                            s = 20
                            LOGQ.put("sdb|%s|%s|%i" % (md5, fname, size))
                            break
                        s = 21
                        msg = "md5 is not equal: %s|%s" % (lmd5, md5)
                        try:
                            os.remove(fname)
                        except OSError:
                            msg = "%s; error md5 file remove failed" % msg
                    elif code == 260:
                        s = 920
                    else:
                        s = 11
                        detail = dresult.get('error', dresult.get('content', ''))
                        msg = "download failed: [%s] %s" % (code, str(detail)[:200])
                if s == 20 or s == 920:
                    break
    if lksize != -1:
        try:
            os.remove(lockst)
        except OSError:
            # The lock may already be gone; the report must still be sent.
            pass
    try:
        logmsg = ("info|[down] IP: %s-%s; path: %s; code: %d; contentlen: %d; downtime: %.03f; stats: %i; msg: %s" %
                  (dname, ip, fname, code, content_length, dtime, s, msg))
    except Exception:
        logmsg = "info|[down] IP: %s; path: %s" % (dname, fname)
    LOGQ.put(logmsg)
    if s == 920:
        s = 20
    httpreport(md5, s, content_length, prilist)


def _delete_task(op):
    """Handle a "2|prilist|x|url|md5" delete task and report the result."""
    url = op[3]
    md5 = op[4]
    p1 = url.rfind('.')
    p2 = url.rfind('/')
    sname = url[p2 + 1:p1]
    hdir = ch.getring(sname)[0]
    # NOTE(review): url[:p2] is used verbatim as the sub-directory, so this
    # presumably expects a path rather than a full URL; confirm with the sender.
    rdir = "%s/%s%s" % (rootdir, hdir, url[:p2])
    fname = "%s/%s.%s" % (rdir, sname, url[p1 + 1:])
    try:
        os.remove(fname)
        s = 40
    except OSError as e:
        s = 42
        LOGQ.put("error|[dellist] delete failed: md5: ;fname: %s; error: %s" % (fname, e.strerror))
        if e.errno == 2:    # ENOENT: already gone counts as deleted
            s = 40
    LOGQ.put("info|[dellist] delete with: fname: %s" % fname)
    httpreport(md5, s, -1, op[1])


def downlist():
    """Worker loop: consume tasks from Q until told to exit.

    Message formats ('|'-separated): "1|prilist|x|url|md5[|bips]" download,
    "2|prilist|x|url|md5" delete, "9|exit|<pid>" exit request for the worker
    with that pid (requeued when addressed to another worker).
    """
    pid = os.getpid()
    while True:
        try:
            tmp = Q.get(True, 10)
            op = tmp.split('|')
        except Exception:
            continue
        lenop = len(op)
        if op[0] == "1":
            try:
                if lenop == 5:
                    httpdown(op[3], op[4], "", op[1])
                elif lenop == 6:
                    httpdown(op[3], op[4], op[5], op[1])
            except Exception as e:
                LOGQ.put("error|[down] task failed: %s; error: %s" % (tmp, e))
                continue
        elif op[0] == "2":
            if lenop < 5:
                continue
            try:
                _delete_task(op)
            except Exception as e:
                LOGQ.put("error|[dellist] task failed: %s; error: %s" % (tmp, e))
                continue
        elif op[0] == "9":
            if int(op[2]) == pid:
                # "+=" on a shared Value is a read-modify-write; without the
                # lock two exiting workers can lose an increment and the
                # parent would then wait forever.
                with p_dwnum.get_lock():
                    p_dwnum.value += 1
                LOGQ.put("info|downlist exit")
                return
            Q.put("9|exit|%s" % op[2])


def dict_factory(cursor, row):
    """sqlite3 row factory returning each row as a ``{column: value}`` dict."""
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d


def logqueue():
    """Log process loop: drain LOGQ into the log file and the sqlite table.

    Message formats ('|'-separated): "error|...", "info|...", "warn|..."
    are logged (remaining fields joined with ';'); "sdb|<md5>|<file>|<size>"
    inserts a row into table ``hls``; "9|..." requests exit.  The loop ends
    once every worker has exited, setting ``p_lognum`` to 1.
    """
    logger = logging.getLogger()
    loghl = logging.handlers.RotatingFileHandler(g_opt['logfile'], maxBytes=209715200, backupCount=100)
    fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
    loghl.setFormatter(fmt)
    logger.addHandler(loghl)
    logger.setLevel(logging.INFO)
    conn = sqlite3.connect(g_opt['ldb'])
    conn.row_factory = dict_factory
    cur = conn.cursor()
    try:
        cur.execute("select name from sqlite_master where name='hls'")
        if cur.fetchone() is None:
            cur.executescript("""create table hls (md5 text,file text,size integer);
                CREATE UNIQUE INDEX md5 on hls (md5);
                """)
            conn.commit()
        while True:
            try:
                tmp = LOGQ.get(True, 10)
                op = tmp.split('|')
            except Exception:
                if p_dwnum.value == FORKNUM:
                    p_lognum.value = 1
                    return
                continue
            if op[0] == "error":
                logger.error(";".join(op[1:]))
            elif op[0] == "info":
                logger.info(";".join(op[1:]))
            elif op[0] == "warn":
                logger.warning(";".join(op[1:]))
            elif op[0] == "sdb":
                try:
                    # The file name may itself contain '|', so md5 is the
                    # first field, size the last, and the rest is the name.
                    cur.execute("insert into hls values(?,?,?)",
                                (op[1], '|'.join(op[2:-1]), int(op[-1])))
                    conn.commit()
                except (sqlite3.Error, ValueError, IndexError) as e:
                    logger.warning("[sdb] %s: %s" % (";".join(op[1:]), e))
            elif op[0] == "9":
                logger.info("logproc exit")
                if p_dwnum.value == FORKNUM:
                    p_lognum.value = 1
                    return
    finally:
        cur.close()
        conn.close()


if __name__ == "__main__":
    try:
        opts, args = getopt.getopt(sys.argv[1:], "hc:r:d:")
    except getopt.GetoptError as err:
        print(str(err))  # will print something like "option -a not recognized"
        sys.exit(2)
    hashconf = "no"
    datadir = "no"
    for o, a in opts:
        if o == "-h":
            print("python hashdown.py \n")
            sys.exit()
        elif o == '-c':
            hashconf = a
        elif o == '-r':
            rootdir = a
        elif o == '-d':
            datadir = a

    # Read the host list BEFORE daemonizing: daemonize() does chdir("/") and
    # redirects stdout, which would break relative paths and hide the error.
    dd = []
    if datadir != "no":
        dd.append(datadir)
    else:
        try:
            ff = open(hashconf, 'r')
        except IOError:
            print("Cant open hashconf file.\n")
            sys.exit(1)
        try:
            for line in ff:
                entry = line.rstrip("\r\n")
                if entry:
                    dd.append(entry)
        finally:
            ff.close()
    if not dd:
        print("No data directories configured.\n")
        sys.exit(1)

    daemonize('/dev/null', '/tmp/hdown.out', '/tmp/hdown.err', g_opt['pidfile'])
    ppid = os.getpid()
    ch = CHash(dd, 'disk')

    pid1 = os.fork()
    if pid1 == 0:
        logqueue()
        sys.exit()
    else:
        logpid = pid1
    for i in xrange(0, FORKNUM):
        pid = os.fork()
        if pid == 0:
            downlist()
            sys.exit()
        else:
            dwpid.append(pid)
    # Handlers are installed after forking, so only the parent runs them.
    signal.signal(signal.SIGTERM, exit_handler)
    signal.signal(signal.SIGQUIT, exit_handler)
    signal.signal(signal.SIGUSR1, usr1_handler)
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(int(g_opt['port']))
    tornado.ioloop.IOLoop.instance().start()
    """
    while True:
        for i in xrange(0,FORKNUM):
            if len(ppids) >= i:
                continue
            pid = os.fork()
            if pid == 0:
                downlist()
                ppids.remove(os.getpid())
                break
            else:
                if len(ppids) == 0:
                    signal.signal(signal.SIGTERM,exit_handler)
                    signal.signal(signal.SIGQUIT,exit_handler)
                    signal.signal(signal.SIGUSR1,usr1_handler)
                ppids.append(pid)
        if ppid != os.getpid():
            break
        time.sleep(60)
        #usr1_handler(signal.SIGUSR1,None)
    """