#!/usr/bin/python
"""HashDown sync daemon (Python 2).

A Tornado HTTP endpoint (``/synclist/<cmd>``) accepts download/delete
requests and pushes them onto a shared multiprocessing queue.  ``FORKNUM``
forked workers consume the queue, download each file into the directory
selected by a consistent-hash ring (``CHash``), verify its MD5 and report the
outcome to ``hdown_option['rpurl']``.

Queue messages are ``|``-separated strings.  The worker looks at field 0
(``1`` = download, ``2`` = delete, ``9`` = exit) and, for download/delete,
requires exactly four fields and uses fields 2 and 3 as URL/path and MD5.
"""
import os
import sys
import bisect
import cookielib
import errno
import hashlib
import signal
import re
import getopt
import urllib2
import time
import logging
import logging.handlers
import tornado.httpserver
import tornado.ioloop
import tornado.web
import datetime
import sqlite3
from ctypes import *
from multiprocessing import Value, Pipe, Queue, Pool
from multiprocessing.queues import Empty, Full

#version: 1.02 ----- 275
DEBUG = 1
BE_WEIGHT_SCALE = 32
SRV_EWGHT_RANGE = 8192
# Largest value of the 32-bit hash space used by the ring arithmetic.
UINT32_MAX = 4294967295
rootdir = "/data"
Q = Queue(20280)
FORKNUM = 20
CookieTTL = 1800
ppids = []
ppid = 0
httppid = 0
http_server = ''
# Root logger; handlers are attached in the __main__ section.  Defined here so
# the functions below can reference it even when the module is imported.
logger = logging.getLogger()

# NOTE(review): both secrets are hard-coded in source; consider loading them
# from a protected configuration file instead.
hdown_option = dict(
    port = 42233,
    ip = '0.0.0.0',
    cookie = 'cliuser',
    cookie_secret = '~t0.f/b?B=#7E%9Rdy%RXP&8N/Myx"1i',
    hdown_secret = 'tUKxLUENxuTJTB8WNeOZxTMXW3jI8OK6U6qpceZECeowL8YXlKZEnqUnPjHvaLzF21D2ZqaIpEW8lMWBXt2m8Ukcn1NhAmfAXH0U',
    hlsdir = '/',
    ldb = 'hashdown.db',
    logfile = '/data/logs/hashdown/hashdown.log',
    rpurl = 'http://cdncenter.cibntv.net/syncreport',
    login_expired = 1200,
    pidfile = '/tmp/hashdown.pid',
)

# Splits a download URL into (host, directory path, file stem, extension).
URL_RE = re.compile(r'http[s]?:\/\/([^\/|:]+)[^\/]*(.*)\/(.+)\.(\w+)\??.*$')


def daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null', pidfile=None):
    '''
    Fork the current process as a daemon, redirecting standard file
    descriptors (by default, redirects them to /dev/null).

    The working directory becomes "/", so relative paths must be resolved by
    the caller beforehand.  The daemon's PID is written to *pidfile*
    (default: /var/run/<argv[0] up to the first dot>.pid); if that file
    cannot be written the process exits with status 1.
    '''
    # Perform first fork.
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)  # Exit first parent.
    except OSError as e:
        sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)
    # Decouple from parent environment.
    os.chdir("/")
    os.umask(0)
    os.setsid()
    # Perform second fork.
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)  # Exit second parent.
    except OSError as e:
        sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)
    # The process is now daemonized, redirect standard file descriptors.
    for f in sys.stdout, sys.stderr:
        f.flush()
    si = open(stdin, 'r')
    so = open(stdout, 'a+')
    se = open(stderr, 'a+', 0)
    os.dup2(si.fileno(), sys.stdin.fileno())
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())
    if pidfile is None:
        pidfile = "/var/run/%s.pid" % sys.argv[0].split('.')[0]
    try:
        pidf = open(pidfile, 'w')
        try:
            pidf.write('%d' % os.getpid())
        finally:
            pidf.close()
    except IOError as msg:
        logging.warning(msg.strerror)
        sys.exit(1)


