Hi Hans,

That stack trace was due to user error, and I'm glad it was. ;) There was one "id" field too many coming into the merge join…
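The stack trace quoted below ends in a ClassCastException (String cannot be cast to Long) while "site_id" is read as an Integer. One plausible way a surplus field produces exactly that, stated here as an assumption rather than a diagnosis of Hop's code, is positional misalignment: Hop rows are Object[] arrays described by separate row metadata, so if the data carries one value more than the metadata expects, later values sit in the wrong slots. The Java sketch below illustrates this with a hypothetical MisalignedRowDemo class and a hypothetical getInteger helper; neither is Hop's ValueMetaBase.

```java
class MisalignedRowDemo {
    // Hypothetical row metadata: field names by position, as the downstream transform expects them.
    static final String[] FIELD_NAMES = {"id", "site_id"};

    // Hypothetical getter: finds the field's position in the metadata, then casts the raw value
    // at that position to Long. A simplified stand-in, not Hop's ValueMetaBase.getInteger.
    static Long getInteger(Object[] row, String field) {
        for (int i = 0; i < FIELD_NAMES.length; i++) {
            if (FIELD_NAMES[i].equals(field)) {
                return (Long) row[i]; // throws ClassCastException if this slot holds a String
            }
        }
        throw new IllegalArgumentException("Unknown field: " + field);
    }

    public static void main(String[] args) {
        // Values line up with the metadata: "site_id" sits at index 1.
        Object[] aligned = {"a-1", 42L};
        System.out.println(getInteger(aligned, "site_id")); // prints 42

        // One "id" value too many: index 1 now holds a String, and 42L has moved to index 2.
        Object[] misaligned = {"a-1", "a-1", 42L};
        try {
            getInteger(misaligned, "site_id");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

Reading "site_id" from the aligned row returns 42; from the misaligned row the same call throws, even though 42 is still in the array, one slot further along.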
Sorry for the noise!

cheers

Fabian

> On 02.09.2022 at 09:31, Fabian Peters wrote:
>
> Hi Hans,
>
> Thanks! I probably read that at some point, but the "Notice" modal popping up when closing the "Merge join" dialogue probably convinced me otherwise: "If the incoming data is not sorted on the specified keys, the output results may not be correct. We recommend sorting the incoming data within the pipeline."
>
> I'm testing it in my pipeline now and am getting a stack trace (see below). The "site_id" field comes from an "Avro decode" transform and is a plain Integer. Using the local runner and writing to Postgres, this works fine.
>
> cheers
>
> Fabian
>
> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
> 2022/09/02 09:20:20 - General - ERROR: org.apache.hop.core.exception.HopException:
> 2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
> 2022/09/02 09:20:20 - General -
> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 09:20:20 - General - at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 09:20:20 - General - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 09:20:20 - General - ... 2 more
> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
> 2022/09/02 09:20:20 - General - Caused by: org.apache.hop.core.exception.HopValueException:
> 2022/09/02 09:20:20 - General - Unexpected conversion error while converting value [site_id Integer] to an Integer
> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
> 2022/09/02 09:20:20 - General -
> 2022/09/02 09:20:20 - General - at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
> 2022/09/02 09:20:20 - General - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 09:20:20 - General - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
> 2022/09/02 09:20:20 - General - at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 09:20:20 - General - at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
> 2022/09/02 09:20:20 - General - at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
> 2022/09/02 09:20:20 - General - ... 17 more
>
>> On 02.09.2022 at 08:58, Hans Van Akelyen wrote:
>>
>> Hi Fabian,
>>
>> Merge join does not require your data to be sorted when executing on Beam:
>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>>
>> Cheers,
>> Hans
>>
>> On Fri, 2 Sep 2022 at 08:34, Fabian Peters wrote:
>> Good morning Matt,
>>
>> Thanks for your quick reply! Unfortunately, the inputs are not sorted, so the Merge Join transform is not an option. I guess I'll have to use temporary BigQuery tables to handle this. Those pipelines are all bounded, so this is an option. Or is there an easy way to sort things when running on Beam?
>>
>> I'll create a Jira ticket, no problem.
>>
>> cheers
>>
>> Fabian
>>
>>> On 01.09.2022 at 19:11, Matt Casters wrote:
>>>
>>> Hi Fabian,
>>>
>>> Joining rows is indeed the exception in Beam. I would suggest you use the Merge Join <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html> transform.
>>> For unbounded (never-ending) pipelines, that transform will be handled <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java> correctly.
>>> If you don't mind, please create a JIRA case so we can create a similar handler for the Cartesian product use-case.
>>> The code is usually non-trivial in the massively parallel world, but quite doable ;-)
>>>
>>> All the best,
>>> Matt
>>>
>>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters wrote:
>>> Hi all,
>>>
>>> I've hit the next problem, this time with something I thought I had tested on Beam before: a pipeline containing a "Join rows (cartesian product)" transform with input from two sources loops forever when run via Beam-Direct or Dataflow. It works fine using the local runner.
>>>
>>> While running it on Beam-Direct, I attached a debugger and can see that it is stuck in the while loop at JoinRows.java:486 <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>>> I've tried using a GCS temp directory and swapping the "Main transform to read from", but neither helped.
>>>
>>> Is this transform incompatible with Beam? If so, what could I use instead?
>>>
>>> cheers
>>>
>>> Fabian
>>>
>>> <PastedGraphic-8.png>
>>>
>>> --
>>> Neo4j Chief Solutions Architect
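The "Notice" about unsorted input and Hans' statement that Merge join needs no sorting on Beam, both quoted earlier in this thread, are consistent once two join strategies are separated. A single-pass sort-merge join advances whichever cursor has the smaller key, so it may only skip rows safely when both inputs are sorted; a keyed (hash) join indexes one side by key and probes it with the other, so arrival order is irrelevant. The Java sketch below contrasts them on unique integer keys. JoinOrderDemo, mergeJoin and keyedJoin are hypothetical names, and the code is not taken from Hop's Merge join transform or its Beam handler; the only assumption is that a Beam-side join groups rows by key (as Beam's CoGroupByKey does) rather than relying on order.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class JoinOrderDemo {
    // Single-pass sort-merge join on unique int keys: advances whichever cursor points at the
    // smaller key, so it silently skips matches when an input is not sorted ascending.
    static List<Integer> mergeJoin(int[] left, int[] right) {
        List<Integer> matches = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.length && j < right.length) {
            if (left[i] < right[j]) {
                i++;
            } else if (left[i] > right[j]) {
                j++;
            } else {
                matches.add(left[i]);
                i++;
                j++;
            }
        }
        return matches;
    }

    // Keyed (hash) join on unique int keys: builds a set from one side and probes it with the
    // other, so the order in which rows arrive does not matter.
    static List<Integer> keyedJoin(int[] left, int[] right) {
        Set<Integer> rightKeys = new HashSet<>();
        for (int key : right) {
            rightKeys.add(key);
        }
        List<Integer> matches = new ArrayList<>();
        for (int key : left) {
            if (rightKeys.contains(key)) {
                matches.add(key);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        int[] right = {2, 3, 4};
        System.out.println(mergeJoin(new int[] {1, 2, 3}, right)); // prints [2, 3]
        System.out.println(mergeJoin(new int[] {3, 1, 2}, right)); // prints [3]
        System.out.println(keyedJoin(new int[] {3, 1, 2}, right)); // prints [3, 2]
    }
}
```

With the left input out of order, mergeJoin returns only [3]: it moved the right cursor past 2 while the left cursor was still on 3. keyedJoin finds both matches, in left-input order.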

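For the original "Join rows (cartesian product)" question: a cross join cannot finish pairing any row from one input until the other input has been read completely. The Java sketch below shows one workable pattern under that constraint: buffer the bounded side fully, then stream the other side against the buffer. CrossJoinDemo and crossJoin are hypothetical names. In Beam terms this resembles handing the bounded side to each worker as a side input, but that is an analogy, not a description of JoinRows or of any Hop Beam handler.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;

class CrossJoinDemo {
    // Cartesian product: copy the bounded side into memory once so it can be replayed for
    // every streamed row (a one-shot source could not be re-read), then emit every pair.
    static <A, B> void crossJoin(Iterator<A> streamed, Iterable<B> bounded, BiConsumer<A, B> emit) {
        List<B> buffered = new ArrayList<>();
        for (B row : bounded) {
            buffered.add(row); // must complete before any streamed row can be fully paired
        }
        while (streamed.hasNext()) {
            A left = streamed.next();
            for (B right : buffered) {
                emit.accept(left, right);
            }
        }
    }

    public static void main(String[] args) {
        List<String> pairs = new ArrayList<>();
        crossJoin(List.of("a", "b").iterator(), List.of(1, 2), (a, b) -> pairs.add(a + "|" + b));
        System.out.println(pairs); // prints [a|1, a|2, b|1, b|2]
    }
}
```

Memory use grows with the buffered side only, so the side chosen for buffering should be the bounded, smaller one.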