So I think the following happens:

1. The schema tree is initialized at construction time. The tree gets serialized and sent to the workers.
2. The workers deserialize the tree, but because the Timestamp logical type has a *static* schema, the schema is *re-initialized without the UUID* (as the UUID was never serialized).
3. This is why setting a fixed UUID at static initialization works.
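A deterministic, name-based UUID is one way to get such a fixed value: `UUID.nameUUIDFromBytes` always produces the same (version-3) UUID for the same input bytes, so the driver and every worker agree even though the UUID itself never travels over the wire. A minimal stand-alone sketch — the logical-type name string here is invented for illustration, not Beam's actual identifier:

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class FixedUuid {
    // Any stable byte sequence works; using a descriptive name keeps it readable.
    // (The name below is hypothetical, chosen only for this example.)
    static final UUID TIMESTAMP_SCHEMA_UUID =
        UUID.nameUUIDFromBytes(
            "beam:logical_type:protobuf_timestamp".getBytes(StandardCharsets.UTF_8));

    public static void main(String[] args) {
        // Recomputing from the same bytes yields the identical UUID,
        // so a worker's static initializer produces the same value as the driver's.
        UUID recomputed = UUID.nameUUIDFromBytes(
            "beam:logical_type:protobuf_timestamp".getBytes(StandardCharsets.UTF_8));
        System.out.println(TIMESTAMP_SCHEMA_UUID.equals(recomputed)); // prints true
    }
}
```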
So the solution is, as tested:

- set a fixed UUID
- make the schema not static in the logical type

_/
_/ Alex Van Boxel


On Mon, Jan 13, 2020 at 8:08 PM Reuven Lax <[email protected]> wrote:

> SchemaCoder today recursively sets UUIDs for all schemas, including
> logical types, in setSchemaIds. Is it possible that your changes modified
> that logic somehow?
>
> On Mon, Jan 13, 2020 at 9:39 AM Alex Van Boxel <[email protected]> wrote:
>
>> This is the stacktrace:
>>
>> java.lang.IllegalStateException
>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:491)
>>   at org.apache.beam.sdk.coders.RowCoderGenerator.getCoder(RowCoderGenerator.java:380)
>>   at org.apache.beam.sdk.coders.RowCoderGenerator.getCoder(RowCoderGenerator.java:371)
>>   at org.apache.beam.sdk.coders.RowCoderGenerator.createComponentCoders(RowCoderGenerator.java:337)
>>   at org.apache.beam.sdk.coders.RowCoderGenerator.generate(RowCoderGenerator.java:140)
>>   at org.apache.beam.sdk.schemas.SchemaCoder.getDelegateCoder(SchemaCoder.java:159)
>>   at org.apache.beam.sdk.schemas.SchemaCoder.toString(SchemaCoder.java:204)
>>   at java.lang.String.valueOf(String.java:2994)
>>   at java.lang.StringBuilder.append(StringBuilder.java:131)
>>   at org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
>>   at org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:623)
>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:539)
>>   at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
>>   at org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>>   at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
>>   at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
>>   at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
>>   at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>   at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>   at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>> In red is the case for LogicalType. Setting it to a fixed value makes it
>> run on Dataflow. Note that the pipeline works perfectly on DirectRunner.
>>
>> _/
>> _/ Alex Van Boxel
>>
>> On Mon, Jan 13, 2020 at 6:06 PM Reuven Lax <[email protected]> wrote:
>>
>>> I don't think that should be the case. Also SchemaCoder will
>>> automatically set the UUID for such logical types.
>>>
>>> On Mon, Jan 13, 2020 at 8:24 AM Alex Van Boxel <[email protected]> wrote:
>>>
>>>> OK, I've rechecked everything and eventually found the problem. The
>>>> problem is when you use a LogicalType backed by a Row: the UUID
>>>> needs to be set to make it work (this is the case for Proto-based
>>>> Timestamps). I'll create a fix.
>>>>
>>>> _/
>>>> _/ Alex Van Boxel
>>>>
>>>> On Mon, Jan 13, 2020 at 8:36 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Can you elucidate? All BeamSQL pipelines use schemas and I believe
>>>>> those tests are working just fine on the Dataflow runner. In addition,
>>>>> there are a number of ValidatesRunner schema-aware pipelines that are
>>>>> running regularly on the Dataflow runner.
>>>>>
>>>>> On Sun, Jan 12, 2020 at 1:43 AM Alex Van Boxel <[email protected]> wrote:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>> anyone tried master with a *schema aware pipeline* on Dataflow? I'm
>>>>>> testing some PRs to see if they run on Dataflow (as they are working
>>>>>> on Direct) but they get:
>>>>>>
>>>>>> Workflow failed. Causes: The Dataflow job appears to be stuck because
>>>>>> no worker activity has been seen in the last 1h. You can get help with
>>>>>> Cloud Dataflow at
>>>>>>
>>>>>> Because I got this, I wanted to see if master (without my changes)
>>>>>> also has the same behaviour.
>>>>>>
>>>>>> It's easy to simulate: just read from BigQuery with:
>>>>>>
>>>>>> BigQueryIO.readTableRowsWithSchema()
>>>>>>
>>>>>> It works with the classic readTableRows().
>>>>>>
>>>>>> _/
>>>>>> _/ Alex Van Boxel
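The root cause described at the top of the thread — Java serialization simply skips static fields, so a UUID stored on a static schema never reaches a worker JVM, whose static initializer then rebuilds the schema without it — can be seen with a small stand-alone check. The class and field names below are invented for the demo; this is not Beam code:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class StaticNotSerialized {

    // Stand-in for a logical type that keeps its schema in a static field.
    static class HolderWithStaticField implements Serializable {
        static String schemaUuid = "MARKER-UUID-VALUE"; // set once, on the "driver"
        String payload = "instance-data";               // ordinary instance field
    }

    // Serialize one instance and return the raw stream as a 1-byte-per-char string
    // (ISO-8859-1 maps bytes 1:1, so substring checks work on the raw bytes).
    static String serialize() throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new HolderWithStaticField());
        }
        return new String(bos.toByteArray(), StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) throws Exception {
        String stream = serialize();
        // The instance field's value travels with the object...
        System.out.println("instance field serialized: " + stream.contains("instance-data"));   // true
        // ...but the static field's value does not: a fresh worker JVM re-runs the
        // static initializer and gets a schema without the driver's UUID.
        System.out.println("static field serialized:   " + stream.contains("MARKER-UUID-VALUE")); // false
    }
}
```

This matches the two fixes discussed above: either make the UUID deterministic at static initialization (so every JVM computes the same one), or make the schema non-static so it is serialized along with the instance.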
