Hi Xiao Zhang,

This call in your code needs to be replaced:
ds = env.from_source(source=myTestSourceFunction,
                     watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
                     source_name="websocket source")

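Replace it with:
ds = env.add_source(source_func=myTestSourceFunction,
                    source_name="websocket source",
                    type_info=None
                    )

from_source only accepts implementations of the new Source interface, whereas MyTestSourceFunction implements the legacy SourceFunction interface. That is why the traceback reports that no matching fromSource method exists, even though null is already passed as the fourth (type_info) argument. If you still need the watermark strategy, apply it to the resulting stream with ds.assign_timestamps_and_watermarks(...).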
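
Because MyTestSourceFunction implements the legacy SourceFunction interface, it can be attached with add_source, and a watermark strategy can then be assigned on the resulting stream. The following is only a minimal sketch of that route. The helper name attach_legacy_source and the jar path are placeholders, and it assumes the Java class sits in the default package (as the quoted Java code suggests) and is wrapped with PyFlink's SourceFunction wrapper.

```python
def attach_legacy_source(env, source_func, name, watermark_strategy=None):
    """Attach a legacy SourceFunction wrapper and optionally assign watermarks.

    `env` is expected to behave like a PyFlink StreamExecutionEnvironment.
    """
    # add_source is the entry point for SourceFunction-based sources.
    ds = env.add_source(source_func=source_func, source_name=name)
    if watermark_strategy is not None:
        # Watermarks are assigned on the stream rather than passed to add_source.
        ds = ds.assign_timestamps_and_watermarks(watermark_strategy)
    return ds


def main():
    # PyFlink is imported lazily so the helper above has no hard dependency on it.
    from pyflink.common.watermark_strategy import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.functions import SourceFunction

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    # Placeholder path: point this at the jar that contains the Java class.
    env.add_jars("file:///path/to/MyFlinkJava3.jar")

    # Assumption: the wrapper is given the Java class name and instantiates it.
    source_func = SourceFunction("MyTestSourceFunction")

    ds = attach_legacy_source(
        env,
        source_func,
        "websocket source",
        WatermarkStrategy.for_monotonous_timestamps(),
    )
    ds.print()
    env.execute()
```

Calling main() requires a working PyFlink installation and the jar on the given path; the helper itself only relies on the two methods it calls on env and on the returned stream.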

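> Also, I see that other implementations of this interface, such as RichParallelSourceFunction, are deprecated. What should I use now to write a custom source?
Yes, SourceFunction is deprecated. Use the Source API instead; the official documentation describes it:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/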
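
On the Python side, from_source is the entry point that matches sources written against the Source API. The following is a minimal sketch using the built-in NumberSequenceSource instead of a custom implementation. The helper name read_from_source and the source name string are illustrative, not part of PyFlink.

```python
def read_from_source(env, source, watermark_strategy, name, type_info=None):
    """Attach a Source-API source to the environment.

    from_source expects a wrapper around a Java Source implementation,
    not a SourceFunction.
    """
    return env.from_source(
        source=source,
        watermark_strategy=watermark_strategy,
        source_name=name,
        type_info=type_info,
    )


def main():
    # PyFlink is imported lazily so the helper above has no hard dependency on it.
    from pyflink.common.typeinfo import Types
    from pyflink.common.watermark_strategy import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.number_seq import NumberSequenceSource

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # NumberSequenceSource is a built-in source implemented against the Source API.
    ds = read_from_source(
        env,
        NumberSequenceSource(1, 10),
        WatermarkStrategy.for_monotonous_timestamps(),
        "number sequence source",
        type_info=Types.LONG(),
    )
    ds.print()
    env.execute()
```

A custom Java source would follow the same call shape once it implements the Source interface described in the documentation linked above.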



> On Oct 16, 2024, at 23:22, Xiao Zhang <1272229...@qq.com.INVALID> wrote:
> 
> 
> Environment: PyFlink 1.20.0, JDK 11
> The error is as follows:
> /Users/xiaozhang/anaconda3/envs/pyflink/bin/python /Users/xiaozhang/Documents/临时/pyflink2/socketFlinkTest.py
> Traceback (most recent call last):
>   File "/Users/xiaozhang/Documents/临时/pyflink2/socketFlinkTest.py", line 27, in <module>
>     ds = env.from_source(source=myTestSourceFunction,
>   File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 972, in from_source
>     j_data_stream = self._j_stream_execution_environment.fromSource(
>   File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
>     return_value = get_return_value(
>   File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/pyflink/util/exceptions.py", line 146, in deco
>     return f(*a, **kw)
>   File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/py4j/protocol.py", line 330, in get_return_value
>     raise Py4JError(
> py4j.protocol.Py4JError: An error occurred while calling o10.fromSource. Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method fromSource([class MyTestSourceFunction, class org.apache.flink.api.common.eventtime.WatermarkStrategy$$Lambda$184/0x000000080026fc40, class java.lang.String, null]) does not exist
>       at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
>       at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
>       at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
>       at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>       at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>       at java.base/java.lang.Thread.run(Thread.java:834)
> 
> 
> The Java implementation is as follows:
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> public class MyTestSourceFunction implements SourceFunction<String> {
>     private boolean runningFlag = true;
>
>     @Override
>     public void run(SourceContext<String> ctx) throws Exception {
>         while (runningFlag) {
>             ctx.collect("hi");
>             ctx.collect("world");
>             Thread.sleep(30000);
>         }
>     }
>
>     @Override
>     public void cancel() {
>         runningFlag = false;
>     }
> }
> The Python calling code is as follows:
> if __name__ == '__main__':
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>
>     # websocket_builder = WebSocketBuilder()
>     # websocket_builder.set_websocket_url('wss://fstream.binance.com/ws/btcusdt@aggTrade')
>     # websocket_source = websocket_builder.build()
>
>     # Note: only local file URLs (starting with "file:") are supported.
>     # env.get_config().set("pipeline.jars", 'file:///Users/xiaozhang/Documents/临时/pyflink2/data/jar/MyFlinkJava2.jar')
>     #
>     # # Note: paths must specify a protocol (e.g. "file"), and the user should ensure
>     # # that the URLs are accessible on both the client and the cluster.
>     # env.get_config().set("pipeline.classpaths", 'file:///Users/xiaozhang/Documents/临时/pyflink2/data/jar/MyFlinkJava2.jar')
>
>     env.add_jars('file:///Users/xiaozhang/Documents/临时/pyflink2/data/jar/MyFlinkJava3.jar')
>
>     websocket_source = MyWebSocketSourceFunction('wss://fstream.binance.com/ws/btcusdt@aggTrade')
>     myTestSourceFunction = MyTestSourceFunction()
>
>     ds = env.from_source(source=myTestSourceFunction,
>                          watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
>                          source_name="websocket source")
>
>     # The DataStream can be processed further or written out here
>
>     env.execute()
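> I don't know what causes this error or where to start troubleshooting. The attachment is a jar I packaged myself.
>
> Also, I see that other implementations of this interface, such as RichParallelSourceFunction, are deprecated. What should I use now to write a custom source?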
