Hi Hans,

Thanks! I probably read that at some point, but the "Notice" modal that pops up when closing the "Merge join" dialogue probably convinced me otherwise: "If the incoming data is not sorted on the specified keys, the output results may not be correct. We recommend sorting the incoming data within the pipeline."

I'm testing it in my pipeline now and am getting a stack trace (see below). The "site_id" field comes from an "Avro decode" transform and is a plain Integer. With the local runner, writing to Postgres, this works fine.

cheers

Fabian

2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
2022/09/02 09:20:20 - General - ERROR: org.apache.hop.core.exception.HopException:
2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
2022/09/02 09:20:20 - General -
2022/09/02 09:20:20 - General -     at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
2022/09/02 09:20:20 - General -     at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
2022/09/02 09:20:20 - General -     at java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 09:20:20 - General - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
2022/09/02 09:20:20 - General -     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
2022/09/02 09:20:20 - General -     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
2022/09/02 09:20:20 - General -     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
2022/09/02 09:20:20 - General -     at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
2022/09/02 09:20:20 - General -     ... 2 more
2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
2022/09/02 09:20:20 - General -     at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
2022/09/02 09:20:20 - General -     at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
2022/09/02 09:20:20 - General -     at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
2022/09/02 09:20:20 - General -     at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
2022/09/02 09:20:20 - General - Caused by: org.apache.hop.core.exception.HopValueException:
2022/09/02 09:20:20 - General - Unexpected conversion error while converting value [site_id Integer] to an Integer
2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
2022/09/02 09:20:20 - General -
2022/09/02 09:20:20 - General -     at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
2022/09/02 09:20:20 - General -     at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
2022/09/02 09:20:20 - General -     at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
2022/09/02 09:20:20 - General -     at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
2022/09/02 09:20:20 - General -     at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
2022/09/02 09:20:20 - General -     at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
2022/09/02 09:20:20 - General -     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
2022/09/02 09:20:20 - General -     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
2022/09/02 09:20:20 - General -     at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
2022/09/02 09:20:20 - General -     at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
2022/09/02 09:20:20 - General -     at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
2022/09/02 09:20:20 - General -     at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
2022/09/02 09:20:20 - General -     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
2022/09/02 09:20:20 - General -     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022/09/02 09:20:20 - General -     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022/09/02 09:20:20 - General -     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022/09/02 09:20:20 - General -     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022/09/02 09:20:20 - General -     at java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
2022/09/02 09:20:20 - General -     at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
2022/09/02 09:20:20 - General -     ... 17 more

> On 02.09.2022 at 08:58, Hans Van Akelyen <[email protected]> wrote:
>
> Hi Fabian,
>
> Merge join does not require your data to be sorted when executing on Beam:
> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>
> Cheers,
> Hans
>
> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <[email protected]> wrote:
> Good morning Matt,
>
> Thanks for your quick reply! Unfortunately the inputs are not sorted, so the
> Merge Join transform is not an option. I guess I'll have to use temporary
> BigQuery tables to handle this. Those pipelines are all bounded, so this is
> an option. Or is there an easy option to sort things when running on Beam?
>
> I'll create a Jira ticket, no problem.
>
> cheers
>
> Fabian
>
>> On 01.09.2022 at 19:11, Matt Casters <[email protected]> wrote:
>>
>> Hi Fabian,
>>
>> Joining rows is indeed the exception in Beam. I would suggest you use the
>> Merge Join
>> <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html>
>> transform.
>> For unbounded pipelines (never ending) that transform will be handled
>> <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java>
>> correctly.
>> If you don't mind, please create a JIRA case so we can create a similar
>> handler for the Cartesian product use-case.
>> The code usually is non-trivial in the massively parallel world but quite
>> doable ;-)
>>
>> All the best,
>> Matt
>>
>>
>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <[email protected]> wrote:
>> Hi all,
>>
>> I've hit the next problem, this time something I thought I had tested on
>> Beam before: a pipeline containing a "Join rows (cartesian product)"
>> transform with input from two sources loops forever when run via
>> Beam-Direct or Dataflow. It works fine using the local runner.
>>
>> While running it on Beam-Direct I've attached a debugger and can see that it
>> is stuck in the while loop at JoinRows.java:486
>> <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>> I've tried using a GCS temp directory and swapped the "Main transform to
>> read from" but none of those helped.
>>
>> Is this transform incompatible with Beam? If so, what could I use instead?
>>
>> cheers
>>
>> Fabian
>>
>> <PastedGraphic-8.png>
>>
>>
>> --
>> Neo4j Chief Solutions Architect
>> ✉ [email protected]
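A note on the stack trace at the top of this thread: the innermost ClassCastException is what Java raises when a field whose metadata says Integer (backed by java.lang.Long, per the trace) actually carries a String at runtime. The following is a minimal standalone Java sketch of that failure mode, not Hop code. The class name, both helper methods and the sample value "42" are hypothetical and only mimic the cast. It may help when checking what the upstream transform really puts into "site_id".

```java
// Hypothetical illustration only; none of these names exist in Apache Hop.
class CastMismatchSketch {

    // Mimics a metadata-driven getter that trusts the declared type and casts blindly.
    static Long getInteger(Object value) {
        return (Long) value; // throws ClassCastException if value is really a String
    }

    // A more lenient variant (an assumption, not Hop's behaviour): accept numbers
    // and numeric strings, and convert them to Long explicitly.
    static Long getIntegerLenient(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return Long.valueOf(value.toString().trim());
    }

    public static void main(String[] args) {
        Object siteId = "42"; // declared "Integer" upstream, but actually a String

        try {
            getInteger(siteId);
        } catch (ClassCastException e) {
            System.out.println("Blind cast failed: " + e.getMessage());
        }

        System.out.println("Lenient conversion: " + getIntegerLenient(siteId));
    }
}
```

If the value arriving at the BigQuery output really is a String, an explicit type conversion earlier in the pipeline would be one thing to try; whether that applies here is a guess based on the trace alone.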
