Hey everyone,

Summary: 

There is an issue with the Dataflow runner and the “Update” capability when 
using the Beam-native Row type. I imagine this also blocks the Snapshots 
feature (the docs say Snapshots have the same restrictions as Update), but I 
have no experience there.

Currently, when reading from KafkaIO with the value coder set to a SchemaCoder:

```
KafkaIO.read<ByteArray, Row>()
    .withTopic(topic)
    .withKeyDeserializer(ByteArrayDeserializer::class.java)
    .withValueDeserializerAndCoder([Deserializer<Row>], SchemaCoder.of(outputSchema))
```
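
For context, `outputSchema` is a Beam `Schema` built up front. A minimal 
sketch of what it could look like (the field names here are placeholders, not 
our actual schema):

```
import org.apache.beam.sdk.schemas.Schema

// Placeholder schema: the real one is application-specific.
val outputSchema: Schema = Schema.builder()
    .addStringField("userId")
    .addInt64Field("eventCount")
    .addNullableField("eventTime", Schema.FieldType.DATETIME)
    .build()
```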

Updates fail consistently with the error:
```
The original job has not been aborted., The Coder or type for step
ReadInputTopic/Read(KafkaUnboundedSource)/DataflowRunner.StreamingUnboundedRead.ReadWithIds
has changed
```

There is an open issue about this 
(https://issues.apache.org/jira/browse/BEAM-9502), but I have not seen it 
discussed on the mailing list, so I wanted to start a thread.

Investigation so far: 

This failing on Beam 2.20 and below makes sense. The code path that calls 
equals on this Coder first checks that the schemas are equal (this part has 
not changed): 
https://github.com/apache/beam/blob/release-2.25.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java#L194
That in turn called equals on the Schema, which returned false whenever the 
UUIDs differed: 
https://github.com/apache/beam/blob/release-2.20.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L272

This means that, as the above issue suggests, the randomly generated UUID 
guaranteed that no two SchemaCoders ever compared equal, so equals always 
returned false.
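
To make this concrete, a minimal sketch (the explicit setUUID calls stand in 
for wherever the SDK assigns the random UUID):

```
import org.apache.beam.sdk.schemas.Schema
import java.util.UUID

val a = Schema.builder().addStringField("userId").build()
val b = Schema.builder().addStringField("userId").build()
a.setUUID(UUID.randomUUID())
b.setUUID(UUID.randomUUID())
// On Beam <= 2.20, Schema.equals compared the UUIDs when both were set, so
// this printed false despite identical fields. After the BEAM-4076 change
// described below, mismatched UUIDs fall through to a field comparison and
// this prints true.
println(a == b)
```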



In [BEAM-4076] this was changed (PR: 
https://github.com/apache/beam/pull/11041/files, direct link: 
https://github.com/reuvenlax/incubator-beam/blob/37b1fbaa7ea1b64303eaacae9e670797a92b1a50/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L272).
Now, if the UUIDs don’t match, equals falls through and still checks the 
fieldIndices, fields, and options. Reasonably, this should mean we no longer 
get the incompatibility error, though it is potentially slower than ideal.

But we are still getting the same error on Beam 2.24.

I added a way to deterministically generate a UUID, which we set on the 
schema passed to `.withValueDeserializerAndCoder([Deserializer<Row>], 
SchemaCoder.of(outputSchema))`.
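
Roughly, the generation looks like this (a sketch, not the exact code; 
`deterministicUuidFor` is my helper name, and a complete version would need 
to recurse into nested ROW and collection element types):

```
import org.apache.beam.sdk.schemas.Schema
import java.util.UUID

// Derive a stable, content-based UUID from the parts of the schema we treat
// as identity-defining: field names, type names, and nullability.
fun deterministicUuidFor(schema: Schema): UUID {
    val canonical = schema.fields.joinToString("|") { field ->
        "${field.name}:${field.type.typeName}:${field.type.nullable}"
    }
    // nameUUIDFromBytes is deterministic: same bytes in, same (version 3) UUID out.
    return UUID.nameUUIDFromBytes(canonical.toByteArray(Charsets.UTF_8))
}

// Applied before building the coder:
// outputSchema.setUUID(deterministicUuidFor(outputSchema))
```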

When we do not set the schema UUID, the update fails with the same error 
every time, so it is difficult to log and compare the SchemaCoder.

When we do set the schema UUID deterministically, the update sometimes fails 
and sometimes succeeds. When it succeeds, logging the coder shows the same 
fields, in the same order, with the same UUID on the schema. When I launch a 
new job, the logging prints the same coder as in the first run, which leaves 
me stumped as to why the update failed at all. I could use some advice here.
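
For reference, the logging used for that comparison is done at pipeline 
construction, along these lines (a sketch; the logger name is arbitrary):

```
import org.apache.beam.sdk.schemas.Schema
import org.apache.beam.sdk.schemas.SchemaCoder
import org.slf4j.LoggerFactory

private val LOG = LoggerFactory.getLogger("SchemaDebug")

// Dump the coder and the schema identity so two submissions of the same job
// can be diffed from the logs.
fun logCoderIdentity(schema: Schema) {
    LOG.info("value coder: {}", SchemaCoder.of(schema))
    LOG.info("schema uuid={} fields={}", schema.getUUID(), schema.fields)
}
```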

Follow-up discussion:

I am curious whether it has ever been pinned down what makes one Schema the 
same as another. From what I have seen in the codebase, the answer has 
changed over time.

In this PR, https://github.com/apache/beam/pull/13049, I generate a Schema 
UUID based on the field names, type names, and nullability (along the lines 
of the sketch above). Let me know if this is the right direction.

I think this UUID is worth making deterministic even if it doesn’t solve the 
Dataflow update issue, since equal UUIDs let the schema comparison 
short-circuit instead of walking every field.

There was a previous draft PR, 
https://github.com/apache/beam/pull/11447#issuecomment-615442205, which I got 
busy and didn’t finish. It received this reply:
```
I would like to talk about the use case a bit more. I think this is probably to 
do with update support on the Dataflow runner, in which case this may not be 
the right solution (and may not be a sufficient change).
```

Follow-up questions (which may overlap):

1. This is the only issue related to Dataflow updates with the Row type that 
I could find, but are there more tracked internally? If so, can they be made 
public so that we can work on them?
2. I know the Row type is “Experimental”, but the only discussion I have seen 
is that DATETIME should be removed before the tag can be dropped. What other 
issues or blockers, if any, stand in the way?
3. Is this entirely the wrong approach, or the wrong place for this 
discussion?
4. Should SchemaOptions and field descriptions be included when generating 
the UUID? I don’t think they should be, so that descriptions and options can 
be updated freely (just my naive opinion, happy to be corrected).
5. Perhaps there are larger plans for schemas. Could this (or something 
similar) be merged as a stopgap?

This is my first message to the mailing list, so I apologize if any info is 
missing.

Thanks,
Cameron