Files
sunvpy-docs/docs/content/29-zi-ding-yi-shu-ju-jiao-huan-liu-cheng.md
2026-04-10 13:47:53 +08:00

34 KiB
Raw Permalink Blame History

本页面详细讲解如何构建自定义数据交换流程,将基础的数据导入导出功能组合成复杂的工作流。通过将 DataXScript 与 SSProcessManager 集合,并结合选择集操作、数据处理和进度跟踪等功能,您可以实现自动化的批量数据转换、质量检查和多格式同步导出等高级场景。

Sources: PySSDataX.py | PySSProcess.py

自定义数据交换架构

自定义数据交换流程的核心思想是将数据交换作为数据处理工作流中的一个环节,与选择集管理、属性编辑、数据验证等功能有机结合。这种架构允许在数据导入后进行预处理,在数据导出前进行质量检查,实现端到端的自动化数据处理。

graph TB
    A[开始自定义数据交换流程] --> B{选择流程类型}
    B -->|简单导入导出| C[基础模式]
    B -->|带数据预处理| D[处理模式]
    B -->|批量文件处理| E[批量模式]
    B -->|多格式同步导出| F[同步模式]
    
    C --> C1[配置参数]
    C1 --> C2[执行交换]
    C2 --> C3[结果验证]
    
    D --> D1[数据导入]
    D1 --> D2[选择集筛选]
    D2 --> D3[数据转换]
    D3 --> D4[质量检查]
    D4 --> D5[数据导出]
    
    E --> E1[遍历文件列表]
    E1 --> E2[逐个导入]
    E2 --> E3[进度跟踪]
    E3 --> E4[错误恢复]
    
    F --> F1[数据准备]
    F1 --> F2[配置多目标参数]
    F2 --> F3[并行/串行导出]
    F3 --> F4[结果汇总]
    
    C3 --> G[流程完成]
    D5 --> G
    E4 --> G
    F4 --> G

自定义数据交换流程基于以下核心组件构建:DataXScript 负责数据交换,SSProcessManager 提供工作流控制,Mixin 模块 提供日志、进度等辅助功能。这种模块化设计使您能够根据需求灵活组合功能。

Sources: PySSDataX.py | PySSProcess.py

模式一:带数据预处理的交换流程

这种模式适用于需要在数据导入后进行数据处理再导出的场景,例如数据清洗、属性标准化、坐标转换等。典型的应用包括:将外部数据导入后根据业务规则修改属性,然后导出为标准格式。

流程架构与步骤

flowchart TD
    Start([开始预处理交换流程]) --> Init[初始化 SSProcessManager]
    Init --> InitDataX[创建 DataXScript 实例]
    InitDataX --> ConfigImport[配置导入参数]
    ConfigImport --> ExecuteImport[执行 importData]
    ExecuteImport --> CheckImport{导入成功?}
    CheckImport -->|否| LogImportError[记录导入错误] --> ErrorEnd([流程失败])
    CheckImport -->|是| SelectData[筛选需要处理的数据]
    SelectData --> SetCondition[设置筛选条件]
    SetCondition --> Filter[执行 selectFilter]
    Filter --> CheckData{有选择集?}
    CheckData -->|否| NoDataEnd([无数据需处理])
    CheckData -->|是| ProcessData[遍历选择集处理数据]
    ProcessData --> ModifyAttributes[修改属性]
    ModifyAttributes --> SaveChanges[保存到数据库]
    SaveChanges --> ConfigExport[配置导出参数]
    ConfigExport --> ExecuteExport[执行 exportData]
    ExecuteExport --> CheckExport{导出成功?}
    CheckExport -->|否| LogExportError[记录导出错误] --> ErrorEnd
    CheckExport -->|是| VerifyResult[验证输出文件]
    VerifyResult --> SuccessEnd([流程成功完成])

完整代码示例

以下是一个完整的带数据预处理的数据交换流程示例,将 Shapefile 数据导入后,根据业务规则修改道路对象的属性,然后导出为 MDB 格式。

