各位老师好:
[上封邮件未正确加载图片,重新发送] 各位老师好: 背景是这样的[flink1.17.1],我在window机器,本地单机调用自定义的pythonUDF,下面是我python代码 err=None @udtf(input_types=DataTypes.STRING(), result_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]) def run(data_str): try: logger.info("input param is ", data_str) data = [{'name': data_str}] extractEngine = init_info(data) for row in extractEngine.extract(): return row.content.get('content'), list_tran_json(row.content.get('tok')), list_tran_json( row.content.get('pos')), list_tran_json(row.content.get('dep')) except Exception as e: err = e logger.error(e) return str(err), '', '', '' 这段代码我是定义了一个切词能力,使用hanlp的,进来把语句进行切词和依存处理,并返回, 我能保证并验证这个方法没有任何问题,都能执行; 现在我通过@udtf包装成udf,通过java代码去调用 下面是java代码: @Test @SneakyThrows public void testTableMiningFunc() { registerTable(); String registerSql = "CREATE TEMPORARY FUNCTION mining AS 'py_bian_func.mining_w.run' LANGUAGE PYTHON"; tableEnv.executeSql( registerSql); String sql = "SELECT * from t_a001 ,LATERAL TABLE(mining(name)) as alias(content, pos, top, des) "; TableResult tableResult = tableEnv.executeSql(sql); tableResult.print(); } 返回数据是 这个我一直没搞懂,为什么会显示递归问题; PS: 补充解析下,python代码之所为通过全局变量去接受异常,因为我之前发现我如果不进行异常捕获,flink程序会卡住,日志提示会显示:org.apache.beam.runners.fnexecution.logging.GrpcLoggingService - Logging client hanged up.然后卡住不动 我去跟踪了算子图:,发现数据发送到python的算子,但是没有输出,对应日志为:org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator - Closing streams for instruction 1 and outbound data {fn/read/input:0=Byte size: 239, Element count: 4} and timers {}. 现在我在怀疑是flink在@udtf 加载装饰器过程中可能出现了问题,导致我run方法没有进去执行;因为我在run定义了log,但是在控制台没看到任何日志输出; 由于官网的docs对于这块底层加载逻辑没有太多介绍,这边请教下各位老师,应该怎么处理 我用pydev-pycharm去debug pyflink代码,发现在java_gateway.py中run方法有正常跑,而且 能正常发送,但是没有执行到我自定义的udf中的run方法; 困扰2天,望各位老师指点!