python main.py

作者在 2016-08-05 11:13:23 发布以下内容
# encoding: utf-8

import time, sys, os, multiprocessing, platform, gc, threading
import esl_init, core, updata, query, bind, xmlserver, database, api20
import osd, beacom, ftp_server, netlink, summary, htp, dot_temp
from daemon import Daemon
import heartbeat, Queue, socket
from esl_init import conf
from apv3_connector import APv3Connector
from util import func
from lcd_decode import lcddecode, RendZone

db = database.DB()

version = '1.3.6_rc7'
release = conf.system.release
if release == 'yes':
	version = version[0:version.index('_')]

try:
	import win32serviceutil
	import win32service
	import win32event
	windows_service = True
except Exception, e:
	windows_service = False

if windows_service:
	class SmallestPythonService(win32serviceutil.ServiceFramework):
		_svc_name_ = "esl-working"
		_svc_display_name_ = "esl-working"
		def __init__(self, args):
			win32serviceutil.ServiceFramework.__init__(self, args)
			# Create an event which we will use to wait on.
			# The "service stop" request will set this event.
			self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
			self.stop = False
			self.process_stop = False

		def SvcStop(self):
			# Before we do anything, tell the SCM we are starting the stop process.
			self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
			# And set my event.
			win32event.SetEvent(self.hWaitStop)
			self.stop = True

			time_out = 0
			while True:
				time.sleep(1)
				time_out = time_out + 1
				esl_init.log.info('start exit =========%s' % time_out)
				if time_out >= 60:
					break
				if self.process_stop:
					break

		def SvcDoRun(self):
			# 把你的程序代码放到这里就OK了
			path = os.path.split(os.path.realpath(__file__))[0]
			os.chdir(path)#切换工作路径
			create_process(opt = self)		
			
			win32event.WaitForSingleObject(self.hWaitStop, win32event.INFINITE)
			self.process_stop = True

def is_port_free(prot, port):
	s = socket.socket(socket.AF_INET, prot) #socket.SOCK_STREAM)
	try:
		s.connect(('127.0.0.1', int(port)))
		s.shutdown(2)
		print "network port %s is in use, please check it again" % port
		return False
	except Exception, e:
		return True

def is_system_clean():
	#先检查网络端口
	if not (is_port_free(socket.SOCK_STREAM, conf.extend_service.xml_rpc_port)
		and is_port_free(socket.SOCK_STREAM, conf.extend_service.bind_listen_port)
		and is_port_free(socket.SOCK_STREAM, conf.extend_service.apv3_listen_port)
		and is_port_free(socket.SOCK_STREAM, conf.extend_service.summary_html_port)):
		return False
	
	if conf.htp.internal_ftp.lower() == 'yes' \
		and (not is_port_free(socket.SOCK_STREAM, conf.htp.ftp_port)):
		return False
	
	return True

def store_hb(task_data):
	storeid = conf.system.ids
	esl_init.log.info("store [%s] heardbeat" % (storeid))
	task_data['threading_event'].clear() # 设置event标志为False, 此时心跳接口调用e_flag.wait()阻塞线程
	try:
		xmlserver.resend_failed_msg()
		core.save_esl_power_info(task_data['esl_power_info'])
		heartbeat.auto_refresh_bind_at_hour_fun()
		xmlserver.check_reply_ack_q()
		if time.time() - task_data['startup_time'] >= conf.system.ap_heardbeat_time:
			bind.check_ap_timeout(db, conf.system.ap_heardbeat_time, everytime = True)
	except KeyboardInterrupt:
		raise KeyboardInterrupt, "KeyboardInterrupt"
	except Exception, e:
		esl_init.log.error_msg("store_hb:%s" % e, errid='EMA001')
	finally:
		task_data['threading_event'].set()

