Hi all,

We are running an Apache Beam Python batch pipeline, and the same code runs on both Dataflow and Flink. When we run it on Flink we get the error below. We are using Apache Beam 2.34.0, Flink 1.12.x and Python 3.6.8. Any help with this error would be awesome.
P.S. After some digging we came across this issue: https://issues.apache.org/jira/browse/BEAM-6523. It looks like it would be our fix. Does anyone know whether it has been implemented already?

The error:

----------------------------------------------------------------------
[2022-02-23 21:24:06,844] [root] [DEBUG]: org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of different types. Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,LengthPrefixCoder(ByteArrayCoder)),GlobalWindow$Coder)}, input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(LengthPrefixCoder(ByteArrayCoder),GlobalWindow$Coder)}
    at org.apache.flink.api.java.operators.UnionOperator.<init>(UnionOperator.java:48)
    at org.apache.flink.api.java.DataSet.union(DataSet.java:1242)
    at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateFlatten(FlinkBatchPortablePipelineTranslator.java:440)
    at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:272)
    at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:118)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:115)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:85)
    at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2022-02-23 21:24:06,845] [root] [ERROR]: org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of different types. Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,LengthPrefixCoder(ByteArrayCoder)),GlobalWindow$Coder)}, input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(LengthPrefixCoder(ByteArrayCoder),GlobalWindow$Coder)}
[2022-02-23 21:24:06,996] [apache_beam.runners.portability.portable_runner] [INFO]: Job state changed to FAILED
Traceback (most recent call last):
  File "BeamInputPreparation_debug.py", line 37, in <module>
    main()
  File "BeamInputPreparation_debug.py", line 33, in main
    pipeline.run(sys.argv)
  File "/ravencr/NOLA2_PYTHON_PIPELINE_2.3.b1361.3/nola2x/apache_beam/pipeline/beam_input_prep_pipeline_debug.py", line 544, in run
    | "PersistSettings/Save" >> beam.ParDo(WriteByKeyDoFn(OUTPUT), backup=True, path_sep=PATH_SEP)
  File "/ravencr/NOLA2_PYTHON_PIPELINE_2.3.b1361.3/venv/lib/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py", line 600, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline nola2_input_prep_f5c0c22e-c8b6-43d1-a889-fde8ad8decbf failed in state FAILED: org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of different types. Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,LengthPrefixCoder(ByteArrayCoder)),GlobalWindow$Coder)}, input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(LengthPrefixCoder(ByteArrayCoder),GlobalWindow$Coder)}
[2022-02-23 21:24:07,003] [root] [DEBUG]: Sending SIGINT to job_server
(venv) root@ravencr-jobmanager-677cbd59-h6rt7:/ravencr/NOLA2_PYTHON_PIPELINE_2.3.b1361.3#
command terminated with exit code 137
----------------------------------------------------------------------
