Hi, I modified the Stateful Functions 2.2.0 async example to include a real
binding to Kafka. I added statefun-flink-distribution and
statefun-kafka-io to the pom and built a fat jar using the
maven-assembly-plugin.

When I submit the job, my Flink cluster complains:

java.lang.IllegalStateException: Unable to find a source translation for
ingress of type IngressType(statefun.kafka.io, universal-ingress), which is
bound for key IngressIdentifier(org.apache.flink.statefun.examples.async,
tasks, class
org.apache.flink.statefun.examples.async.events.TaskStartedEvent)
org.apache.flink.statefun.flink.core.translation.IngressToSourceFunctionTranslator.sourceFromSpec(IngressToSourceFunctionTranslator.java:45)
org.apache.flink.statefun.flink.core.common.Maps.transformValues(Maps.java:54)
org.apache.flink.statefun.flink.core.translation.IngressToSourceFunctionTranslator.translate(IngressToSourceFunctionTranslator.java:37)
org.apache.flink.statefun.flink.core.translation.Sources.ingressToSourceFunction(Sources.java:117)
org.apache.flink.statefun.flink.core.translation.Sources.create(Sources.java:52)
org.apache.flink.statefun.flink.core.translation.FlinkUniverse.configure(FlinkUniverse.java:44)
org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:74)
org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 12 more


Does anyone have any idea why this fails on a cluster, yet works fine when
I run it in the test harness against a real Kafka?
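
For context, the build setup described above corresponds roughly to the pom
fragment below. This is a reconstructed sketch, not a copy of the actual
file: the groupId, the 2.2.0 version on both artifacts, the
jar-with-dependencies descriptor and the execution id are assumptions.

```xml
<!-- Sketch only: coordinates and plugin settings are assumed, not taken from the real pom. -->
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-flink-distribution</artifactId>
    <version>2.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-kafka-io</artifactId>
    <version>2.2.0</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <!-- Builds the fat jar; the descriptorRef is the stock one, assumed here. -->
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id> <!-- hypothetical id -->
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```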

Many thanks
Fil