def exit_handler(signum, frame):
    """Signal handler for SIGTERM/SIGQUIT.

    In the main process it stops the IOLoop, queues one "9|exit" message per
    worker and removes the pidfile; in any other process it queues
    "9|<pid>" messages instead.
    """
    # NOTE(review): IOLoop.stop() and Queue.put() are called directly from a
    # signal handler; confirm this is safe for the Tornado version in use.
    pid = os.getpid()
    if pid == httppid:
        tornado.ioloop.IOLoop.instance().stop()
    elif pid == ppid:
        #os.kill(httppid,signal.SIGTERM)
        tornado.ioloop.IOLoop.instance().stop()
        for i in xrange(0, FORKNUM):
            Q.put("9|exit")
        try:
            os.remove(hdown_option['pidfile'])
        except OSError:
            # A missing pidfile must not abort the shutdown sequence.
            pass
    else:
        for i in xrange(0, FORKNUM):
            Q.put("9|%s" % pid)


def usr1_handler(signum, frame):
    """Signal handler for SIGUSR1: the main process queues "1|start"."""
    pid = os.getpid()
    if pid == ppid:
        Q.put("1|start")


class CHash:
    """Consistent-hash ring mapping string keys to backend names.

    Hashing is delegated to the ``hashStr`` and ``chash_hash`` functions of
    ``libdiskdisp.so``; both are declared as returning unsigned 32-bit values.
    """

    def __init__(self, hlists, t='s'):
        """Build the ring from the backend names in *hlists*.

        Each backend contributes ``BE_WEIGHT_SCALE * 200`` points.  When *t*
        is 'disk', the directory ``<rootdir>/<backend>`` is created if it
        does not exist.
        """
        self.ring = {}
        self.points = []
        self.points_len = 0
        self.weight = BE_WEIGHT_SCALE * 200
        self.ipzone = CDLL('libdiskdisp.so')
        self.hashStr = self.ipzone.hashStr
        self.chash_hash = self.ipzone.chash_hash
        self.chash_hash.restype = c_uint
        self.hashStr.restype = c_uint
        tag = 1
        for hlist in hlists:
            if t == 'disk':
                if not os.path.isdir("%s/%s" % (rootdir, hlist)):
                    os.makedirs("%s/%s" % (rootdir, hlist))
            for i in xrange(0, self.weight):
                #str=('%s-%03d' % (hlist,i))
                point = self.chash_hash(tag * SRV_EWGHT_RANGE + (i + 1) * 33284123)
                self.ring[point] = hlist
            tag += 1
        if self.ring:
            # Sorted ring positions; points_len holds the LAST INDEX, not
            # the number of points.
            self.points = sorted(self.ring)
            self.points_len = len(self.points) - 1

    def getring(self, str):
        """Return ``[backend, point_index]`` for the key *str*.

        The parameter name shadows the builtin but is kept for interface
        compatibility.
        """
        key_hash = self.hashStr(str)
        point = self.search(key_hash, 0, self.points_len)
        return [self.ring[self.points[point]], point]

    def getbakring(self, point):
        """Return a backup backend for the ring position *point*.

        Walks backwards and forwards from *point* to the nearest positions
        owned by a different backend and returns the backend of whichever is
        closer on the circular 32-bit ring (ties go to the predecessor).

        The original code used the builtin ``hash`` function as if it were a
        number, so every call raised TypeError.  Distances are now measured
        from the ring position of *point* itself.  If every position belongs
        to the same backend, that backend is returned instead of looping
        forever.
        """
        last = self.points_len
        primary = self.ring[self.points[point]]
        ref = self.points[point]

        # Nearest preceding position owned by another backend.
        pointa = last if point == 0 else point - 1
        while self.ring[self.points[pointa]] == primary:
            if pointa == point:
                return primary  # Walked the whole ring: no other backend.
            pointa = last if pointa == 0 else pointa - 1

        # Nearest following position owned by another backend.  The backward
        # walk already proved one exists, so this loop terminates.
        pointb = 0 if point == last else point + 1
        while self.ring[self.points[pointb]] == primary:
            pointb = 0 if pointb == last else pointb + 1

        if pointb < pointa:
            # The pointa..pointb interval wraps past the end of the ring.
            if ref <= self.points[pointb]:
                dist_a = UINT32_MAX - self.points[pointa] + ref + 1
                dist_b = self.points[pointb] - ref
            else:
                dist_a = ref - self.points[pointa]
                dist_b = UINT32_MAX - ref + self.points[pointb] + 1
        else:
            dist_a = ref - self.points[pointa]
            dist_b = self.points[pointb] - ref
        pointc = pointa if dist_a <= dist_b else pointb
        return self.ring[self.points[pointc]]

    def search(self, p, start=0, end=0):
        """Return the index of the ring point nearest to hash value *p*.

        *end* is the index of the last point to consider (``getring`` passes
        ``points_len``).  Distance is circular over the 32-bit space and ties
        go to the lower neighbour.  Raises IndexError on an empty ring.

        This replaces a recursive binary search that recursed without end
        when *p* was exactly equal to the last ring point; results for all
        other inputs are unchanged.
        """
        points = self.points
        first = points[0]
        last = points[end]
        if p <= first:
            # Before the first point: the neighbours are last (wrapped) and first.
            if (UINT32_MAX - last + p + 1) <= (first - p):
                return end
            return 0
        if last < p:
            # After the last point: the neighbours are last and first (wrapped).
            if (p - last) <= (UINT32_MAX - p + first + 1):
                return end
            return 0
        # first < p <= last: succ is the first index whose point is > p.
        succ = bisect.bisect_right(points, p, start, end + 1)
        if succ > end:
            return end  # p is exactly the last point.
        pred = succ - 1
        if (p - points[pred]) <= (points[succ] - p):
            return pred
        return succ


