Hi everyone,
I am very excited about the 2.36 release, especially the stopReadOffset
addition to KafkaSourceDescriptor. With it, I can read a bounded section of a
topic to build state, effectively treating Kafka as a bounded source, before
reading the new items that need processing.
Unfortunately, running the pipeline from the Flink CLI produces the
following error:
Pretty printing Flink args:
--detached
--class=namespace.pipeline.App
/opt/bns/etf/data-platform/user_code/pipelines/cerberus-etl-pipelines.jar
--configFilePath=/path/to/config.properties
--runner=FlinkRunner
--streaming
--checkpointingInterval=30000
--stateBackend=filesystem
--stateBackendStoragePath=file:///path/to/state
--numberOfExecutionRetries=2
--fasterCopy
--debugThrowExceptions
java.lang.IncompatibleClassChangeError: Class org.apache.beam.model.pipeline.v1.RunnerApi$StandardResourceHints$Enum does not implement the requested interface org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ProtocolMessageEnum
    at org.apache.beam.sdk.transforms.resourcehints.ResourceHints.getUrn(ResourceHints.java:50)
    at org.apache.beam.sdk.transforms.resourcehints.ResourceHints.<clinit>(ResourceHints.java:54)
    at org.apache.beam.sdk.Pipeline.<init>(Pipeline.java:523)
    at org.apache.beam.sdk.Pipeline.create(Pipeline.java:157)
    at lines containing Pipeline.create(options) <--- my code
    at namespace.pipeline.App.main(App.java:42) <-- my code
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
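In case it helps with diagnosis: my unconfirmed guess is that classes from two different Beam versions end up on the classpath, since the error names a vendored gRPC package (v1p43p2) that the loaded RunnerApi class does not seem to be built against. Below is a minimal sketch, in plain Java with no Beam dependency, that prints which jar each of the classes named in the trace is loaded from. ClassOrigin and originOf are hypothetical names I made up for this check; they are not part of Beam or Flink.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /**
     * Returns the location (jar or directory) a class was loaded from,
     * or a parenthesised note when the class cannot be loaded or has no
     * recorded code source (for example, core JDK classes).
     */
    public static String originOf(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "(no code source recorded)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e + ")";
        }
    }

    public static void main(String[] args) {
        // Class names copied from the stack trace above.
        String[] names = {
            "org.apache.beam.model.pipeline.v1.RunnerApi$StandardResourceHints$Enum",
            "org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ProtocolMessageEnum",
            "org.apache.beam.sdk.Pipeline"
        };
        for (String name : names) {
            System.out.println(name + " -> " + originOf(name));
        }
    }
}
```

Calling originOf for these names at the top of App.main, before Pipeline.create(options), should show whether the classes come from my pipeline jar or from a jar in the Flink installation. If the locations point at different Beam versions, that would support the guess above; if they all come from the same jar, the cause is something else.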
Any advice would be appreciated.
Thank you,
Cristian