Thanks, Luke. I will check on the "portable" mode with Flink. I am also
hoping that 2.25.0 will be released soon.

Regards,
Praveen

On Mon, Oct 5, 2020 at 11:46 AM Luke Cwik <lc...@google.com> wrote:

> In released versions of Apache Beam, Impulse is only supported on Flink
> if you run your pipeline in "portable" mode. See
> https://beam.apache.org/documentation/runners/flink/ for example
> instructions on how to run a "portable" pipeline.
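>
> For example, with the Java SDK the submission side of a "portable" run
> looks roughly like this (a minimal sketch, assuming the
> beam-runners-portability-java artifact is on the classpath and a Beam
> Flink job server is already listening on localhost:8099):
>
> import org.apache.beam.runners.portability.PortableRunner;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.options.PortablePipelineOptions;
>
> // inside main(String[] args)
> PortablePipelineOptions options =
>     PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
> options.setRunner(PortableRunner.class);   // submit over the portable Job API
> options.setJobEndpoint("localhost:8099");  // address of the Flink job server
> Pipeline p = Pipeline.create(options);
> // ... build and run the pipeline as usual ...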
>
> I added support for Impulse in non-portable pipeline execution on Flink
> in https://github.com/apache/beam/pull/12708, which will be available in
> the 2.25.0 release (currently underway).
>
> On Mon, Oct 5, 2020 at 11:28 AM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Thanks for sharing the tool, Tomo. I will give it a try and let you know.
>>
>> Regards,
>> Praveen
>>
>> On Fri, Oct 2, 2020 at 8:52 PM Tomo Suzuki <suzt...@google.com> wrote:
>>
>>> I suspect your dependencies have a conflict. I develop the Linkage
>>> Checker enforcer rule, which identifies incompatible dependencies. Do
>>> you want to give it a try?
>>>
>>> https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/Linkage-Checker-Enforcer-Rule
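>>>
>>> For reference, wiring the rule into a Maven build looks roughly like
>>> this (a minimal sketch; the version numbers are illustrative, so please
>>> take the current ones from the wiki page above):
>>>
>>> <plugin>
>>>   <groupId>org.apache.maven.plugins</groupId>
>>>   <artifactId>maven-enforcer-plugin</artifactId>
>>>   <version>3.0.0-M3</version>
>>>   <dependencies>
>>>     <!-- brings in the Linkage Checker rule implementation -->
>>>     <dependency>
>>>       <groupId>com.google.cloud.tools</groupId>
>>>       <artifactId>linkage-checker-enforcer-rules</artifactId>
>>>       <version>1.5.6</version>
>>>     </dependency>
>>>   </dependencies>
>>>   <executions>
>>>     <execution>
>>>       <id>enforce-linkage-checker</id>
>>>       <goals>
>>>         <goal>enforce</goal>
>>>       </goals>
>>>       <configuration>
>>>         <rules>
>>>           <!-- fails the build on missing classes/methods/fields in the
>>>                resolved class path, i.e. exactly the kind of problem
>>>                behind a NoSuchFieldError -->
>>>           <LinkageCheckerRule
>>>               implementation="com.google.cloud.tools.dependencies.enforcer.LinkageCheckerRule"/>
>>>         </rules>
>>>       </configuration>
>>>     </execution>
>>>   </executions>
>>> </plugin>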
>>>
>>> Regards,
>>> Tomo
>>>
>>> On Fri, Oct 2, 2020 at 21:34 Praveen K Viswanathan <
>>> harish.prav...@gmail.com> wrote:
>>>
>>>> Hi - We have a Beam pipeline that reads and writes through an SDF-based
>>>> IO connector, and it works fine on a local machine with the Direct
>>>> Runner or the Flink Runner. However, when we build an image of that
>>>> pipeline along with Flink and deploy it in a cluster, we get the
>>>> exception below.
>>>>
>>>> ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    -
>>>>> Unhandled exception.
>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>>>>> program caused an error:
>>>>>
>>>>> Classpath:
>>>>> [file:/opt/flink/flink-web-upload/Booster-bundled-1.0-SNAPSHOT.jar]
>>>>> System.out: (none)
>>>>> System.err: (none)
>>>>>     at
>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.generateException(OptimizerPlanEnvironment.java:149)
>>>>>     at
>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:89)
>>>>>     at
>>>>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
>>>>>     at
>>>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
>>>>>     at
>>>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
>>>>>     at
>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
>>>>>     at
>>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>>>>>     at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION
>>>>>     at
>>>>> org.apache.beam.runners.core.construction.PTransformTranslation.<clinit>(PTransformTranslation.java:199)
>>>>>     at
>>>>> org.apache.beam.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:264)
>>>>>     at
>>>>> org.apache.beam.sdk.Pipeline$2.enterCompositeTransform(Pipeline.java:272)
>>>>>     at
>>>>> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653)
>>>>>     at
>>>>> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>>>>>     at
>>>>> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>>>>>     at
>>>>> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>>>>>     at
>>>>> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
>>>>>     at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:262)
>>>>>     at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:212)
>>>>>     at
>>>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:115)
>>>>>     at
>>>>> org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:82)
>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
>>>>>     at com.org.cx.signals.Booster.main(Booster.java:278)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>>>>     at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>>>     at
>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
>>>>>     ... 8 more
>>>>
>>>>
>>>> In our pom.xml we have created a profile for flink-runner as shown
>>>> below.
>>>>
>>>> <profiles>
>>>>>   <profile>
>>>>>     <id>flink-runner</id>
>>>>>     <!-- Makes the FlinkRunner available when running a pipeline. -->
>>>>>     <dependencies>
>>>>>       <dependency>
>>>>>         <groupId>org.apache.beam</groupId>
>>>>>         <artifactId>beam-runners-flink-1.10</artifactId>
>>>>>         <version>2.21.0</version>
>>>>>         <!-- <scope>runtime</scope> -->
>>>>>       </dependency>
>>>>>     </dependencies>
>>>>>   </profile>
>>>>> </profiles>
>>>>
>>>>
>>>> And the Docker image is based on the Flink version below:
>>>>
>>>> FROM flink:1.10.0-scala_2.12
>>>>
>>>>
>>>> Both our pipeline and the SDF-based IO connector are on Beam version
>>>> 2.23.0. We would appreciate any guidance on what is causing this
>>>> exception.
>>>>
>>>> --
>>>> Thanks,
>>>> Praveen K Viswanathan
>>>>
>>> --
>>> Regards,
>>> Tomo
>>>
>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan
