Hi All,

We are running an Apache Beam Python batch pipeline, with the same code running on 
Dataflow and Flink. When we run it on Flink we get the error below. 
We are using Apache Beam 2.34.0, Flink 1.12.x and Python 3.6.8. Any 
help with the error would be awesome.

P.S. After some digging we came across this issue:
https://issues.apache.org/jira/browse/BEAM-6523
It looks like this would be our fix. Any idea whether it has been implemented 
already?

The error
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[2022-02-23 21:24:06,844] [root] [DEBUG]: 
org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of 
different types. 
Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,LengthPrefixCoder(ByteArrayCoder)),GlobalWindow$Coder)}, 
input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(LengthPrefixCoder(ByteArrayCoder),GlobalWindow$Coder)}
        at org.apache.flink.api.java.operators.UnionOperator.<init>(UnionOperator.java:48)
        at org.apache.flink.api.java.DataSet.union(DataSet.java:1242)
        at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateFlatten(FlinkBatchPortablePipelineTranslator.java:440)
        at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:272)
        at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:118)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:115)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:85)
        at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

[2022-02-23 21:24:06,845] [root] [ERROR]: 
org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of 
different types. 
Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,LengthPrefixCoder(ByteArrayCoder)),GlobalWindow$Coder)}, 
input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(LengthPrefixCoder(ByteArrayCoder),GlobalWindow$Coder)}
[2022-02-23 21:24:06,996] [apache_beam.runners.portability.portable_runner] [INFO]: Job state changed to FAILED
Traceback (most recent call last):
  File "BeamInputPreparation_debug.py", line 37, in <module>
    main()
  File "BeamInputPreparation_debug.py", line 33, in main
    pipeline.run(sys.argv)
  File "/ravencr/NOLA2_PYTHON_PIPELINE_2.3.b1361.3/nola2x/apache_beam/pipeline/beam_input_prep_pipeline_debug.py", line 544, in run
    | "PersistSettings/Save" >> beam.ParDo(WriteByKeyDoFn(OUTPUT), backup=True, path_sep=PATH_SEP)
  File "/ravencr/NOLA2_PYTHON_PIPELINE_2.3.b1361.3/venv/lib/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py", line 600, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline nola2_input_prep_f5c0c22e-c8b6-43d1-a889-fde8ad8decbf 
failed in state FAILED: org.apache.flink.api.common.InvalidProgramException: 
Cannot union inputs of different types. 
Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,LengthPrefixCoder(ByteArrayCoder)),GlobalWindow$Coder)}, 
input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(LengthPrefixCoder(ByteArrayCoder),GlobalWindow$Coder)}
[2022-02-23 21:24:07,003] [root] [DEBUG]: Sending SIGINT to job_server
(venv) root@ravencr-jobmanager-677cbd59-h6rt7:/ravencr/NOLA2_PYTHON_PIPELINE_2.3.b1361.3# 
command terminated with exit code 137


------------------------------------------------------------------------------------------------------------------------------------------
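
The stack trace shows the failure comes from translateFlatten: the two inputs of a 
Flatten have different coders (a KV of string and bytes on one side, bare bytes on 
the other). One possible workaround, as a minimal sketch only: give both branches 
the same declared element type before flattening. Everything named here (as_kv, 
NO_KEY, flatten_same_type, the step labels) is hypothetical and not taken from the 
pipeline above, and whether this avoids the error on Flink 1.12 with Beam 2.34.0 is 
an assumption that has not been verified.

```python
from typing import Tuple

# Hypothetical placeholder key for elements that arrive without a key.
NO_KEY = ""


def as_kv(element: bytes) -> Tuple[str, bytes]:
    """Wrap a bare bytes element as (key, value) to match the keyed branch."""
    return (NO_KEY, element)


def flatten_same_type(keyed, unkeyed):
    """Flatten two PCollections after declaring Tuple[str, bytes] on both.

    Assumption: `keyed` holds (str, bytes) pairs and `unkeyed` holds bare
    bytes, mirroring the KvCoder(...) vs LengthPrefixCoder(ByteArrayCoder)
    mismatch reported in the error message.
    """
    # Imported lazily so as_kv can be used and tested without Beam installed.
    import apache_beam as beam

    keyed_typed = (
        keyed
        | "DeclareKV" >> beam.Map(lambda kv: kv).with_output_types(Tuple[str, bytes])
    )
    unkeyed_typed = (
        unkeyed
        | "WrapAsKV" >> beam.Map(as_kv).with_output_types(Tuple[str, bytes])
    )
    # Both inputs now carry the same declared element type.
    return (keyed_typed, unkeyed_typed) | "FlattenSameType" >> beam.Flatten()
```

Downstream steps would then need to handle the placeholder key for elements that 
came from the unkeyed branch.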