def requrl(url, post=None, headers=None, timeout=120, cookiefile=None, outfile=None, op=None):
    """Fetch *url* and return a result dict.

    Parameters:
        post       -- request body; passed as ``data`` to urllib2.Request.
        headers    -- dict of request headers (default: none).
        timeout    -- timeout in seconds for opening the URL.
        cookiefile -- if given, an LWPCookieJar on that file is attached.
        outfile    -- if given, the body is written to this path instead of
                      being returned.
        op         -- 's' or 'st': skip the download when the existing
                      *outfile* size equals Content-Length; a zero or missing
                      Content-Length disables writing to *outfile*.

    Result keys:
        url     -- the requested URL.
        code    -- HTTP status; 10 for URLError, 0 for any other exception,
                   260 when *outfile* could not be opened or written.
        content -- 'OK' when the body went to *outfile* (or was skipped),
                   otherwise the body (chunks are kept only while the running
                   total stays under 5 MiB) or the error detail.
        header  -- response headers, when a response was received.
        error   -- strerror of a file open/write failure, if any.

    Exceptions raised while reading the body propagate to the caller.
    """
    #op=:s."Content-Length",t."Last-Modified"
    if headers is None:
        headers = {}
    if cookiefile is None:
        opener = urllib2.build_opener()
    else:
        cookiejar = cookielib.LWPCookieJar(cookiefile)
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookiejar))
    req = urllib2.Request(url, data=post, headers=headers)
    result = {'url': url}
    try:
        geturl = opener.open(req, timeout=timeout)
    except Exception as e:
        # HTTPError is tested first: it also exposes ``reason``, which used
        # to mask the real status code behind the generic code 10.
        if isinstance(e, urllib2.HTTPError):
            result['code'] = e.code
            result['content'] = e.read()
            result['header'] = e.headers
        elif hasattr(e, 'reason'):
            result['code'] = 10
            result['content'] = e.reason
        else:
            result['content'] = e
            result['code'] = 0
        return result

    tofile = None
    try:
        result['code'] = geturl.getcode()
        result['content'] = ''
        result['header'] = geturl.info()
        if outfile is not None and op in ('s', 'st'):
            nsize = int(geturl.info().get("Content-Length", 0))
            if nsize == 0:
                outfile = None
            else:
                try:
                    fsize = os.stat(outfile).st_size
                except OSError:
                    fsize = 0
                if nsize == fsize:
                    result['content'] = 'OK'
                    return result
        # Re-check: outfile may have been cleared just above.
        if outfile is not None:
            try:
                tofile = open(outfile, 'wb')
            except IOError as msg:
                result['error'] = msg.strerror
                result['code'] = 260
        chunks = []
        getlen = 0
        while True:
            tmp = geturl.read(1048576)
            if not tmp:
                break
            if tofile is not None:
                try:
                    tofile.write(tmp)
                except IOError as msg:
                    result['error'] = msg.strerror
                    result['code'] = 260
            else:
                getlen += len(tmp)
                if getlen < 5242880:
                    chunks.append(tmp)
        if tofile is not None:
            result['content'] = 'OK'
        else:
            result['content'] = ''.join(chunks)
        return result
    finally:
        # Release both handles on every path, including read errors.
        if tofile is not None:
            tofile.close()
        geturl.close()


