shsmi_sysdb_nex/sde_sync.py

556 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
import fnmatch
import os
import sys
import arcpy
from functions import *
from sys_config import SysConfig
from logger import Logger
# EPS SDE 系统表列表
g_sys_tb = ['SS_BDCDYHINFO', 'SS_GISDBLINKINFO', 'SS_LOGINFO', 'SS_ROLEINFO', 'SS_ROLEMXDINFO',
'SS_TABSAPCEPERMISSIONINFO', 'SS_USERINFO', 'SUNWAY_INIINFO', 'SS_LAYERINFO',
'GE_ENTITYCODE', 'GE_ENTITYIDTB']
class SdeSynchronous:
def __init__(self, conf, log):
self.config = conf
self.logger = log
self.error_count = 0
self.src_sde = None
self.txt_ct_file = None # 记录数量的txt文件
self.out_gdb_path = None
self.out_gdb_files = {}
self.only_changed_layers = False
def run(self):
"""同步的主函数"""
# 1.连接SDE数据库
self.src_sde = self.get_connection(str(u"源库SDE连接")).encode('gb2312')
if self.src_sde == "" or self.src_sde is None:
self.logger.error("src_sde Connection file not exist!")
else:
self.create_gdb_files()
txt_path = self.out_gdb_path + "\\" + u"图层数量统计.txt"
self.txt_ct_file = open(txt_path, "wb+", buffering=0)
# 开始编辑SDE
edit = self.start_edit()
# 获取上次同步的时间
last_sync_datetime = self.get_last_sync_datetime()
if last_sync_datetime == "":
self.config.last_sync_datetime = self.config.curr_sync_datetime
else:
self.config.last_sync_datetime = last_sync_datetime
self.logger.log('===' + self.config.last_sync_datetime + "==TO==" + self.config.curr_sync_datetime + "===")
# 获取fc列表
fc_list = arcpy.ListFeatureClasses("*", "all")
# 获取同步工程信息
last_gongch, last_yewbh = self.get_projects_info()
self.write_start_event_logs(last_gongch)
# 输出增量
self.sde2gdb_feature_list(fc_list)
# 获取数据集列表
ds_list = arcpy.ListDatasets("*", "Feature")
# 逐个数据集输出
for DS in ds_list:
print("Process Dataset: " + DS.encode("gb2312"))
ds_name = DS[DS.rfind('.') + 1:]
sr = arcpy.Describe(DS).spatialReference
# 更改工作空间回到SDE
arcpy.env.workspace = self.src_sde
# 获取数据集中fc列表
fc_list = arcpy.ListFeatureClasses(feature_dataset=DS)
# 输出增量
self.sde2gdb_feature_list(fc_list)
# 获取关联关系表列表
re_table_list = arcpy.ListTables("{}.*".format(self.config.sde_scheme()), "all")
self.sde2gdb_table_list(re_table_list)
bool_save = True if self.error_count == 0 else False
if bool_save:
# 记录同步成功时间
self.set_last_sync_datetime()
# 停止编辑
self.stop_edit(edit, bool_save)
# 执行回调返回gdb增量输出结果
self.return_result("gdb", bool_save, last_gongch, last_yewbh)
# 结果输出到日志信息
str_message = u"输出增量GDB完成" if bool_save else u"输出增量GDB失败"
self.logger.log(str_message.encode("gb2312"))
arcpy.env.workspace = self.out_gdb_path + "\\" + self.out_gdb_files["Delete"]
delete_fcs = arcpy.ListFeatureClasses()
for FC in delete_fcs:
arcpy.AddField_management(FC, "FEATUREGUIDCOPY", "TEXT")
arcpy.CalculateField_management(FC, "FEATUREGUIDCOPY", "[FEATUREGUID]", "VB")
arcpy.DeleteIdentical_management(FC, ["FEATUREGUIDCOPY"])
arcpy.DeleteField_management(FC, ["FEATUREGUIDCOPY"])
if bool_save:
# 开始输出mdo
# self.copy_to_mdo(self.out_gdb_path)
# 转换到旧标准的gdb
self.trans_to_old()
# 输出保密的gdb
self.output_security_gdb()
# TODO 写旧标准输出完成的标记文档
# WriteFinishFlag
# 执行回调返回mdo输出结果
self.return_result("mdo", bool_save, last_gongch, last_yewbh)
# 结果输出到日志信息
str_message = u"输出增量MDO完成" if bool_save else u"输出增量MDO失败"
self.logger.log(str_message.encode("gb2312"))
def create_gdb_files(self):
try:
# 创建输出GDB
tt = self.config.curr_sync_datetime
self.out_gdb_path = self.config.pathname + "\\" + "NEWGDB\\" + tt[:tt.find(' ')]
create_dir(self.out_gdb_path)
self.out_gdb_files = {"Add": "addFeatures.gdb", "Edit": "editFeatures.gdb", "Delete": "deleteFeatures.gdb"}
for key in self.out_gdb_files.keys():
gdb_path = self.out_gdb_path + "\\" + self.out_gdb_files[key]
if arcpy.Exists(gdb_path):
arcpy.Delete_management(gdb_path)
arcpy.CreateFileGDB_management(self.out_gdb_path, self.out_gdb_files[key])
except Exception as e:
self.logger.error("create_gdb error:\n" + str(e))
def get_connection(self, sub_path):
"""获取SDE连接"""
for fileName in os.listdir(self.config.pathname.encode('gb2312') + "\\" + sub_path.encode("gb2312")):
if fnmatch.fnmatch(fileName, '*.sde'):
return self.config.pathname + "\\" + sub_path + "\\" + fileName
return ""
def start_edit(self):
"""开始编辑"""
arcpy.env.workspace = self.src_sde
print(self.src_sde)
edit = arcpy.da.Editor(self.src_sde)
edit.startEditing(False, False)
edit.startOperation()
return edit
def stop_edit(self, edit, bool_save):
"""停止编辑"""
arcpy.env.workspace = self.src_sde
# 停止编辑
edit.stopOperation()
edit.stopEditing(bool_save)
def write_start_event_logs(self, last_gongch):
"""反查更新的项目编号并写OA事件"""
for GongCH in last_gongch:
write_event_log("OutPutMap_GDB",
GongCH,
u"开始".encode('utf8'),
self.config.logfile_name.decode('gbk').encode('utf8'),
u"增量输出GDB文件开始".encode('utf8'))
# write_event_log("OutPutMap_MDO",
# GongCH,
# u"开始".encode('utf8'),
# self.config.logfile_name.decode('gbk').encode('utf8'),
# u"增量输出MDO文件开始".encode('utf8'))
def get_projects_info(self):
feature_class = self.config.sde_scheme() + "." + self.config.updateregion()
time_values = {"from_time": self.config.last_sync_datetime, "to_time": self.config.curr_sync_datetime}
time_condition = """UPDATETIME>=TO_DATE('{from_time}','YYYY-MM-DD HH24:MI:SS') and
DOWNLOADTIME<TO_DATE('{to_time}','YYYY-MM-DD HH24:MI:SS')""".format(**time_values)
f1, f2, f3 = "OBJECTID", "GONGCH", "YEWBH"
try:
with arcpy.da.SearchCursor(feature_class, [f1, f2, f3], where_clause=time_condition,
sql_clause=(None, 'ORDER BY OBJECTID DESC')) as cur:
gongch_list = [row[1] for row in cur]
yewbh_list = [row[2] for row in cur]
except arcpy.ExecuteError:
print(arcpy.GetMessages().encode('gb2312'))
self.logger.error(arcpy.GetMessages().encode('gb2312'))
return gongch_list, yewbh_list
def sde2gdb_feature_list(self, fc_list):
"""根据fc列表同步输出增量"""
for FC in fc_list:
str_msg = "Process Dataset: " + FC.encode("gb2312")
index = FC.rfind('.')
fc_name = FC[index + 1:]
# 跳过非指定表空间中的fc
tablespace_name = get_tag_tablespace(FC)
if tablespace_name != self.config.sde_scheme():
continue
# 跳过updateregion_qp
if fc_name in ['UpdateRegion_QP', 'NewCity', 'NewCity_A', 'PlanRegion']:
continue
# 只同步图元时跳过Et_ historyEt_开头的图层
index_et = fc_name.lower().rfind('et_')
index_h_et = fc_name.lower().rfind('historyet_')
if self.config.sync_type() == "TUY":
if index_h_et == 0 or index_et == 0:
continue
# 调试时筛选图层输出
if self.config.is_debug:
if fc_name not in ['Building_A', 'historyBuilding_A']:
continue
# 记录图层地物个数
self.record_count(fc_name)
print(str_msg)
# 输出增删改三文件数据
self.export_one_layer_3f(fc_name)
def sde2gdb_table_list(self, table_list):
"""增量同步table表记录"""
for TB in table_list:
str_msg = "Process Table: " + TB.encode("gb2312")
tb_name = TB[TB.rfind('.') + 1:]
# 跳过系统表
if tb_name in g_sys_tb:
continue
# 跳过二三维模型关联表 Re3D_ historyRe3D_ 开头的图层
index_3d = tb_name.lower().rfind('re3d_')
index_h_3d = tb_name.lower().rfind('historyre3d_')
if index_3d == 0 or index_h_3d == 0:
continue
# 只同步图元时跳过所有table
if self.config.sync_type == "TUY":
continue
# 调试时筛选表输出
if self.config.is_debug:
if tb_name not in ['Re_Building_A',
'historyRe_Building_A',
'ReEnt_Building_A',
'historyReEnt_Building_A']:
continue
# 记录图层地物个数
self.record_count(tb_name)
print(str_msg)
# 输出增删改三文件数据
self.export_one_table_3f(tb_name)
def export_one_layer_3f(self, fc_name):
"""输出一个FeatureClass的增量"""
last_max_object_id = self.get_last_max_object_id(fc_name)
index0 = fc_name.find("history")
if index0 == 0:
return
fc_history = "history" + fc_name
if not arcpy.Exists(fc_history):
return
expression_add, expression_edit, expression_delete = self.get_export_expression(fc_name)
self.logger.log("Exporting Layer:" + fc_name.encode("gb2312"))
try:
if self.is_export_one_layer(fc_name):
# 输出新增数据
add_gdb_path = self.out_gdb_path + "\\" + self.out_gdb_files["Add"]
self.featureclass_to_featureclass(fc_name, add_gdb_path, fc_name, expression_add)
# 输出修改数据
edit_gdb_path = self.out_gdb_path + "\\" + self.out_gdb_files["Edit"]
self.featureclass_to_featureclass(fc_name, edit_gdb_path, fc_name, expression_edit)
# 记录最大ObjectID
max_object_id = self.get_max_object_id(fc_name)
self.set_last_max_object_id(fc_name, max_object_id)
self.logger.log("LastMaxObjectID:" + str(last_max_object_id) + ", CurMaxObjectID:" + str(max_object_id))
elif self.is_export_one_layer(fc_history):
# 输出删除数据
delete_gdb_path = self.out_gdb_path + "\\" + self.out_gdb_files["Delete"]
self.featureclass_to_featureclass(fc_history, delete_gdb_path, fc_name, expression_delete)
max_object_id_his = self.get_max_object_id(fc_history)
self.set_last_max_object_id(fc_history, max_object_id_his)
self.logger.log(
"LastMaxObjectID:" + str(last_max_object_id) + ", CurMaxObjectID:" + str(max_object_id_his))
else:
self.logger.log("No Change Detected!")
except arcpy.ExecuteError:
str1 = str(arcpy.GetMessages().encode("gb2312"))
self.logger.error(str1)
self.error_count += 1
except Exception as e:
str1 = str(e.args[0]).encode("gb2312")
self.logger.error(str1)
self.error_count += 1
def export_one_table_3f(self, tb_name):
self.logger.log("TableToTable:" + tb_name.encode('gb2312'))
index0 = tb_name.find("history")
if index0 == 0:
return
tb_history = "history" + tb_name
if not arcpy.Exists(tb_history):
return
entity_index = tb_name.find("ReEnt_")
tb_type = "entity_rela" if entity_index == 0 else "elem_rela"
expression_add, expression_edit, expression_delete = self.get_export_expression(tb_name, tb_type)
try:
# 输出增删改三文件数据
add_gdb_path = self.out_gdb_path + "\\" + self.out_gdb_files["Add"]
self.table_to_table(tb_name, add_gdb_path, tb_name, expression_add)
edit_gdb_path = self.out_gdb_path + "\\" + self.out_gdb_files["Edit"]
self.table_to_table(tb_name, edit_gdb_path, tb_name, expression_edit)
delete_gdb_path = self.out_gdb_path + "\\" + self.out_gdb_files["Delete"]
self.table_to_table(tb_history, delete_gdb_path, tb_name, expression_delete)
except arcpy.ExecuteError:
errmsg = str(arcpy.GetMessages().encode("gb2312"))
self.logger.error(errmsg)
self.error_count += 1
def get_export_expression(self, fc_name, fc_type="fc"):
"""获取输出表达式"""
fc_history = "history" + fc_name
fc_current = fc_name
sql_dic = {"sys_time": self.get_sys_time_condition(False),
"sys_time_his": self.get_sys_time_condition(True),
"fc": fc_current,
"his_fc": fc_history,
"is_in_his": "NOT IN",
"last_time": self.config.last_sync_datetime,
"curr_time": self.config.curr_sync_datetime,
"up_time_field": self.config.updatetime_field(),
"del_time_field": self.config.updatetime_field_history(),
"guid_field": "FeatureGuid"}
if fc_type == "entity_rela":
sql_dic.update(guid_field="EntityRelaGUID")
elif fc_type == "elem_rela":
sql_dic.update(guid_field="ElemRelaGUID")
expression_his = "{sys_time_his} " \
"and ({his_fc}.{up_time_field}<TO_DATE('{last_time}','YYYY-MM-DD HH24:MI:SS') " \
"or {his_fc}.{up_time_field} IS NULL) " \
"and ({guid_field} NOT IN (SELECT {guid_field} FROM {fc}))"
expression_cur = "{sys_time} and {guid_field} {is_in_his} (SELECT {guid_field} FROM {his_fc} " \
"WHERE {del_time_field}>=TO_DATE('{last_time}','YYYY-MM-DD HH24:MI:SS') " \
"and {del_time_field}<TO_DATE('{curr_time}','YYYY-MM-DD HH24:MI:SS') " \
"and ({up_time_field}<TO_DATE('{last_time}','YYYY-MM-DD HH24:MI:SS') " \
"or {up_time_field} IS NULL))"
expression_del = expression_his.format(**sql_dic)
# 新增地物现势表存在,历史表不存在
expression_add = expression_cur.format(**sql_dic)
# 修改地物现势表存在,历史表也存在
sql_dic.update(is_in_his="IN")
expression_edit = expression_cur.format(**sql_dic)
return expression_add, expression_edit, expression_del
def get_last_max_object_id(self, fc_name):
"""获取上次同步记录的最大ObjectID"""
section_name = "{}_SHDB_LayerInfo_LastMaxID".format(self.config.sde_scheme())
with arcpy.da.SearchCursor("SUNWAY_INIINFO", ["STRINGVALUE"],
"SECTION='{}' and KEYNAME='{}'".format(section_name, fc_name)) as cur:
for row in cur:
last_id = int(row[0])
return last_id
return 0
def get_last_sync_datetime(self):
section_name = self.config.sde_scheme() + "_Dbsyn_gdb"
key_name = "LastExportTime"
expression = "SECTION='{}' and KEYNAME='{}'".format(section_name, key_name)
with arcpy.da.SearchCursor("SUNWAY_INIINFO", "STRINGVALUE", expression) as cur:
last_time = [row[0] for row in cur]
return last_time[0] if last_time else ""
def set_iniinfo_value_int(self, section_suffix, fc_name, value):
"""Helper function to update or insert a value."""
section_name = "{}_{}".format(self.config.sde_scheme(), section_suffix)
fields = ["SECTION", "KEYNAME", "STRINGVALUE"]
with arcpy.da.UpdateCursor("SUNWAY_INIINFO",
fields,
where_clause="SECTION='{}' and KEYNAME='{}'".format(section_name, fc_name)) as cur:
try:
row = next(cur)
last_value = row[2]
row[2] = str(value)
cur.updateRow(row)
except StopIteration: # If the cursor is empty
with arcpy.da.InsertCursor("SUNWAY_INIINFO", fields) as insert_cur:
insert_cur.insertRow((section_name, fc_name, str(value)))
last_value = ""
print("{}:{} {}-->{}".format(section_name, fc_name, last_value, value))
def set_last_max_object_id(self, fc_name, objectid):
"""记录最大ObjectID"""
self.set_iniinfo_value_int("SHDB_LayerInfo_LastMaxID", fc_name, objectid)
# 记录同步完成状态
self.set_iniinfo_value_int("SHDB_LayerInfo_SynchronousStatus", fc_name, 1)
def set_last_sync_datetime(self):
"""记录同步时间"""
section_name = self.config.sde_scheme() + "_Dbsyn_gdb"
key_name = "LastExportTime"
self.set_iniinfo_value_int(section_name, key_name, self.config.curr_sync_datetime)
def get_max_object_id(self, fc_name):
"""获取图层当前最大ObjectID"""
try:
with arcpy.da.SearchCursor(fc_name, "OBJECTID", where_clause="ObjectID>0",
sql_clause=(None, 'ORDER BY OBJECTID DESC')) as cur:
try:
return next(cur)[0]
except StopIteration:
return 0
except arcpy.ExecuteError as e:
errmsg = str(arcpy.GetMessages(2).encode("gb2312"))
print("Error occurred when accessing {}: {}".format(fc_name, errmsg))
self.logger.log("Error occurred when accessing {}: {}".format(fc_name, e))
return 0
def get_sys_time_condition(self, is_history):
"""当前增量同步时间"""
time_field = self.config.updatetime_field_history() if is_history else self.config.updatetime_field()
str_sql = "{}>=TO_DATE('{}','YYYY-MM-DD HH24:MI:SS') and {}<TO_DATE('{}','YYYY-MM-DD HH24:MI:SS')"
time_condition = str_sql.format(time_field, self.config.last_sync_datetime,
time_field, self.config.curr_sync_datetime)
return time_condition
def record_count(self, layer):
count = 0
layer_lower = layer.lower()
if layer_lower.find('history') == 0:
return
try:
with arcpy.da.SearchCursor(layer, ['ObjectID']) as rows:
for row in rows:
count += 1
msg = str(layer).encode("gb2312")
self.txt_ct_file.write(
str(u"{0}:{1}\r\n").encode('gb2312').format(msg, count))
except BaseException as e:
self.txt_ct_file.write(e)
def is_export_one_layer(self, fc_name):
"""控制图层是否输出"""
last_max_object_id = self.get_last_max_object_id(fc_name)
max_object_id = self.get_max_object_id(fc_name)
print("last_max_id:" + str(last_max_object_id) + " || current_max_id:" + str(max_object_id))
if self.only_changed_layers:
return last_max_object_id != max_object_id
else:
return True
def featureclass_to_featureclass(self, fc_name, out, fc_cur, expression):
"""启动python子进程输出feature到gdb"""
return self.fc2fc(fc_name, out, fc_cur, expression, u'feature')
def table_to_table(self, fc_name, out, fc_cur, expression):
"""启动python子进程输出table到gdb"""
return self.fc2fc(fc_name, out, fc_cur, expression, u'table')
def fc2fc(self, fc_name, out, fc_cur, expression, fc_type):
"""启动python子进程输出feature或者table到gdb"""
python_path = str(self.config.python_path() + u' ').encode('gb2312')
script = str(self.config.pathname + '\\sde_fc2fc.py ')
src_sde = str(self.src_sde + ' ')
type_cmd = str(fc_type + u' ').encode('gb2312')
fc_name_cmd = str(fc_name + u' ').encode('gb2312')
out_cmd = str(out + ' ')
fc_cur_cmd = str(fc_cur + u' ').encode('gb2312')
exp_cmd = str(u'"' + expression + u'" ')
# 启动命令参数
command_string = python_path + script + src_sde + type_cmd + fc_name_cmd + out_cmd + fc_cur_cmd + exp_cmd
success, out_msg = run_command(command_string)
print(out_msg)
if not success:
# 使外部可以捕获异常
raise Exception(out_msg)
return success
def return_result(self, file_type, b_save, gongch_list, yewbh_list):
"""返回信息"""
# file_type "gdb" "mdo" b_save true false
log_type = "OutPutMap_GDB" if file_type == "gdb" else "OutPutMap_MDO"
log_status = u"成功" if b_save else u"失败"
log_remark = u"增量输出文件成功" if b_save else u"增量输出文件失败"
result = "true" if b_save else "false"
for GongCH in gongch_list:
write_event_log(log_type,
GongCH,
log_status.encode('utf8'),
self.config.logfile_name.decode('gbk').encode('utf8'),
log_remark.encode('utf8'))
for YeWBH in yewbh_list:
output_map_result(YeWBH, "true", "", file_type)
def copy_to_mdo(self, path_name):
"""启动EPS输出MDO"""
eps_path = str(self.config.eps_path() + u' ').encode('gb2312')
template_name = self.config.template_name()
runscript = str(u'/runscript ' + template_name + u' ').encode('gb2312')
script_path = str(u'"' + self.config.eps_vbs() + u'" ').encode('gb2312')
show_setting = str(u'/showwindow "' + self.config.eps_show_window() + u'" /autoexit ').encode('gb2312')
share_parameter = str(u'/WriteShareParameter ProjectInfo,pathname,').encode('gb2312')
# 启动命令参数
command1 = eps_path + runscript + script_path + show_setting + share_parameter + path_name
timeout_command(command1, 28800)
def trans_to_old(self):
# 标准降级输出
self.logger.log("--Start Translate TO old--")
"""启动Python输出OldGDB"""
python_path = str(self.config.python_path() + u' ').encode('gb2312')
script = str(u'translate2old.py ').encode('gb2312')
# 启动命令参数
command1 = python_path + script + self.out_gdb_path
run_command(command1)
self.logger.log("--End Translate TO old--")
# 移动文件
# remove_file(g_outFolderPath + "\\" + "OldGDB", g_outNewFolderPath)
def output_security_gdb(self):
# 开始输出保密增删gdb
out_security_new_folder_path = self.out_gdb_path + "\\OldGDB\\SecurityGdb"
create_dir(out_security_new_folder_path)
add_and_edit_gdb = self.out_gdb_path + "\\OldGDB\\addAndEditFeatures.gdb"
delete_gdb = self.out_gdb_path + "\\OldGDB\\deleteFeatures.gdb"
security_add_and_edit_gdb = out_security_new_folder_path + "\\securityaddAndEditFeatures.gdb"
security_delete_gdb = out_security_new_folder_path + "\\securitydeleteFeatures.gdb"
arcpy.CreateFileGDB_management(out_security_new_folder_path, "securityaddAndEditFeatures.gdb") # 建立汇总数据库
arcpy.CreateFileGDB_management(out_security_new_folder_path, "securitydeleteFeatures.gdb")
# "增改"文件
arcpy.env.workspace = add_and_edit_gdb
fcs = arcpy.ListFeatureClasses()
print("=== start security trans ===")
for fc in fcs:
print("addandedit " + fc)
if fc == 'Pool_FlatRoof':
expression = "FEATURECode not in ('338100','338300')"
# 输出"保密增改"文件
arcpy.FeatureClassToFeatureClass_conversion(fc, security_add_and_edit_gdb, fc, expression)
elif fc not in ["Device_Point", "Device_Line", "Device_Building", "Device_Text", "Pipe_Text",
"Ocean_Polygon",
"Control_Point", "Control_Text", "UPL_L", "UPL_Text", "Level_Point_A", "Level_Point_R",
"Contour", "UpdateRegion_A"]:
# 输出"保密增改"文件
arcpy.FeatureClassToFeatureClass_conversion(fc, security_add_and_edit_gdb, fc)
# "删"文件
arcpy.env.workspace = delete_gdb
fcs = arcpy.ListFeatureClasses()
for fc in fcs:
print("delete " + fc)
if fc == 'Pool_FlatRoof':
expression = "FEATURECode not in ('338100','338300')"
# 输出"保密删"文件
arcpy.FeatureClassToFeatureClass_conversion(fc, security_delete_gdb, fc, expression)
elif fc not in ["Device_Point", "Device_Line", "Device_Building", "Device_Text", "Pipe_Text",
"Ocean_Polygon",
"Control_Point", "Control_Text", "UPL_L", "UPL_Text", "Level_Point_A", "Level_Point_R",
"Contour", "UpdateRegion_A"]:
# 输出"保密删"文件
arcpy.FeatureClassToFeatureClass_conversion(fc, security_delete_gdb, fc)
print("=== end security trans ===")