def create_task_data():
	task_data = {}
	for task_name in ['updata', 'bind', 'query', 'beacom', 'netlink']:
		task_data.setdefault(task_name, {})
		task_data[task_name]['in_queue'] = Queue.Queue() #multiprocessing.Queue() 
		task_data[task_name]['out_queue'] = Queue.Queue()#multiprocessing.Queue() 
		task_data[task_name]['work_list'] = {} # 更新次数,更新第几次
		task_data[task_name]['all_info'] = {} # 存储价签信息

	task_data['updata']['action'] = updata.action
	task_data['bind']['action'] = bind.action
	task_data['query']['action'] = query.action
	task_data['beacom']['action'] = beacom.action
	task_data['netlink']['action'] = netlink.action

	task_data['updata']['ap_check_list'] = {} #beacom.ap_check_list # 漫游列表
	task_data['bind']['ap_check_list'] = {} # beacom.ap_check_list
	task_data['query']['ap_check_list'] = {} #beacom.ap_check_list
	task_data['beacom']['ap_check_list'] = {} #beacom.ap_check_list #不再共用
	task_data['netlink']['ap_check_list'] = {} #beacom.ap_check_list

	#一些只有q但没有处理单元的, 其处理单元都借用query模块的
	for small_name in ['set_only_q']:
		task_data.setdefault(small_name, {})
		task_data[small_name]['in_queue'] = Queue.Queue()
		task_data[small_name]['work_list'] = {}
		task_data[small_name]['out_queue'] = task_data['query']['out_queue']
		task_data[small_name]['action'] = task_data['query']['action']
		task_data[small_name]['ap_check_list'] = task_data['query']['ap_check_list']
		task_data[small_name]['all_info'] = {} # 存储价签信息

	task_data['ppid'] = os.getpid()
	
	task_data['updata']['buf_in_queue'] = Queue.Queue() #multiprocessing.Queue()
	task_data['updata']['updata_dict'] = {} # 更新数据结构
	task_data['lowpower_counter'] = {} #multiprocessing.Manager().dict()
	task_data['osd_last_ack'] = {} #multiprocessing.Manager().dict()
	task_data['netlink']['rf_para'] = {} #multiprocessing.Manager().dict()
	task_data['ap_level'] = {} #multiprocessing.Manager().dict()
	task_data['esl_power_info'] = {} #multiprocessing.Manager().dict()
	task_data['esl_priority'] = {} #multiprocessing.Manager().dict()
	task_data['ap_work_time'] = {} #multiprocessing.Manager().dict()
	task_data["all_info"] = {}
	task_data['esl_info'] = {}
	task_data['esl_power_his'] = {}
	task_data['esl_power_last_time'] = {} #存储价签最近一次的心跳时间
	task_data['esl_level_change_count'] = {}

	task_data['reply_ack_q'] = Queue.Queue()
	task_data['isquit'] = {}

	task_data['threading_event'] = threading.Event() # 标志位
	task_data['threading_event'].set() # 设置标志位为True
	task_data['apset'] = {} # 生成set_list.ini

	task_data['bind_udp_q'] = Queue.Queue(maxsize = 1000000)
	task_data['ack_count'] = {}
	task_data['rf_para_is_changed'] = {}

	task_data['startup_time'] = time.time()
	task_data['system_version'] = version

	esl_init.log.set_uplink_q(task_data['reply_ack_q'])
	
	return task_data

def create_process(opt = None):
	'''
	程序设置为deamon后,将分别启动核心进程,更新进程,绑定进程,查询进程,xml-prc进程。
	并且将一直循环(1秒一次)检测这些进程的状态,如果进程因为异常而导致退出,则重新启动之
	'''

	from xmlserver import reply_ack

	try:
		try:
			db.check_db()
		except Exception, e:
			esl_init.print_except()
			esl_init.log.error("check db error: %s" % e)

		task_data = create_task_data()
		task_module = [core, xmlserver, updata, bind, query, beacom, ftp_server, netlink, summary]
		for t1 in ['core', 'xmlserver', 'updata', 'bind', 'query', 'beacom', 'ftp_server', 'netlink', 'summary']:
			task_data['isquit'][t1] = False
	
		#先加在APSET文件, apv3模块要用
		if not os.path.isfile('config/set_list.ini'):
			netlink.load_apset_info(task_data['apset']) # 生成set_list.ini
		else:
			task_data['apset'] = netlink.read_setid_info()
		
		if conf.extend_service.apv3_enable.lower() == 'yes':
			apv3 = APv3Connector(task_data['bind_udp_q'], task_data['apset'])
			apv3.start()
			task_data['apv3'] = apv3
			esl_init.log.info("apv3 starting")

		inner_ftp = conf.htp.internal_ftp.upper()
		if inner_ftp == 'NO':
			try:
				task_module.remove(ftp_server)
			except Exception ,e:
				pass

		core.load_esl_power_info(task_data['esl_power_info'], db.get_all_ap())
		task_data['esl_info'] = heartbeat.load_esl_info(task_data['esl_power_info'].keys(), db)
		task_list = [task.start(task_data) for task in task_module]
		esl_init.check_htp_kickoff_file()
		n = 0
		platform_os = platform.system()
		pid = task_data['ppid']
		updata_restart_n = 0
		
	except KeyboardInterrupt, e:
		system_exit(task_data)
		esl_init.log.warning("ESL system exit")
		return
	
	esl_init.log.info("ESL system starting, version %s" % version)
	reply_ack("RESEND_UPDATE", [{"system_version":task_data['system_version'], "startup_time":\
				time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(task_data['startup_time']))}], None)

	try:
		while True:
			for task in task_list:
				#如果task死掉
				if not task.is_alive():
					task.join(timeout=1)
					id1 = task_list.index(task)
					task_list[id1] = task_module[id1].start(task_data)

			time.sleep(1)
			n += 1
			if n % 5 == 0:
				try:
					esl_init.reload_config(conf)
					osd.watch_temp_change()
					esl_init.check_log_size()
					if time.time() - task_data['startup_time'] >= conf.system.ap_heardbeat_time:
						bind.check_ap_timeout(db, conf.system.ap_heardbeat_time)
				except KeyboardInterrupt, e:
					raise KeyboardInterrupt, e
				except Exception, e:
					esl_init.log.error_msg("esl watch event error %s" % e, errid='EMA002')

			if n == int(conf.system.store_heardbeat_time):
				n = 0
				store_hb(task_data)
				#如果接口服务死掉
				if bind.check_bind_listen_alive() == False:
					task_data['isquit']['bind'] = True
					task_list[task_module.index(bind)].join()
					task_data['isquit']['bind'] = False

				if xmlserver.is_xmlserver_alive() == False:
					task_data['isquit']['xmlserver'] = True
					task_list[task_module.index(xmlserver)].join()				
					task_data['isquit']['xmlserver'] = False

				gc.collect()
			if windows_service and opt and opt.stop:
				raise KeyboardInterrupt, "stop"
	except KeyboardInterrupt, e:	
		esl_init.log.warning("system exit, wait for save file")
		core.sava_ap_list(db)
		core.save_esl_power_info(task_data['esl_power_info'])
		netlink.load_apset_info(task_data['apset'])#apset_list.ini

		system_exit(task_data)

		if platform.system() == 'Windows':
			multiprocessing.Process(target = kill_main_task, args = (pid,)).start()
		for task in task_list:	
			task.join()
	except Exception, e:
		pass
	finally:
		esl_init.log.warning("ESL system exit")