def getfilemd5(f):
    """Return ``(md5_hexdigest, size_in_bytes)`` of file *f*.

    Returns ``('0', 0)`` when the file cannot be opened.
    """
    m = hashlib.md5()
    s = 0
    try:
        fd = open(f, 'rb')
    except (IOError, OSError):
        return '0', 0
    try:
        while True:
            d = fd.read(1048576)
            if not d:
                break
            m.update(d)
            s += len(d)
    finally:
        fd.close()
    return m.hexdigest(), s


def cookie_auth():
    """Return an auth cookie value ``<md5(secret|timestamp)>|<timestamp>``."""
    cookietime = int(time.time())
    cookievalue = hashlib.md5("%s|%s" % (hdown_option["cookie_secret"], cookietime)).hexdigest()
    return "%s|%s" % (cookievalue, cookietime)


class Application(tornado.web.Application):
    """Tornado application exposing ``/synclist/<cmd>``."""

    def __init__(self):
        handlers = [
            #(r"/admin?cmd=([a-zA-Z]+)&p=(\w+)", AdminHandler),
            (r"/synclist/(.+)", AdminHandler),
        ]
        settings = dict()
        tornado.web.Application.__init__(self, handlers, **settings)


class AdminHandler(tornado.web.RequestHandler):
    """Handler for the queue-management commands under ``/synclist/``."""

    def get_secure_cookie(self, name, include_name=True, value=None):
        """Validate the ``<md5>|<timestamp>`` cookie produced by cookie_auth.

        Returns "admin" when the signature matches and the timestamp is less
        than ``CookieTTL`` seconds old.  Returns None when the cookie is
        absent; a malformed or invalid cookie is logged and also yields None
        (it used to raise IndexError/ValueError/KeyError).
        """
        if value is None:
            value = self.get_cookie(name)
        if not value:
            return None
        v = value.split('|')
        valid = False
        if len(v) >= 2:
            try:
                issued = int(v[1])
            except ValueError:
                issued = None
            if issued is not None:
                checkvalue = hashlib.md5("%s|%s" % (hdown_option["cookie_secret"], v[1])).hexdigest()
                valid = v[0] == checkvalue and time.time() - issued < CookieTTL
        if valid:
            return "admin"
        logger.warning("%s [signature] %s %s",
                       self.request.remote_ip,
                       self.request.headers.get('Cookie', ''),
                       self.request.uri,
                       )
        return None

    def get_current_user(self):
        """Return "admin" for a valid auth cookie, else None."""
        #return True
        return self.get_secure_cookie(hdown_option['cookie'])

    def _enqueue_lines(self, prefix, lines):
        """Queue ``prefix|line`` for each non-empty line without blocking.

        Returns ``(status, msg)``: ``(200, "OK")`` or ``(505, "Queue full.")``
        when at least one item could not be queued.
        """
        status = 200
        msg = "OK"
        for line in lines:
            if not line:
                # Blank entries (e.g. after a trailing newline) carry no work.
                continue
            try:
                # Non-blocking: a blocking put() on a full queue would stall
                # the IOLoop and never reach the "Queue full" path.
                Q.put_nowait("%s|%s" % (prefix, line))
            except Full:
                status = 505
                logger.debug("Queue full.")
                msg = "Queue full."
        return status, msg

    def post(self, sk):
        """Dispatch on *sk*: 'post', 'delete', 'queue' or 'getqueue'.

        'post' and 'delete' queue one message per body line with prefix "1"
        and "2" respectively.  'queue' reports the queue size and 'getqueue'
        returns one queued item.  Unauthenticated requests get 403.
        """
        if not self.get_current_user():
            self.set_status(403)
            self.write("Forbidden")
            return
        if sk == 'post':
            synclists = self.request.body.split('\n')
            status, msg = self._enqueue_lines("1", synclists)
            self.set_status(status)
            self.write(msg)
        elif sk == 'delete':
            synclists = self.request.body.split('\n')
            logger.debug("[dellist] list: %s", synclists)
            status, msg = self._enqueue_lines("2", synclists)
            self.set_status(status)
            self.write(msg)
        elif sk == 'queue':
            qsize = Q.qsize()
            # Take one item and put it back: an empty queue reports size 0.
            # The probed item is re-queued at the tail.
            try:
                qq = Q.get_nowait()
            except Empty:
                qsize = 0
            else:
                Q.put(qq)
            self.write("Queue size: %s" % qsize)
        elif sk == 'getqueue':
            try:
                qq = Q.get_nowait()
            except Empty:
                qq = "NO"
            else:
                Q.put(qq)
            self.write(qq)