from sunvpy.PySSDataX import DataXScript
from sunvpy.PySSProcess import SSProcess

def preprocess_and_exchange():
    """
    带数据预处理的数据交换流程:
    1. 导入道路 Shapefile 数据
    2. 根据道路等级修改属性
    3. 导出为 MDB 格式
    """
    
    # ========== 第一阶段:初始化 ==========
    print("=== 初始化数据交换流程 ===")
    
    # SSProcessManager 已经是全局单例,无需创建
    # DataXScript 需要为每次交换创建新实例
    import_datax = DataXScript()
    
    # ========== 第二阶段:数据导入 ==========
    print("\n[阶段 1/4] 导入原始数据...")
    
    import_datax.setDataXParameter("ImportPath", "D:\\data\\input\\roads.shp")
    import_datax.setDataXParameter("SourceEncoding", "GBK")
    import_datax.setDataXParameter("CoordSystem", "EPSG:4326")
    import_datax.setDataXParameter("LayerMapping", "道路")
    import_datax.setDataXParameter("OverwriteExisting", "true")
    
    import_success = import_datax.importData()
    
    if not import_success:
        print("错误:数据导入失败")
        return False
    
    import_file = import_datax.getImportFileName()
    print(f"成功导入: {import_file}")
    
    # 清理导入参数
    import_datax.clearDataXParameter()
    
    # ========== 第三阶段:数据预处理 ==========
    print("\n[阶段 2/4] 执行数据预处理...")
    
    # 清空之前的选择集和条件
    SSProcess.clearSelection()
    SSProcess.clearSelectCondition()
    
    # 筛选所有道路对象
    SSProcess.setSelectCondition("SSObj_LayerName", "==", "道路")
    SSProcess.selectFilter()
    
    obj_count = SSProcess.getSelGeoCount()
    print(f"找到 {obj_count} 个道路对象")
    
    if obj_count == 0:
        print("警告:没有找到需要处理的道路对象")
        return False
    
    # 创建撤销标记,支持撤销操作
    SSProcess.pushUndoMark("道路属性标准化")
    
    # 遍历选择集,根据道路等级修改属性
    modified_count = 0
    for i in range(obj_count):
        # 读取道路等级属性(假设存在 RoadGrade 字段)
        road_grade = SSProcess.getSelGeoValue(i, "RoadGrade")
        
        # 根据等级设置不同的线型和颜色
        if road_grade == "高速":
            SSProcess.setSelGeoValue(i, "SSObj_LineWidth", "3")
            SSProcess.setSelGeoValue(i, "SSObj_Color", "255")  # 红色
        elif road_grade == "省道":
            SSProcess.setSelGeoValue(i, "SSObj_LineWidth", "2")
            SSProcess.setSelGeoValue(i, "SSObj_Color", "16711680")  # 蓝色
        elif road_grade == "县道":
            SSProcess.setSelGeoValue(i, "SSObj_LineWidth", "1.5")
            SSProcess.setSelGeoValue(i, "SSObj_Color", "65280")  # 绿色
        else:
            SSProcess.setSelGeoValue(i, "SSObj_LineWidth", "1")
            SSProcess.setSelGeoValue(i, "SSObj_Color", "0")  # 黑色
        
        # 添加数据来源标记
        SSProcess.setSelGeoValue(i, "DataSource", "预处理流程")
        
        modified_count += 1
    
    print(f"已修改 {modified_count} 个道路对象的属性")
    
    # 批量保存到数据库
    SSProcess.saveBufferToDatabase()
    print("数据预处理完成,已保存到数据库")
    
    # ========== 第四阶段:数据导出 ==========
    print("\n[阶段 3/4] 导出处理后的数据...")
    
    export_datax = DataXScript()
    
    export_datax.setDataXParameter("ExportPath", "D:\\data\\output\\roads_processed.mdb")
    export_datax.setDataXParameter("ExportFormat", "MDB")
    export_datax.setDataXParameter("TargetEncoding", "UTF-8")
    export_datax.setDataXParameter("CoordSystem", "EPSG:4490")  # 转换为 CGCS2000
    export_datax.setDataXParameter("LayerFilter", "道路")
    export_datax.setDataXParameter("IncludeAttributes", "true")
    
    export_success = export_datax.exportData()
    
    if not export_success:
        print("错误:数据导出失败")
        return False
    
    print("数据导出成功")
    export_datax.clearDataXParameter()
    
    # ========== 第五阶段:结果验证 ==========
    print("\n[阶段 4/4] 验证输出结果...")
    
    # 重新导入验证
    verify_datax = DataXScript()
    verify_datax.setDataXParameter("ImportPath", "D:\\data\\output\\roads_processed.mdb")
    verify_success = verify_datax.importData()
    
    if verify_success:
        print("输出文件验证成功")
    else:
        print("警告:输出文件验证失败,请检查文件完整性")
    
    print("\n=== 数据交换流程完成 ===")
    return True

