Hi Luke - If I deploy the pipeline with 2.23.0, I get the exception below: "The transform beam:transform:impulse:v1 is currently not supported".

2020-10-03 00:42:31,117 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: The transform beam:transform:impulse:v1 is currently not supported.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: *The transform beam:transform:impulse:v1 is currently not supported.*
    at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:133)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
    at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
    at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:88)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:117)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:82)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
    at com.oracle.cx.signals.Booster.main(Booster.java:278)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 10 more

Below are the transform details of the pipeline from the log.

2020-10-03 00:42:29,442 WARN org.apache.beam.sdk.Pipeline - The following transforms do not have stable unique names: ParDo(Anonymous)
2020-10-03 00:42:29,445 INFO org.apache.beam.runners.flink.FlinkRunner - Executing pipeline using FlinkRunner.
2020-10-03 00:42:29,447 INFO org.apache.beam.runners.flink.FlinkRunner - Translating pipeline to Flink program.
2020-10-03 00:42:29,456 INFO org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment - Found unbounded PCollection. Switching to streaming execution.
2020-10-03 00:42:29,462 INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming Environment.
2020-10-03 00:42:29,463 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, booster-app-16e2bc1f-jm-689c9bc7f-mvcmg
2020-10-03 00:42:29,463 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2020-10-03 00:42:29,464 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2020-10-03 00:42:29,464 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.memory.process.size, 1568m
2020-10-03 00:42:29,464 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-10-03 00:42:29,464 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2020-10-03 00:42:29,465 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-10-03 00:42:29,465 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.access-key, d7a8b37b50ae280045ac8a2daed0451850a7196a
2020-10-03 00:42:29,465 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.secret-key, ******
2020-10-03 00:42:29,466 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.path-style-access, true
2020-10-03 00:42:29,466 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.endpoint, https://axo0ime4ahzu.compat.objectstorage.us-phoenix-1.oraclecloud.com
2020-10-03 00:42:29,466 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: zookeeper.sasl.disable, true
2020-10-03 00:42:29,466 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.reporters, prom
2020-10-03 00:42:29,466 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.reporter.prom.class, org.apache.flink.metrics.prometheus.PrometheusReporter
2020-10-03 00:42:29,467 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.reporter.prom.port, 8080
2020-10-03 00:42:29,467 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: blob.server.port, 6124
2020-10-03 00:42:29,467 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: query.server.port, 6125
2020-10-03 00:42:29,467 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: blob.server.port, 6125
2020-10-03 00:42:29,467 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability, zookeeper
2020-10-03 00:42:29,468 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.cluster-id, /dev-flink-test
2020-10-03 00:42:29,468 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.jobmanager.port, 6123
2020-10-03 00:42:29,468 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.storageDir, s3p://dev-flink-test/job-metadata
2020-10-03 00:42:29,468 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.zookeeper.path.root, /flink
2020-10-03 00:42:29,468 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.zookeeper.quorum, zk-0.zk-hs.default.svc.cluster.local:2181
2020-10-03 00:42:29,469 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 976562k
2020-10-03 00:42:29,469 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2020-10-03 00:42:29,469 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.port, 8081
2020-10-03 00:42:29,469 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.internal.query-service.port, 50101
2020-10-03 00:42:29,470 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: query.server.port, 6124
2020-10-03 00:42:29,470 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.backend, filesystem
2020-10-03 00:42:29,470 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.checkpoints.dir, s3p://dev-flink-test/checkpoints/
2020-10-03 00:42:29,470 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, s3p://dev-flink-test/savepoints/
2020-10-03 00:42:29,470 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 976562k
2020-10-03 00:42:29,471 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.network.memory.fraction, 0.1
2020-10-03 00:42:29,471 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.network.memory.min, 10m
2020-10-03 00:42:29,471 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 2
2020-10-03 00:42:29,471 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: web.upload.dir, /opt/flink
2020-10-03 00:42:29,472 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.cluster-id, booster-app-16e2bc1f
2020-10-03 00:42:29,472 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, 172.16.0.144
2020-10-03 00:42:29,507 WARN org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment - UnboundedSources present which rely on checkpointing, but checkpointing is disabled.
2020-10-03 00:42:30,650 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - enterCompositeTransform-
2020-10-03 00:42:30,651 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | enterCompositeTransform- Create.Values
2020-10-03 00:42:30,675 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | visitPrimitiveTransform- Create.Values/Read(CreateSource)
2020-10-03 00:42:30,865 WARN org.apache.beam.sdk.coders.SerializableCoder - Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner
2020-10-03 00:42:30,908 INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2020-10-03 00:42:30,976 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | leaveCompositeTransform- Create.Values
2020-10-03 00:42:30,977 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | enterCompositeTransform- ParDo(Anonymous)
2020-10-03 00:42:30,977 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | visitPrimitiveTransform- ParDo(Anonymous)/ParMultiDo(Anonymous)
2020-10-03 00:42:31,000 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | leaveCompositeTransform- ParDo(Anonymous)
2020-10-03 00:42:31,001 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | enterCompositeTransform- Parse Signals
2020-10-03 00:42:31,001 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | visitPrimitiveTransform- Parse Signals/ParMultiDo(ParseSignals)
2020-10-03 00:42:31,014 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | leaveCompositeTransform- Parse Signals
2020-10-03 00:42:31,019 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | enterCompositeTransform- Log.LoggingTransform
2020-10-03 00:42:31,019 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | enterCompositeTransform- Log.LoggingTransform/ParDo(Anonymous)
2020-10-03 00:42:31,023 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | visitPrimitiveTransform- Log.LoggingTransform/ParDo(Anonymous)/ParMultiDo(Anonymous)
2020-10-03 00:42:31,041 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | leaveCompositeTransform- Log.LoggingTransform/ParDo(Anonymous)
2020-10-03 00:42:31,041 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | leaveCompositeTransform- Log.LoggingTransform
2020-10-03 00:42:31,041 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | enterCompositeTransform- View.AsList
2020-10-03 00:42:31,041 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | enterCompositeTransform- View.AsList/View.VoidKeyToMultimapMaterialization
2020-10-03 00:42:31,041 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | enterCompositeTransform- View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)
2020-10-03 00:42:31,041 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | visitPrimitiveTransform- View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
2020-10-03 00:42:31,051 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | leaveCompositeTransform- View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)
2020-10-03 00:42:31,051 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | leaveCompositeTransform- View.AsList/View.VoidKeyToMultimapMaterialization
2020-10-03 00:42:31,051 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | enterCompositeTransform- View.AsList/View.CreatePCollectionView
2020-10-03 00:42:31,051 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | enterCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)
2020-10-03 00:42:31,051 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | enterCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys
2020-10-03 00:42:31,051 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | | enterCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys
2020-10-03 00:42:31,052 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | | | enterCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map
2020-10-03 00:42:31,052 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | | | | visitPrimitiveTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
2020-10-03 00:42:31,068 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | | | leaveCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map
2020-10-03 00:42:31,068 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | | leaveCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys
2020-10-03 00:42:31,068 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | leaveCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys
2020-10-03 00:42:31,068 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | enterCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
2020-10-03 00:42:31,086 INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2020-10-03 00:42:31,105 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | | translated- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
2020-10-03 00:42:31,105 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | leaveCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
2020-10-03 00:42:31,105 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | enterCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values
2020-10-03 00:42:31,105 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | | enterCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values
2020-10-03 00:42:31,105 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | | | enterCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map
2020-10-03 00:42:31,105 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | | | | visitPrimitiveTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
2020-10-03 00:42:31,115 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | | | leaveCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map
2020-10-03 00:42:31,115 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | | leaveCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values
2020-10-03 00:42:31,115 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | | leaveCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)/Values
2020-10-03 00:42:31,115 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | leaveCompositeTransform- View.AsList/View.CreatePCollectionView/Combine.globally(Concatenate)
2020-10-03 00:42:31,115 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | | visitPrimitiveTransform- View.AsList/View.CreatePCollectionView/CreateStreamingFlinkView.CreateFlinkPCollectionView
2020-10-03 00:42:31,115 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | leaveCompositeTransform- View.AsList/View.CreatePCollectionView
2020-10-03 00:42:31,115 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | leaveCompositeTransform- View.AsList
2020-10-03 00:42:31,116 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | enterCompositeTransform- Read from rawsignal
2020-10-03 00:42:31,116 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - | | visitPrimitiveTransform- Read from rawsignal/Impulse
2020-10-03 00:42:31,116 INFO org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator - *beam:transform:impulse:v1*

On Mon, Oct 5, 2020 at 9:51 AM Luke Cwik <lc...@google.com> wrote:

> In your pom.xml you are requesting version 2.21.0 of the Beam Flink
> runner, but you are using Beam 2.23 elsewhere. You want these versions
> to match. Try updating your profile to:
>
> <profile>
>   <id>flink-runner</id>
>   <!-- Makes the FlinkRunner available when running a pipeline. -->
>   <dependencies>
>     <dependency>
>       <groupId>org.apache.beam</groupId>
>       <artifactId>beam-runners-flink-1.10</artifactId>
>       <version>2.23.0</version>
>       <!-- <scope>runtime</scope> -->
>     </dependency>
>   </dependencies>
> </profile>
>
> On Fri, Oct 2, 2020 at 8:52 PM Tomo Suzuki <suzt...@google.com> wrote:
>
>> I suspect your dependencies have a conflict. I develop the Linkage
>> Checker enforcer rule, which identifies incompatible dependencies. Do
>> you want to give it a try?
>>
>> https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/Linkage-Checker-Enforcer-Rule
>>
>> Regards,
>> Tomo
>>
>> On Fri, Oct 2, 2020 at 21:34 Praveen K Viswanathan <
>> harish.prav...@gmail.com> wrote:
>>
>>> Hi - We have a Beam pipeline that reads and writes using an SDF-based
>>> IO connector. It works fine on a local machine with the Direct Runner
>>> or the Flink Runner. However, when we build an image of that pipeline
>>> along with Flink and deploy it in a cluster, we get the exception below.
>>>
>>>> ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
>>>> org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
>>>>
>>>> Classpath: [file:/opt/flink/flink-web-upload/Booster-bundled-1.0-SNAPSHOT.jar]
>>>> System.out: (none)
>>>> System.err: (none)
>>>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.generateException(OptimizerPlanEnvironment.java:149)
>>>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:89)
>>>>     at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
>>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
>>>>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
>>>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
>>>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION
>>>>     at org.apache.beam.runners.core.construction.PTransformTranslation.<clinit>(PTransformTranslation.java:199)
>>>>     at org.apache.beam.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:264)
>>>>     at org.apache.beam.sdk.Pipeline$2.enterCompositeTransform(Pipeline.java:272)
>>>>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653)
>>>>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>>>>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>>>>     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>>>>     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
>>>>     at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:262)
>>>>     at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:212)
>>>>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:115)
>>>>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:82)
>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
>>>>     at com.org.cx.signals.Booster.main(Booster.java:278)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
>>>>     ... 8 more
>>>
>>> In our pom.xml we have created a profile for flink-runner, as shown
>>> below.
>>>
>>>> <profiles>
>>>>   <profile>
>>>>     <id>flink-runner</id>
>>>>     <!-- Makes the FlinkRunner available when running a pipeline. -->
>>>>     <dependencies>
>>>>       <dependency>
>>>>         <groupId>org.apache.beam</groupId>
>>>>         <artifactId>beam-runners-flink-1.10</artifactId>
>>>>         <version>2.21.0</version>
>>>>         <!-- <scope>runtime</scope> -->
>>>>       </dependency>
>>>>     </dependencies>
>>>>   </profile>
>>>> </profiles>
>>>
>>> The Docker image uses the Flink version below:
>>>
>>> FROM flink:1.10.0-scala_2.12
>>>
>>> Both our pipeline and the SDF-based IO connector are on Beam 2.23.0.
>>> We would appreciate any guidance on what is causing this exception.
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>> --
>> Regards,
>> Tomo
>>
>
--
Thanks,
Praveen K Viswanathan
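
For anyone hitting the same NoSuchFieldError, the version mismatch discussed in this thread (a Beam Flink runner on 2.21.0 alongside Beam 2.23.0 elsewhere) can be spotted with a small check over the resolved dependency list. The following is only a rough sketch, not part of Beam or Maven: the class name BeamVersionCheck and the method beamArtifactsByVersion are hypothetical, the input is assumed to be coordinate lines of the form groupId:artifactId:type:version:scope (similar to what `mvn dependency:list` prints, once any log prefix is removed), and the sample coordinates in main are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: groups org.apache.beam artifacts by resolved version. */
public class BeamVersionCheck {

    /**
     * Returns a map from version to the Beam artifactIds resolved at that version.
     * Assumes each line looks like "groupId:artifactId:type:version:scope";
     * lines with a different number of fields (for example, with a classifier)
     * are skipped in this sketch.
     */
    public static Map<String, List<String>> beamArtifactsByVersion(List<String> lines) {
        Map<String, List<String>> byVersion = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split(":");
            if (parts.length != 5 || !parts[0].equals("org.apache.beam")) {
                continue; // not a Beam artifact, or not in the assumed shape
            }
            byVersion.computeIfAbsent(parts[3], v -> new ArrayList<>()).add(parts[1]);
        }
        return byVersion;
    }

    public static void main(String[] args) {
        // Illustrative input only; in practice, paste the resolved dependency list here.
        List<String> sample = List.of(
            "org.apache.beam:beam-sdks-java-core:jar:2.23.0:compile",
            "org.apache.beam:beam-runners-flink-1.10:jar:2.21.0:compile",
            "org.apache.flink:flink-core:jar:1.10.0:provided");

        Map<String, List<String>> byVersion = beamArtifactsByVersion(sample);
        if (byVersion.size() > 1) {
            System.out.println("Mixed Beam versions: " + byVersion);
        } else {
            System.out.println("Beam versions are consistent: " + byVersion.keySet());
        }
    }
}
```

More than one key in the returned map means the Beam artifacts on the classpath disagree on version, which is the situation the pom.xml change above is meant to remove.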