def system_exit(task_data):
	for t1 in ['core', 'xmlserver', 'updata', 'bind', 'query', 'beacom', 'ftp_server', 'netlink', 'summary']:
		task_data['isquit'][t1] = True
	#summary.http_server_exit() #容易引发104 错误
	if conf.extend_service.apv3_enable.lower() == 'yes':
		task_data['apv3'].stop()
		esl_init.log.info("apv3 exit")
	ftp_server.ftp_exit()

def kill_main_task(pid):
	try:
		time.sleep(10)
		os.kill(pid, 9)
	except Exception, e:
		pass

class TestDaemon(Daemon):
	'''
	继承Daemon类,覆写run函数
	'''
	def run(self):
		create_process()

def main_run():
	'''
	程序入口函数,主要作用是实例化一个daemon对象,将自身设置为daemon进程。
	程序执行时需要提供1个而外的参数,[start|stop|restart], 用于指示启动、停止后者重启程序。
	'''
	try:
		if len(sys.argv) <= 1:
			if platform.system() == 'Windows' and windows_service:
				print "Usage: python %s [--startup auto install|remove|start|stop|restart]" % sys.argv[0]	
			else:
				print "Usage: python %s [start|stop|restart]" % sys.argv[0]	
			return 

		if sys.argv[1] in ['start', 'restart', 'install'] and (not is_system_clean()):
			print "system start failed"
			return

		if platform.system() == 'Windows' and windows_service:
			if sys.argv[1] not in ['start', 'stop', 'restart', 'install', 'remove', '--startup']:
				print "Usage: python %s [--startup auto install|remove|start|stop|restart]" % sys.argv[0]	
				return 
		else:
			if sys.argv[1] not in ['start', 'stop', 'restart']:
				print "Usage: python %s [start|stop|restart]" % sys.argv[0]
				return 

		is_covtest()

		if platform.system() == 'Linux' and "notdaemon" not in sys.argv:
			test_daemon = TestDaemon(conf.system.lock_file, home_dir = os.getcwd(), username='hanshow')
			if sys.argv[1] == 'start':
				test_daemon.start()
			elif sys.argv[1] == 'stop':
				test_daemon.stop()
			elif sys.argv[1] == 'restart':
				test_daemon.restart()
		else:
			if windows_service:
				win32serviceutil.HandleCommandLine(SmallestPythonService)
			else:
				esl_init.log.warning("can't detect python_win32_module, esl-working not running as service")
				create_process()
	except IOError, e:
		pass
	except socket.error, e:
		pass
	except KeyboardInterrupt:
		pass

def is_covtest():
	if "covtest_normal" in sys.argv:
		conf.db_file.db_file = 'tools/db.sqlite'
		conf.system.dot_temp_dir = "./tools"
	elif "covtest_wm" in sys.argv:
		conf.db_file.db_file = 'tools/wm/db.sqlite'
		conf.system.dot_temp_dir = "./tools/wm/temp_json"
		conf.extend_service.apv3_enable = "no"
		conf.htp.vitrual_htp = "yes"
		conf.htp.vitrual_pass_rate = 95
	elif "covtest_wm_all_failed" in sys.argv:
		conf.db_file.db_file = 'tools/wm/db.sqlite'
		conf.system.dot_temp_dir = "./tools/wm/temp_json"
		conf.extend_service.apv3_enable = "no"
		conf.htp.vitrual_htp = "yes"
		conf.htp.vitrual_pass_rate = 0

def self_test():
	esl_init.log.warning("start self test mode")
	import tools
	for m in [tools.heartbeat_test, 
				netlink, xmlserver, database, api20, htp, osd, updata, func,
				lcddecode, dot_temp, RendZone, bind,
				]:
		m.self_test()

if __name__ == '__main__':
	# reload(sys)
	# sys.setdefaultencoding('utf-8')
	if sys.platform.startswith('win'):
		multiprocessing.freeze_support()
	if 'test' in sys.argv:
		self_test()
		print "test finished"
	else:
		main_run()

公司 | 阅读 55073 次
文章评论,共0条
游客请输入验证码
浏览2344044次