# 执行流程
if __name__ == "__main__":
    success = preprocess_and_exchange()
    if success:
        print("\n流程执行成功")
    else:
        print("\n流程执行失败")

Sources: PySSDataX.py | PySSProcess.py | ssprocess_mixins/geo_edit_mixin.py

关键步骤对比表

步骤 基础导入导出 带预处理的交换 增强功能
导入配置 设置 ImportPath 等参数 相同 相同
导入执行 importData() 相同 相同
数据处理 筛选选择集 + 修改属性 集成业务逻辑
缓存管理 无需要 saveBufferToDatabase() 支持撤销
导出配置 设置 ExportPath 等参数 相同 增加转换参数
结果验证 可选 重新导入验证 自动化验证

Sources: PySSDataX.py | ssprocess_mixins/geo_edit_mixin.py

模式二:批量文件处理流程

当需要处理大量同类文件时,批量文件处理模式可以显著提高效率。该模式遍历指定目录中的文件,逐个执行数据交换,并提供进度跟踪、错误恢复和结果汇总功能。

批量处理架构

flowchart TD
    Start([开始批量处理]) --> ScanDir[扫描目录获取文件列表]
    ScanDir --> InitProgress[初始化进度条]
    InitProgress --> LoopStart{还有文件?}
    LoopStart -->|否| GenerateReport[生成处理报告]
    GenerateReport --> End([批量处理完成])
    LoopStart -->|是| GetNextFile[获取下一个文件]
    GetNextFile --> ValidateFile{文件有效?}
    ValidateFile -->|否| LogSkip[记录跳过] --> UpdateProgress1[更新进度]
    ValidateFile -->|是| CreateDataX[创建 DataXScript 实例]
    CreateDataX --> ConfigParams[配置参数]
    ConfigParams --> ExecuteProcess[执行导入导出]
    ExecuteProcess --> CheckSuccess{操作成功?}
    CheckSuccess -->|否| LogError[记录错误] --> CheckRetry{允许重试?}
    CheckRetry -->|是| Retry[重试当前文件] --> ConfigParams
    CheckRetry -->|否| UpdateProgress2[更新进度]
    CheckSuccess -->|是| UpdateProgress3[更新进度]
    UpdateProgress1 --> LoopStart
    UpdateProgress2 --> LoopStart
    UpdateProgress3 --> LoopStart

批量处理完整示例

以下是一个批量文件处理示例,将多个 Shapefile 文件批量导入并统一导出为 MDB 格式,集成进度跟踪和错误处理功能。

from sunvpy.PySSDataX import DataXScript
from sunvpy.PySSProcess import SSProcess
from sunvpy.PySSWidget import startProgress, setProgressRange, stepProgress, closeProgress
import os
from pathlib import Path
from datetime import datetime