def httpreport(md5, s):
    """Report status *s* for *md5* to ``hdown_option['rpurl']``.

    Tries up to three times, one second apart, until the server answers 200.
    """
    headers = {}
    headers["User-Agent"] = 'HashDown'
    #headers["Host"] = 'hdc.cibntv.net'
    rpurl = "%s?md5=%s&s=%i" % (hdown_option['rpurl'], md5, s)
    headers['Cookie'] = '%s=%s' % (hdown_option["cookie"], cookie_auth())
    rps = {}
    now1 = now2 = time.time()
    for i in range(3):
        now1 = time.time()
        rps = requrl(rpurl, headers=headers)
        now2 = time.time()
        if rps['code'] == 200:
            break
        time.sleep(1)
    if rps['code'] != 200:
        # Log rpurl directly: the result only used to carry 'url' on the
        # exception path, so a non-200 response raised KeyError here.
        logger.error("[report] %s; code: %i; error: %s", rpurl, rps['code'], rps['content'])
    logger.info("[report] url: %s; code: %s; time: %i",
                rpurl,
                rps['code'],
                int(now2 - now1)
                )


def httpdown(url, md5):
    """Download *url* to its ring-selected directory and report the result.

    Status codes sent to httpreport: 20 = file present with matching MD5,
    21 = downloaded but MD5 differs, 11 = download failed.  The download is
    attempted up to three times.  Raises ValueError when *url* does not match
    the expected ``http(s)://host/path/name.ext`` form.
    """
    matches = URL_RE.findall(url)
    if not matches:
        raise ValueError("unsupported url: %s" % url)
    dname, spath, sname, sext = matches[0]
    s = 0
    #sname = uname.encode()
    headers = {}
    headers["User-Agent"] = 'HashDown'
    #headers["Host"] = 'hdc.cibntv.net'
    hdir = ch.getring(sname)[0]
    rdir = "%s/%s%s" % (rootdir, hdir, spath)
    msg = ''
    # Seconds spent in the last download attempt.  It was a formatted string
    # logged with %i, which made the "[down]" log call fail to format.
    dtime = 0
    if not os.path.exists(rdir):
        try:
            os.makedirs(rdir)
        except OSError:
            msg = "mkdir failed."
    fname = "%s/%s.%s" % (rdir, sname, sext)
    logger.info("[fname] %s", fname)
    lmd5, size = getfilemd5(fname)
    if lmd5 == md5:
        s = 20
        msg = "File is already exist"
    else:
        size = 0
        t = int(time.time())
        k = hashlib.md5("%s|hashdown|%i|2592000" % (hdown_option["hdown_secret"], t)).hexdigest()
        url = "%s?k=%s&channel=hashdown&t=%i&ttl=2592000" % (url, k, t)
        for i in range(3):
            now1 = time.time()
            dresult = requrl(url, headers=headers, outfile=fname)
            now2 = time.time()
            dtime = int(now2 - now1)
            if dresult['code'] == 200 and dresult['content'] == 'OK':
                s = 10
                lmd5, size = getfilemd5(fname)
                #logger.debug("[downlist] aaa: %s",lmd5)
                if lmd5 == md5:
                    s = 20
                    break
                else:
                    s = 21
                    msg = "md5 is not equal: %s|%s" % (lmd5, md5)
            else:
                s = 11
                msg = "download failed: [%s] %s" % (dresult['code'], dresult['content'])
    ###############by zhaowei start###################
    if s == 11:
        # A value of 0 in the health file turns a failed download into a
        # reported success (20).  NOTE(review): confirm the intended meaning
        # of this flag with the author.
        try:
            f = open('/data/scripts/sdhealthy.txt', 'r')
            try:
                read = int(f.read())
            finally:
                f.close()
        except (IOError, OSError, ValueError) as e:
            # An unreadable flag must not prevent the report below.
            logger.error("[down] cannot read sdhealthy.txt: %s", e)
        else:
            if read == 0:
                s = 20
            msg = "download failed: %s %s" % (s, read)
    if s != 20:
        logger.error("[downfailed] url: %s; %s", url, msg)
    ############## end ################################
    logger.info("[down] IP: %s; path: %s; downtime: %i; stats: %i; msg: %s",
                dname,
                fname,
                dtime,
                s,
                msg,
                )
    httpreport(md5, s)


