Thanks Luke. I will check on the "portable" mode with Flink. Also hoping that 2.25.0 will be released soon.
Regards,
Praveen

On Mon, Oct 5, 2020 at 11:46 AM Luke Cwik <lc...@google.com> wrote:

> Impulse in a released version of Apache Beam is only supported if you run
> your pipeline in "portable" mode with Flink. See
> https://beam.apache.org/documentation/runners/flink/ for some example
> instructions on how to run a "portable" pipeline.
>
> I added support for impulse in non portable pipeline execution to Flink in
> https://github.com/apache/beam/pull/12708 which will be available in
> 2.25.0 release (this release is currently underway).
>
> On Mon, Oct 5, 2020 at 11:28 AM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Thanks for sharing the tool Tomo. I will give it a try and let you know.
>>
>> Regards,
>> Praveen
>>
>> On Fri, Oct 2, 2020 at 8:52 PM Tomo Suzuki <suzt...@google.com> wrote:
>>
>>> I suspect your dependencies have conflict. I develop Linkage Checker
>>> enforcer rule to identify incompatible dependencies. Do you want to give it
>>> a try?
>>>
>>> https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/Linkage-Checker-Enforcer-Rule
>>>
>>> Regards,
>>> Tomo
>>>
>>> On Fri, Oct 2, 2020 at 21:34 Praveen K Viswanathan <
>>> harish.prav...@gmail.com> wrote:
>>>
>>>> Hi - We have a beam pipeline reading and writing using an SDF based IO
>>>> connector working fine in a local machine using Direct Runner or Flink
>>>> Runner. However when we build an image of that pipeline along with Flink
>>>> and deploy in a cluster we get below exception.
>>>>
>>>> ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler -
>>>>> Unhandled exception.
>>>>> org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
>>>>>
>>>>> Classpath:
>>>>> [file:/opt/flink/flink-web-upload/Booster-bundled-1.0-SNAPSHOT.jar]
>>>>> System.out: (none)
>>>>> System.err: (none)
>>>>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.generateException(OptimizerPlanEnvironment.java:149)
>>>>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:89)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
>>>>>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
>>>>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
>>>>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION
>>>>>     at org.apache.beam.runners.core.construction.PTransformTranslation.<clinit>(PTransformTranslation.java:199)
>>>>>     at org.apache.beam.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:264)
>>>>>     at org.apache.beam.sdk.Pipeline$2.enterCompositeTransform(Pipeline.java:272)
>>>>>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653)
>>>>>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>>>>>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>>>>>     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>>>>>     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
>>>>>     at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:262)
>>>>>     at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:212)
>>>>>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:115)
>>>>>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:82)
>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
>>>>>     at com.org.cx.signals.Booster.main(Booster.java:278)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
>>>>>     ... 8 more
>>>>
>>>>
>>>> In our pom.xml we have created a profile for flink-runner as shown
>>>> below.
>>>>
>>>> <profiles>
>>>>>   <profile>
>>>>>     <id>flink-runner</id>
>>>>>     <!-- Makes the FlinkRunner available when running a pipeline. -->
>>>>>     <dependencies>
>>>>>       <dependency>
>>>>>         <groupId>org.apache.beam</groupId>
>>>>>         <artifactId>beam-runners-flink-1.10</artifactId>
>>>>>         <version>2.21.0</version>
>>>>>         <!-- <scope>runtime</scope> -->
>>>>>       </dependency>
>>>>>     </dependencies>
>>>>>   </profile>
>>>>> </profiles>
>>>>
>>>>
>>>> And the docker image has below flink version
>>>>
>>>> FROM flink:1.10.0-scala_2.12
>>>>
>>>>
>>>> Both our pipeline and SDF based IO connector are on Beam 2.23.0
>>>> version. Appreciate if you can guide us on what is causing this exception.
>>>>
>>>> --
>>>> Thanks,
>>>> Praveen K Viswanathan
>>>>
>>> --
>>> Regards,
>>> Tomo
>>>
>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>
--
Thanks,
Praveen K Viswanathan
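
One possible cause of the NoSuchFieldError quoted above, not confirmed anywhere in this thread, is that the flink-runner profile pins beam-runners-flink-1.10 to 2.21.0 while the pipeline and the IO connector are stated to be on Beam 2.23.0. A minimal sketch of keeping the two in step through a single Maven property is shown here; the property name beam.version is an assumption for illustration and does not come from the original pom.xml.

```xml
<properties>
  <!-- Hypothetical property name; 2.23.0 is the Beam version stated in the thread. -->
  <beam.version>2.23.0</beam.version>
</properties>

<profiles>
  <profile>
    <id>flink-runner</id>
    <!-- Makes the FlinkRunner available when running a pipeline. -->
    <dependencies>
      <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-flink-1.10</artifactId>
        <!-- Follows the shared Beam version instead of the hard-coded 2.21.0. -->
        <version>${beam.version}</version>
      </dependency>
    </dependencies>
  </profile>
</profiles>
```

Running `mvn dependency:tree -Pflink-runner -Dincludes=org.apache.beam` afterwards should list the Beam artifacts that Maven resolves for that profile, which makes any remaining mixed versions easier to spot.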