def batch_exchange_files(input_dir, output_dir, max_retries=3):
    """
    批量数据交换流程:
    将指定目录中的所有 Shapefile 批量导入并导出为 MDB 格式
    
    参数:
        input_dir: 输入目录路径
        output_dir: 输出目录路径
        max_retries: 最大重试次数
    """
    
    # ========== 第一阶段:准备阶段 ==========
    print(f"=== 批量数据交换流程开始 ===")
    print(f"输入目录: {input_dir}")
    print(f"输出目录: {output_dir}")
    print(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
    # 确保输出目录存在
    os.makedirs(output_dir, exist_ok=True)
    
    # 扫描输入目录获取所有 .shp 文件
    input_files = list(Path(input_dir).glob("*.shp"))
    
    if not input_files:
        print("错误:输入目录中没有找到 Shapefile 文件")
        return False
    
    total_files = len(input_files)
    print(f"找到 {total_files} 个文件需要处理")
    
    # 初始化统计变量
    success_count = 0
    failed_count = 0
    skipped_count = 0
    error_details = []
    
    # ========== 第二阶段:初始化进度条 ==========
    SSProcess.startProgress("批量数据交换", total_files)
    
    # ========== 第三阶段:批量处理 ==========
    for index, input_file in enumerate(input_files):
        file_name = input_file.name
        print(f"\n处理文件 [{index+1}/{total_files}]: {file_name}")
        
        try:
            # 创建 DataXScript 实例
            datax = DataXScript()
            
            # 配置导入参数
            datax.setDataXParameter("ImportPath", str(input_file))
            datax.setDataXParameter("SourceEncoding", "GBK")
            datax.setDataXParameter("CoordSystem", "EPSG:4326")
            datax.setDataXParameter("OverwriteExisting", "true")
            
            # 执行导入(带重试机制)
            import_success = False
            retry_count = 0
            
            while retry_count < max_retries and not import_success:
                if retry_count > 0:
                    print(f"  重试第 {retry_count} 次...")
                
                import_success = datax.importData()
                retry_count += 1
            
            if not import_success:
                print(f"  错误:文件导入失败(已重试 {max_retries} 次)")
                failed_count += 1
                error_details.append(f"{file_name}: 导入失败")
                SSProcess.stepProgress(f"失败: {file_name}")
                continue
            
            # 获取导入后的文件名
            import_file_name = datax.getImportFileName()
            print(f"  导入成功: {import_file_name}")
            
            # 清理导入参数
            datax.clearDataXParameter()
            
            # 配置导出参数
            output_file = Path(output_dir) / f"{input_file.stem}.mdb"
            datax.setDataXParameter("ExportPath", str(output_file))
            datax.setDataXParameter("ExportFormat", "MDB")
            datax.setDataXParameter("TargetEncoding", "UTF-8")
            datax.setDataXParameter("CoordSystem", "EPSG:4490")
            datax.setDataXParameter("IncludeAttributes", "true")
            
            # 执行导出(带重试机制)
            export_success = False
            retry_count = 0
            
            while retry_count < max_retries and not export_success:
                if retry_count > 0:
                    print(f"  重试第 {retry_count} 次...")
                
                export_success = datax.exportData()
                retry_count += 1
            
            if not export_success:
                print(f"  错误:文件导出失败(已重试 {max_retries} 次)")
                failed_count += 1
                error_details.append(f"{file_name}: 导出失败")
                SSProcess.stepProgress(f"失败: {file_name}")
                continue
            
            print(f"  导出成功: {output_file.name}")
            success_count += 1
            SSProcess.stepProgress(f"成功: {file_name}")
            
            # 清理导出参数
            datax.clearDataXParameter()
            
        except Exception as e:
            print(f"  异常:{str(e)}")
            failed_count += 1
            error_details.append(f"{file_name}: 异常 - {str(e)}")
            SSProcess.stepProgress(f"异常: {file_name}")
    
    # ========== 第四阶段:清理进度条 ==========
    SSProcess.closeProgress()
    
    # ========== 第五阶段:生成报告 ==========
    print(f"\n=== 批量处理完成 ===")
    print(f"总文件数: {total_files}")
    print(f"成功处理: {success_count}")
    print(f"处理失败: {failed_count}")
    print(f"跳过文件: {skipped_count}")
    print(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
    if error_details:
        print("\n错误详情:")
        for detail in error_details:
            print(f"  - {detail}")
    
    # 生成报告文件
    report_file = Path(output_dir) / "batch_report.txt"
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write("批量数据交换处理报告\n")
        f.write("=" * 50 + "\n")
        f.write(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"输入目录: {input_dir}\n")
        f.write(f"输出目录: {output_dir}\n")
        f.write(f"总文件数: {total_files}\n")
        f.write(f"成功处理: {success_count}\n")
        f.write(f"处理失败: {failed_count}\n")
        f.write(f"跳过文件: {skipped_count}\n")
        f.write("\n错误详情:\n")
        for detail in error_details:
            f.write(f"  {detail}\n")
    
    print(f"\n处理报告已保存到: {report_file}")
    
    return failed_count == 0

# 执行批量处理
if __name__ == "__main__":
    input_directory = r"D:\data\batch_input"
    output_directory = r"D:\data\batch_output"
    
    success = batch_exchange_files(input_directory, output_directory)
    
    if success:
        print("\n所有文件处理成功")
    else:
        print("\n部分文件处理失败,请查看错误详情")

Sources: PySSDataX.py | PySSProcess.py | ssprocess_mixins/progress_mixin.py

批量处理优化技巧

优化项 优化方法 效果
进度跟踪 使用 SSProcess 的进度条功能 提供实时反馈
错误恢复 实现重试机制 提高成功率
资源清理 及时 clearDataXParameter() 避免内存泄漏
异步处理 可考虑多进程处理(需谨慎) 提高吞吐量
报告生成 自动生成处理报告 便于审计和追溯

Sources: PySSDataX.py | ssprocess_mixins/progress_mixin.py

模式三:带日志记录和进度跟踪的增强流程

在生产环境中,完整的数据交换流程应该包含详细的日志记录和可视化的进度跟踪。这种模式特别适合长时间运行的复杂交换任务,便于问题排查和进度监控。

增强流程架构

graph TB
    Start([开始增强流程]) --> SetupLogger[配置日志记录器]
    SetupLogger --> LogStart[记录开始信息]
    LogStart --> SetupProgress[配置进度条]
    SetupProgress --> InitDataX[创建 DataXScript 实例]
    InitDataX --> ConfigParams[配置参数]
    ConfigParams --> LogParams[记录参数配置]
    LogParams --> Execute[执行数据交换]
    Execute --> UpdateProgress[更新进度]
    UpdateProgress --> CheckResult{成功?}
    CheckResult -->|是| LogSuccess[记录成功信息]
    CheckResult -->|否| LogError[记录错误信息]
    LogSuccess --> NextStep{还有步骤?}
    LogError --> NextStep
    NextStep -->|是| InitDataX
    NextStep -->|否| CloseProgress[关闭进度条]
    CloseProgress --> LogComplete[记录完成信息]
    LogComplete --> End([流程结束])

增强流程实现示例

from sunvpy.PySSDataX import DataXScript
from sunvpy.PySSProcess import SSProcess
import logging
from datetime import datetime

def setup_logger(log_file):
    """配置日志记录器"""
    logger = logging.getLogger('DataExchange')
    logger.setLevel(logging.DEBUG)
    
    # 文件处理器
    file_handler = logging.FileHandler(log_file, encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    
    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    
    # 格式化器
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

def enhanced_exchange_workflow():
    """
    带日志和进度跟踪的增强数据交换流程
    """
    
    # ========== 配置日志 ==========
    log_file = f"D:\\logs\\data_exchange_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    logger = setup_logger(log_file)
    SSProcess.set_logger(logger)
    
    logger.info("=== 数据交换流程开始 ===")
    logger.info(f"日志文件: {log_file}")
    
    try:
        # ========== 启用进度条 ==========
        SSProcess.startProgress("数据交换流程", 3)
        
        # ========== 步骤 1数据导入 ==========
        logger.info("[步骤 1/3] 开始数据导入...")
        SSProcess.stepProgress("正在导入数据...")
        
        import_datax = DataXScript()
        import_datax.setDataXParameter("ImportPath", "D:\\data\\input\\survey.shp")
        import_datax.setDataXParameter("SourceEncoding", "GBK")
        import_datax.setDataXParameter("CoordSystem", "EPSG:4326")
        import_datax.setDataXParameter("LayerMapping", "测区数据")
        
        logger.debug("导入参数配置完成")
        logger.debug(f"  ImportPath: {import_datax.getDataXParameter('ImportPath')}")
        
        import_success = import_datax.importData()
        
        if import_success:
            import_file = import_datax.getImportFileName()
            logger.info(f"数据导入成功: {import_file}")
        else:
            logger.error("数据导入失败")
            SSProcess.log_error_msg("数据导入失败")
            raise Exception("导入操作失败")
        
        import_datax.clearDataXParameter()
        
        # ========== 步骤 2数据处理 ==========
        logger.info("[步骤 2/3] 开始数据处理...")
        SSProcess.stepProgress("正在处理数据...")
        
        SSProcess.clearSelection()
        SSProcess.clearSelectCondition()
        SSProcess.setSelectCondition("SSObj_LayerName", "==", "测区数据")
        SSProcess.selectFilter()
        
        obj_count = SSProcess.getSelGeoCount()
        logger.info(f"找到 {obj_count} 个对象")
        
        if obj_count > 0:
            SSProcess.pushUndoMark("数据交换处理")
            logger.info("开始修改对象属性...")
            
            for i in range(min(obj_count, 100)):  # 最多处理100个对象
                SSProcess.setSelGeoValue(i, "DataSource", "增强流程")
                SSProcess.setSelGeoValue(i, "ProcessTime", 
                    datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
            
            SSProcess.saveBufferToDatabase()
            logger.info("数据处理完成,已保存到数据库")
        else:
            logger.warning("没有找到需要处理的对象")
        
        # ========== 步骤 3数据导出 ==========
        logger.info("[步骤 3/3] 开始数据导出...")
        SSProcess.stepProgress("正在导出数据...")
        
        export_datax = DataXScript()
        export_datax.setDataXParameter("ExportPath", "D:\\data\\output\\survey_processed.mdb")
        export_datax.setDataXParameter("ExportFormat", "MDB")
        export_datax.setDataXParameter("TargetEncoding", "UTF-8")
        export_datax.setDataXParameter("CoordSystem", "EPSG:4490")
        
        logger.debug("导出参数配置完成")
        logger.debug(f"  ExportPath: {export_datax.getDataXParameter('ExportPath')}")
        
        export_success = export_datax.exportData()
        
        if export_success:
            logger.info("数据导出成功")
        else:
            logger.error("数据导出失败")
            SSProcess.log_error_msg("数据导出失败")
            raise Exception("导出操作失败")
        
        export_datax.clearDataXParameter()
        
        # ========== 流程完成 ==========
        logger.info("=== 数据交换流程成功完成 ===")
        SSProcess.closeProgress()
        
        return True
        
    except Exception as e:
        logger.error(f"流程执行异常: {str(e)}", exc_info=True)
        SSProcess.closeProgress()
        return False

# 执行增强流程
if __name__ == "__main__":
    success = enhanced_exchange_workflow()
    if success:
        print("\n流程执行成功")
    else:
        print("\n流程执行失败,请查看日志文件")

Sources: PySSDataX.py | PySSProcess.py | ssprocess_mixins/log_mixin.py | ssprocess_mixins/progress_mixin.py

日志和进度功能对比

功能 基础流程 增强流程 优势
错误记录 print 输出 logging 模块 持久化、分级
进度显示 进度条 用户友好
参数记录 DEBUG 级别日志 便于调试
异常追踪 exc_info 完整堆栈
时间戳 自动添加 时间分析

Sources: ssprocess_mixins/log_mixin.py | ssprocess_mixins/progress_mixin.py

自定义流程设计模式

构建自定义数据交换流程时,可以应用以下设计模式提高代码的可维护性和可扩展性。

模式一:流程模板模式

将固定的交换流程定义为模板,通过继承和重写特定步骤实现定制化。

from abc import ABC, abstractmethod
from sunvpy.PySSDataX import DataXScript
from sunvpy.PySSProcess import SSProcess

class DataExchangeTemplate(ABC):
    """数据交换流程模板"""
    
    def execute(self):
        """执行完整流程(模板方法)"""
        self.preprocess()
        self.import_data()
        self.process_data()
        self.export_data()
        self.postprocess()
    
    def preprocess(self):
        """前置处理"""
        print("前置处理...")
    
    def import_data(self):
        """导入数据"""
        import_datax = DataXScript()
        self.configure_import(import_datax)
        success = import_datax.importData()
        if success:
            self.on_import_success(import_datax)
        else:
            self.on_import_failed()
        return success
    
    def export_data(self):
        """导出数据"""
        export_datax = DataXScript()
        self.configure_export(export_datax)
        success = export_datax.exportData()
        if success:
            self.on_export_success()
        else:
            self.on_export_failed()
        return success
    
    def postprocess(self):
        """后置处理"""
        print("后置处理...")
    
    # 抽象方法,由子类实现
    @abstractmethod
    def configure_import(self, datax):
        """配置导入参数"""
        pass
    
    @abstractmethod
    def configure_export(self, datax):
        """配置导出参数"""
        pass
    
    @abstractmethod
    def process_data(self):
        """处理数据"""
        pass
    
    # 钩子方法,子类可选重写
    def on_import_success(self, datax):
        print("导入成功")
    
    def on_import_failed(self):
        print("导入失败")
    
    def on_export_success(self):
        print("导出成功")
    
    def on_export_failed(self):
        print("导出失败")

# 实现具体流程
class RoadDataExchange(DataExchangeTemplate):
    """道路数据交换流程"""
    
    def configure_import(self, datax):
        datax.setDataXParameter("ImportPath", "D:\\data\\roads.shp")
        datax.setDataXParameter("SourceEncoding", "GBK")
        datax.setDataXParameter("CoordSystem", "EPSG:4326")
    
    def configure_export(self, datax):
        datax.setDataXParameter("ExportPath", "D:\\output\\roads.mdb")
        datax.setDataXParameter("TargetEncoding", "UTF-8")
        datax.setDataXParameter("CoordSystem", "EPSG:4490")
    
    def process_data(self):
        """处理道路数据"""
        SSProcess.clearSelection()
        SSProcess.setSelectCondition("SSObj_LayerName", "==", "道路")
        SSProcess.selectFilter()
        
        count = SSProcess.getSelGeoCount()
        for i in range(count):
            SSProcess.setSelGeoValue(i, "Processed", "true")
        
        SSProcess.saveBufferToDatabase()

# 执行流程
if __name__ == "__main__":
    workflow = RoadDataExchange()
    workflow.execute()

Sources: PySSDataX.py | PySSProcess.py

模式二:管道模式

将数据交换分解为多个独立的处理阶段,通过管道连接形成完整流程。

class DataExchangePipeline:
    """数据交换管道"""
    
    def __init__(self):
        self.stages = []
    
    def add_stage(self, stage):
        """添加处理阶段"""
        self.stages.append(stage)
        return self
    
    def execute(self, context):
        """执行管道"""
        for stage in self.stages:
            context = stage.process(context)
            if context is None:
                return None
        return context

class ExchangeContext:
    """交换上下文,在各阶段间传递数据"""
    def __init__(self):
        self.input_path = None
        self.output_path = None
        self.datax = None
        self.result = True
        self.message = ""
    
    def fail(self, message):
        """标记失败"""
        self.result = False
        self.message = message
        return None

# 定义各个处理阶段
class ImportStage:
    """导入阶段"""
    def process(self, context):
        datax = DataXScript()
        datax.setDataXParameter("ImportPath", context.input_path)
        datax.setDataXParameter("SourceEncoding", "GBK")
        
        if not datax.importData():
            return context.fail("导入失败")
        
        context.datax = datax
        return context

class ProcessStage:
    """处理阶段"""
    def process(self, context):
        # 执行数据处理逻辑
        SSProcess.clearSelection()
        SSProcess.selectFilter()
        
        count = SSProcess.getSelGeoCount()
        for i in range(count):
            SSProcess.setSelGeoValue(i, "Processed", "true")
        
        SSProcess.saveBufferToDatabase()
        return context

class ExportStage:
    """导出阶段"""
    def process(self, context):
        datax = DataXScript()
        datax.setDataXParameter("ExportPath", context.output_path)
        datax.setDataXParameter("TargetEncoding", "UTF-8")
        
        if not datax.exportData():
            return context.fail("导出失败")
        
        return context

# 构建并执行管道
if __name__ == "__main__":
    context = ExchangeContext()
    context.input_path = "D:\\data\\input.shp"
    context.output_path = "D:\\data\\output.mdb"
    
    pipeline = DataExchangePipeline()
    pipeline.add_stage(ImportStage()) \
           .add_stage(ProcessStage()) \
           .add_stage(ExportStage())
    
    result = pipeline.execute(context)
    
    if result and result.result:
        print("流程执行成功")
    else:
        print(f"流程执行失败: {context.message}")

Sources: PySSDataX.py | PySSProcess.py

错误处理与最佳实践

在构建自定义数据交换流程时,完善的错误处理机制和遵循最佳实践可以显著提高脚本的可靠性和可维护性。

错误处理策略

错误类型 处理策略 实现方法
文件不存在 提前验证并提示 os.path.exists()
参数配置错误 参数验证 getDataXParameter() 检查
导入导出失败 重试机制 循环重试逻辑
数据处理异常 异常捕获 try-except
权限问题 检查并提示 权限验证函数

Sources: PySSDataX.py

最佳实践总结

实践类别 最佳实践 说明
实例管理 每次交换创建新 DataXScript 实例 避免参数污染
参数清理 执行后调用 clearDataXParameter() 释放资源
路径处理 使用绝对路径,验证路径有效性 避免路径错误
错误日志 使用日志记录器记录详细错误 便于问题排查
进度反馈 长时间任务提供进度显示 改善用户体验
撤销支持 数据修改前创建撤销标记 支持回滚操作
测试验证 导入后重新导入验证 确保数据完整性

Sources: PySSDataX.py | ssprocess_mixins/log_mixin.py | ssprocess_mixins/progress_mixin.py

常见问题排查

问题现象 可能原因 解决方法
参数设置后无效 参数名称拼写错误 检查参数名称准确性
导入数据乱码 字符编码不匹配 设置正确的 SourceEncoding
坐标位置偏移 坐标系统未设置 配置 CoordSystem 参数
批量处理卡住 文件过大或死锁 分批处理或检查权限
进度条不更新 进度条未正确初始化 检查 startProgress 调用
日志未生成 日志配置错误 检查日志路径和权限

Sources: PySSDataX.py

学习路径建议

掌握了自定义数据交换流程的基础知识后,建议按照以下路径深入学习:

  1. 深入学习选择集操作:阅读使用 SSProcess 管理选择集,了解复杂的选择条件和批量操作技巧,为数据预处理阶段提供更强的数据处理能力。

  2. 掌握数据检查功能:学习数据检查记录管理自定义检查规则,在数据交换后自动进行质量检查,确保输出数据的准确性。

  3. 理解日志与进度:阅读配置日志记录器进度条使用指南,为长时间运行的交换任务提供完善的监控和反馈机制。

  4. 掌握错误处理:学习错误处理与异常管理,构建健壮的数据交换流程,能够优雅地处理各种异常情况。

  5. 探索系统架构:阅读Mixin 设计模式SunvStation 系统架构,深入理解底层设计原理,为构建高级自定义流程奠定理论基础。

通过将这些知识点融会贯通,您将能够构建出功能强大、可靠稳定的数据交换自动化系统,满足各种复杂的业务需求。