def downlist():
    """Worker loop: consume queue messages until an exit ("9") message.

    Download ("1") and delete ("2") messages must have exactly four
    ``|``-separated fields; anything else is ignored.  Delete reports 40 on
    success (or when the file is already absent) and 42 on failure.
    """
    while True:
        try:
            tmp = Q.get(True, 600)
            op = tmp.split('|')
        except Exception:
            # Timeout (Empty) or a transient queue error: keep waiting.
            continue
        s = 0
        md5 = ''
        if op[0] == "1":
            if len(op) != 4:
                continue
            try:
                httpdown(op[2], op[3])
            except Exception:
                # Keep the worker alive, but leave a trace of the failure.
                logger.exception("[downlist] download failed: %s", op[2])
                continue
        elif op[0] == "2":
            if len(op) != 4:
                continue
            url = op[2]
            md5 = op[3]
            p1 = url.rfind('.')
            p2 = url.rfind('/')
            sname = url[p2 + 1:p1]
            hdir = ch.getring(sname)[0]
            rdir = "%s/%s%s" % (rootdir, hdir, url[:p2])
            fname = "%s/%s.%s" % (rdir, sname, url[p1 + 1:])
            try:
                os.remove(fname)
                s = 40
            except OSError as e:
                s = 42
                logger.error("[dellist] delete failed: md5: ;fname: %s; error: %s",
                             fname,
                             e.strerror,
                             )
                # errno is an int; the old comparison with the string "2"
                # could never match.  A missing file counts as deleted.
                if e.errno == errno.ENOENT:
                    s = 40
            logger.info("[dellist] delete with: fname: %s", fname)
            httpreport(md5, s)
        elif op[0] == "9":
            break
        else:
            continue
    logger.debug("[exit downlist]")


if __name__ == "__main__":
    try:
        opts, args = getopt.getopt(sys.argv[1:], "hc:r:")
    except getopt.GetoptError as err:
        print(str(err))  # will print something like "option -a not recognized"
        sys.exit(2)
    hashconf = "no"
    for o, a in opts:
        if o == "-h":
            print("python hashdown.py \n")
            sys.exit()
        elif o == '-c':
            # Resolve now: daemonize() changes the working directory to "/".
            hashconf = os.path.abspath(a)
        elif o == '-r':
            rootdir = a

    loghl = logging.handlers.RotatingFileHandler(hdown_option['logfile'], maxBytes=104857600, backupCount=50)
    fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
    loghl.setFormatter(fmt)
    logger.addHandler(loghl)
    logger.setLevel(logging.INFO)

    daemonize('/dev/null', '/tmp/hdown.out', '/tmp/hdown.err', hdown_option['pidfile'])
    ppid = os.getpid()
    print("ppid: %i" % ppid)

    # One backend name per line.  Only the trailing newline is stripped and
    # every line (blank ones included) is kept, because the position of a
    # line determines its ring points.
    dd = []
    try:
        ff = open(hashconf, 'r')
    except IOError:
        print("Cant open hashconf file.\n")
        sys.exit(1)
    try:
        for line in ff:
            if line[-1] == "\n":
                line = line[:-1]
            dd.append(line)
    finally:
        ff.close()

    ch = CHash(dd, 'disk')

    # Fork the download workers; each one runs downlist() until told to exit.
    for i in xrange(0, FORKNUM):
        pid = os.fork()
        if pid == 0:
            downlist()
            sys.exit()

    # Only the parent gets here: install the handlers and serve HTTP.
    signal.signal(signal.SIGTERM, exit_handler)
    signal.signal(signal.SIGQUIT, exit_handler)
    signal.signal(signal.SIGUSR1, usr1_handler)
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(int(hdown_option['port']))
    tornado.ioloop.IOLoop.instance().start()

"""
    while True:
        for i in xrange(0,FORKNUM):
            if len(ppids) >= i:
                continue
            pid = os.fork()
            if pid == 0:
                downlist()
                ppids.remove(os.getpid())
                break
            else:
                if len(ppids) == 0:
                    signal.signal(signal.SIGTERM,exit_handler)
                    signal.signal(signal.SIGQUIT,exit_handler)
                    signal.signal(signal.SIGUSR1,usr1_handler)
                ppids.append(pid)
        if ppid != os.getpid():
            break
        time.sleep(60)
        #usr1_handler(signal.SIGUSR1,None)
"""