Hello!

I'm trying to modify the DataStream API example
<https://github.com/apache/flink-statefun/blob/4fe04ea351145f989e144f160425424987050f68/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java>
provided by Flink Stateful Function by attaching another function and
creating a function chain. However, I'm getting the following error


Exception in thread "main" java.lang.IllegalArgumentException: Hash
collision on user-specified ID "feedback_union_uid1". Most likely cause is
a non-unique ID. Please check that all IDs specified via uid(String) are
unique. at
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:178)
at
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:109)
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:165)
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:113)
at
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
at
org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
at
org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
at
org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
at
org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:98)
at
org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:79)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1714)
at
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1682)
at
org.apache.flink.statefun.examples.datastream.Example.main(Example.java:161)


Here <https://gist.github.com/flint-stone/b896ea3422245cdca9bc7cc324be152a>
is my modified example:

https://gist.github.com/flint-stone/b896ea3422245cdca9bc7cc324be152a


I realize that I can only modify uid of the stage through DataStream API
but not StateFun API -- what is the best practice to avoid such error (or
there is a better way to chain stateful function in Flink)?


Thanks!


Le

Reply via email to