It's indeed the first Logical identifier with Row base type. The UUID is
generated from the name of the class, but doing it in code (from a string)
you need to create bytes from the string, then a UUID.

 _/
_/ Alex Van Boxel


On Mon, Jan 13, 2020 at 10:40 PM Brian Hulette <[email protected]> wrote:

> I guess these are the first logical types we've defined with a base type
> of row. It does seem reasonable that a static schema for a logical type
> could have some fixed id, but it feels odd to have a fixed UUID, it would
> be nice if we could give the schema some meaningful static identifier.
>
> I think Reuven was investigating adding support for "named" schemas in
> order to add support for recursive schemas, but ran into some issues. Maybe
> something like that is what we need here?
>
> On Mon, Jan 13, 2020 at 12:26 PM Alex Van Boxel <[email protected]> wrote:
>
>> Fix in this PR:
>>
>> [BEAM-9113] Fix serialization proto logical types
>> https://github.com/apache/beam/pull/10569
>>
>> or we all agree to *promote* the logical types to top-level logical
>> types (as described in the design document, see ticket):
>>
>> [BEAM-9037] Instant and duration as logical type
>> https://github.com/apache/beam/pull/10486
>>
>>
>>
>>  _/
>> _/ Alex Van Boxel
>>
>>
>> On Mon, Jan 13, 2020 at 8:40 PM Alex Van Boxel <[email protected]> wrote:
>>
>>> So I think the following happens:
>>>
>>>    1. the schema tree is initialized at construction time. The tree get
>>>    serialized and send to the workers
>>>    2. the workers deserialize the tree, but as the Timestamp logical
>>>    type have a logical type with a *static* schema the schema will be 
>>> *re-initialized
>>>    without the UUID* (as it was never serialized)
>>>    3. this is why setting a fixed UUID at static initialization works
>>>
>>> So solution is
>>>
>>>    - as tested, se a fixed UUID
>>>    - make the schema not static in the logical type
>>>
>>>  _/
>>> _/ Alex Van Boxel
>>>
>>>
>>> On Mon, Jan 13, 2020 at 8:08 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> SchemaCoder today recursively sets UUIDs for all schemas, including
>>>> logical types, in setSchemaIds. Is it possible that your changes modified
>>>> that logic somehow?
>>>>
>>>> On Mon, Jan 13, 2020 at 9:39 AM Alex Van Boxel <[email protected]>
>>>> wrote:
>>>>
>>>>> This is the stacktrace:
>>>>>
>>>>>
>>>>> java.lang.IllegalStateException at
>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:491)
>>>>> at
>>>>> org.apache.beam.sdk.coders.RowCoderGenerator.getCoder(RowCoderGenerator.java:380)
>>>>> at
>>>>> org.apache.beam.sdk.coders.RowCoderGenerator.getCoder(RowCoderGenerator.java:371)
>>>>> at
>>>>> org.apache.beam.sdk.coders.RowCoderGenerator.createComponentCoders(RowCoderGenerator.java:337)
>>>>> at
>>>>> org.apache.beam.sdk.coders.RowCoderGenerator.generate(RowCoderGenerator.java:140)
>>>>> at
>>>>> org.apache.beam.sdk.schemas.SchemaCoder.getDelegateCoder(SchemaCoder.java:159)
>>>>> at org.apache.beam.sdk.schemas.SchemaCoder.toString(SchemaCoder.java:204)
>>>>> at java.lang.String.valueOf(String.java:2994) at
>>>>> java.lang.StringBuilder.append(StringBuilder.java:131) at
>>>>> org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
>>>>> at
>>>>> org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
>>>>> at
>>>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:623)
>>>>> at
>>>>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:539)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> In red is the case for LogicalType. Setting it to a fixed value makes
>>>>> it run on Dataflow. Note that the pipeline works perfect on DirectRunner.
>>>>>
>>>>>  _/
>>>>> _/ Alex Van Boxel
>>>>>
>>>>>
>>>>> On Mon, Jan 13, 2020 at 6:06 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> I don't think that should be the case. Also SchemaCoder will
>>>>>> automatically set the UUID for such logical types.
>>>>>>
>>>>>> On Mon, Jan 13, 2020 at 8:24 AM Alex Van Boxel <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> OK, I've rechecked everything and eventually found the problem. The
>>>>>>> problem is when you use a LogicalType backed back a Row, then the UUID
>>>>>>> needs to be set to make it work. (this is the case for Proto based
>>>>>>> Timestamps). I'll create a fix.
>>>>>>>
>>>>>>>  _/
>>>>>>> _/ Alex Van Boxel
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jan 13, 2020 at 8:36 AM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>> Can you elucidate? All BeamSQL pipelines use schemas and I believe
>>>>>>>> those test are working just fine on the Dataflow runner. In addition, 
>>>>>>>> there
>>>>>>>> are a number of ValidatesRunner schema-aware pipelines that are running
>>>>>>>> regularly on the Dataflow runner.
>>>>>>>>
>>>>>>>> On Sun, Jan 12, 2020 at 1:43 AM Alex Van Boxel <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey all,
>>>>>>>>>
>>>>>>>>> anyone tried master with a *schema aware pipeline* on Dataflow?
>>>>>>>>> I'm testing some PR's to see if the run on Dataflow (as they are 
>>>>>>>>> working on
>>>>>>>>> Direct) but they got:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Workflow failed. Causes: The Dataflow job appears to be stuck
>>>>>>>>> because no worker activity has been seen in the last 1h. You can get 
>>>>>>>>> help
>>>>>>>>> with Cloud Dataflow at
>>>>>>>>>
>>>>>>>>> because I got this I wanted to see if master (without my changes)
>>>>>>>>> also have the same behaviour.
>>>>>>>>>
>>>>>>>>> It's easy to simulate: Just read for BigQuery with:
>>>>>>>>>
>>>>>>>>> BigQueryIO.readTableRowsWithSchema()
>>>>>>>>>
>>>>>>>>> it works with the classic: readTableRows().
>>>>>>>>>
>>>>>>>>>  _/
>>>>>>>>> _/ Alex Van Boxel
>>>>>>>>>
>>>>>>>>

Reply via email to