Hi Xiao Zhang,

This line in your code needs to be replaced:

ds = env.from_source(source=myTestSourceFunction, watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(), source_name="websocket source")
Replace it with:

ds = env.from_source(source=myTestSourceFunction, watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(), source_name="websocket source", type_info=None)

> Also, I noticed that some other implementations of this interface, such as RichParallelSourceFunction, are deprecated. What should I use now if I want to write a custom source?

Yes, SourceFunction is deprecated. You need to use the Source API instead; see the official documentation:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/

> On Oct 16, 2024, at 23:22, Xiao Zhang <1272229...@qq.com.INVALID> wrote:
>
> The attachment will expire on November 15, 2024.
> Environment: PyFlink 1.20.0, JDK 11
> The full error is as follows:
>
> /Users/xiaozhang/anaconda3/envs/pyflink/bin/python /Users/xiaozhang/Documents/临时/pyflink2/socketFlinkTest.py
> Traceback (most recent call last):
>   File "/Users/xiaozhang/Documents/临时/pyflink2/socketFlinkTest.py", line 27, in <module>
>     ds = env.from_source(source=myTestSourceFunction,
>   File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 972, in from_source
>     j_data_stream = self._j_stream_execution_environment.fromSource(
>   File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
>     return_value = get_return_value(
>   File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/pyflink/util/exceptions.py", line 146, in deco
>     return f(*a, **kw)
>   File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/py4j/protocol.py", line 330, in get_return_value
>     raise Py4JError(
> py4j.protocol.Py4JError: An error occurred while calling o10.fromSource.
> Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method fromSource([class MyTestSourceFunction, class org.apache.flink.api.common.eventtime.WatermarkStrategy$$Lambda$184/0x000000080026fc40, class java.lang.String, null]) does not exist
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.base/java.lang.Thread.run(Thread.java:834)
>
> Java implementation:
>
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> public class MyTestSourceFunction implements SourceFunction<String> {
>     // volatile: cancel() is called from a different thread than run()
>     private volatile boolean runningFlag = true;
>
>     @Override
>     public void run(SourceContext<String> ctx) throws Exception {
>         while (runningFlag) {
>             ctx.collect("hi");
>             ctx.collect("world");
>             Thread.sleep(30000);
>         }
>     }
>
>     @Override
>     public void cancel() {
>         runningFlag = false;
>     }
> }
>
> Python calling code:
>
> from pyflink.common import WatermarkStrategy
> from pyflink.datastream import StreamExecutionEnvironment
>
> if __name__ == '__main__':
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>
>     # websocket_builder = WebSocketBuilder()
>     # websocket_builder.set_websocket_url('wss://fstream.binance.com/ws/btcusdt@aggTrade')
>     # websocket_source = websocket_builder.build()
>
>     # Note: only local file URLs (starting with "file:") are supported.
>     # env.get_config().set("pipeline.jars", 'file:///Users/xiaozhang/Documents/临时/pyflink2/data/jar/MyFlinkJava2.jar')
>     #
>     # # Note: paths must specify a protocol (e.g. "file"), and the user must make sure
>     # # these URLs are accessible on both the client and the cluster.
>     # env.get_config().set("pipeline.classpaths", 'file:///Users/xiaozhang/Documents/临时/pyflink2/data/jar/MyFlinkJava2.jar')
>
>     env.add_jars('file:///Users/xiaozhang/Documents/临时/pyflink2/data/jar/MyFlinkJava3.jar')
>
>     websocket_source = MyWebSocketSourceFunction('wss://fstream.binance.com/ws/btcusdt@aggTrade')
>     myTestSourceFunction = MyTestSourceFunction()
>
>     ds = env.from_source(source=myTestSourceFunction,
>                          watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
>                          source_name="websocket source")
>
>     # Further processing or output of the DataStream can go here
>
>     env.execute()
> [Attachment: MyFlinkJava3.jar, 66.6 MB]
>
> I don't know what is causing this error or which directions to investigate. The attached jar is one I packaged myself.
>
> Also, I noticed that some other implementations of this interface, such as RichParallelSourceFunction, are deprecated. What should I use now if I want to write a custom source?