Hi Cameron,

Thanks for bringing this up on the dev list. I'm quite familiar with Beam
schemas, but I should be clear that I'm not as familiar with Dataflow's
pipeline update. +Reuven Lax <re...@google.com> may need to correct me there.

> I am curious if it has been determined what makes a Schema the same as
> another schema. From what I have seen in the codebase, it varies.

You're right, schema equality means different things in different contexts,
and we should be clearer about this. As I understand it, for pipeline update
the important thing isn't so much whether the schemas are actually equal, but
whether data encoded with the old schema can be understood by a SchemaCoder
referencing the new schema, since the new SchemaCoder will likely receive
data that was encoded with the old one. To satisfy that requirement, the old
and the new schemas must have the same fields* in the same order.
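
To make the ordering requirement concrete, here's a toy sketch of my own
(not runner code; the schema and values are invented) of what happens when
the same fields are encoded with one order and decoded with another:

```
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.Row;

public class OrderSensitivity {
  public static void main(String[] args) throws Exception {
    // Same fields, different declaration order.
    Schema oldSchema = Schema.builder().addStringField("a").addInt64Field("b").build();
    Schema newSchema = Schema.builder().addInt64Field("b").addStringField("a").build();

    Row row = Row.withSchema(oldSchema).addValues("x", 1L).build();
    byte[] bytes = CoderUtils.encodeToByteArray(RowCoder.of(oldSchema), row);

    // Field values are encoded positionally, so decoding with the reordered
    // schema reads the int64 bytes where the string was written; this either
    // throws or yields a corrupted Row.
    Row decoded = CoderUtils.decodeFromByteArray(RowCoder.of(newSchema), bytes);
    System.out.println(decoded);
  }
}
```
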
It might not seem like maintaining the ordering is an issue, but it is for
schemas inferred from Java types. That's because there's no guarantee about
the order in which we'll discover the fields or methods when using
reflection APIs. I believe Reuven did some experiments here and found that
the ordering is essentially random, so inferring a schema from the same Java
type in two different executions can yield two completely different field
orders.
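
For example, a schema inferred from a hypothetical POJO like this one has no
pinned field order:

```
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// The inferred schema's field order comes from reflection, and
// Class#getDeclaredFields() makes no ordering guarantee, so two submissions
// of the same pipeline may infer ("userId", "name") or ("name", "userId").
@DefaultSchema(JavaFieldSchema.class)
public class User {
  public String userId;
  public String name;
}
```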

There are a couple of things we definitely need to do on the Beam side to
support pipeline update for SchemaCoder with possibly out-of-order fields:
- BEAM-10277: Java's RowCoder needs to respect the encoding_position field
in the schema proto. This provides a layer of indirection for field ordering
that runners can modify to "fix" schemas that have the same fields in a
different order (a rough sketch of the idea follows this list).
- Java's SchemaCoder needs to encode the schema in a portable way, so that
runners will be able to inspect and modify the schema proto as described
above. Currently SchemaCoder is still represented in the pipeline proto as
a serialized Java class, so runners can't easily inspect/modify it.
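
To illustrate the first task, here's a rough sketch of my own (not actual
Dataflow code) of how a runner could assign encoding positions to reconcile
reordered fields:

```
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;

public class EncodingPositions {
  /**
   * Maps each field name to the position it was encoded at in the old
   * schema. A runner could attach this mapping to the new schema so its
   * RowCoder decodes old bytes correctly. Assumes both schemas contain
   * exactly the same fields.
   */
  public static Map<String, Integer> reconcile(Schema oldSchema, Schema newSchema) {
    Map<String, Integer> positions = new HashMap<>();
    for (int i = 0; i < oldSchema.getFieldCount(); i++) {
      positions.put(oldSchema.getField(i).getName(), i);
    }
    for (Schema.Field field : newSchema.getFields()) {
      if (!positions.containsKey(field.getName())) {
        throw new IllegalArgumentException("Field not in old schema: " + field.getName());
      }
    }
    return positions;
  }
}
```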



All that being said, it looks like you may not be using SchemaCoder with a
schema inferred from a Java type. Where is `outputSchema` coming from? Is
it possible to make sure it maintains a consistent field order?
If you can do that, this may be an easier problem. I think then we could
make a change on the Dataflow side to ignore the schema's UUID when
checking for update compatibility.
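
For instance, if outputSchema is constructed by hand, something like this
(the field names are invented) pins the order across submissions:

```
import org.apache.beam.sdk.schemas.Schema;

// Building the schema explicitly fixes the field order, unlike inference
// from a Java type.
Schema outputSchema =
    Schema.builder()
        .addStringField("id")
        .addInt64Field("timestamp")
        .addNullableField("payload", Schema.FieldType.STRING)
        .build();
```
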
On the other hand, if you need to support pipeline update for schemas with
out-of-order fields, we'd need to address the above tasks first. If you're
willing to work on them I can help direct you; these are things I've been
hoping to get to but haven't been able to.

Brian

* Looking forward, we don't actually want to require the schemas to have the
same fields; we could allow adding/removing fields, with certain limitations.

On Thu, Oct 8, 2020 at 12:55 PM Cameron Morgan <cameron.mor...@shopify.com>
wrote:

> Hey everyone,
>
> *Summary:*
>
> There is an issue with the Dataflow runner and the “Update” capability
> when using the Beam-native Row type. I imagine it also blocks the snapshots
> feature (the docs say snapshots have the same restrictions as Update), but
> I have no experience there.
>
> Currently when reading from KafkaIO with the valueCoder set as a
> SchemaCoder:
>
> ```
> KafkaIO.Read<ByteArray, Row>()
>     .withTopic(topic)
>     .withKeyDeserializer(ByteArrayDeserializer::class.java)
>     .withValueDeserializerAndCoder([Deserializer<Row>], SchemaCoder.of(outputSchema))
> ```
>
> Updates fail consistently with the error:
> ```
> The original job has not been aborted., The Coder or type for step
> ReadInputTopic/Read(KafkaUnboundedSource)/DataflowRunner.StreamingUnboundedRead.ReadWithIds
> has changed
> ```
>
> There is an open issue about this
> (https://issues.apache.org/jira/browse/BEAM-9502), but I have not seen it
> discussed on the mailing list, so I wanted to start that discussion.
>
> *Investigation so far:*
>
> This failing on Beam 2.20 and below makes sense: the code path that calls
> equals on this Coder first checks that the schemas are equal (this part has
> not changed):
> https://github.com/apache/beam/blob/release-2.25.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java#L194
> That in turn called equals on the schema here, which returned false if the
> UUIDs were different:
> https://github.com/apache/beam/blob/release-2.20.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L272
>
> This means that, as the above issue suggests, the randomly generated UUID
> meant no two SchemaCoders were ever the same, so equals always returned
> false.
>
>
>
> In [BEAM-4076] this was changed (PR link:
> https://github.com/apache/beam/pull/11041/files, direct link:
> https://github.com/reuvenlax/incubator-beam/blob/37b1fbaa7ea1b64303eaacae9e670797a92b1a50/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L272).
> Now, if the UUIDs don’t match, it goes on to check the fieldIndices,
> fields, and fieldOptions. Reasonably, this should mean we don’t get the
> incompatibility error, though it is potentially slower than would be ideal.
>
> But we are still getting the same error on Beam 2.24.
>
> I added a way to deterministically generate a UUID, which we set on the
> schema we pass to `.withValueDeserializerAndCoder([Deserializer<Row>],
> SchemaCoder.of(outputSchema))`.
>
> When we do not set the Schema UUID, it fails with the same error every
> time, so it is difficult to log the SchemaCoder.
>
> When we do set the schema UUID deterministically, it sometimes fails and
> other times succeeds. When it succeeds, logging the coder shows the same
> fields in the same order, with the same UUID on the schema. And when I
> launch a new job, the logging prints the same coder as in the first run,
> which leaves me stumped as to why the update failed at all. I could use
> some advice here.
>
> *Follow-up discussion:*
>
> I am curious if it has been determined what makes a Schema the same as
> another schema. From what I have seen in the codebase, it varies.
>
> In this PR: https://github.com/apache/beam/pull/13049 I have generated a
> Schema UUID based on the field names, typeNames, and nullability. Let me
> know if this is the right direction.
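>
> A minimal sketch of the kind of derivation I mean (illustrative, not the
> exact code in the PR):
>
> ```
> import java.nio.charset.StandardCharsets;
> import java.util.UUID;
> import org.apache.beam.sdk.schemas.Schema;
>
> // Fold the field names, type names, and nullability into a canonical
> // string, then derive a name-based (type 3) UUID from it, so that equal
> // schemas always get equal UUIDs. For brevity this only looks at
> // top-level fields.
> static UUID deterministicUuid(Schema schema) {
>   StringBuilder canonical = new StringBuilder();
>   for (Schema.Field field : schema.getFields()) {
>     canonical
>         .append(field.getName()).append(':')
>         .append(field.getType().getTypeName()).append(':')
>         .append(field.getType().getNullable()).append(';');
>   }
>   return UUID.nameUUIDFromBytes(canonical.toString().getBytes(StandardCharsets.UTF_8));
> }
> ```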
>
> I think this UUID is worth making deterministic even if it doesn’t solve
> the Dataflow update issues, so that comparison is faster.
>
> There was a previous draft PR, but I got busy and didn’t finish it:
> https://github.com/apache/beam/pull/11447#issuecomment-615442205, which
> had this reply:
> ```
> I would like to talk about the use case a bit more. I think this is
> probably to do with update support on the Dataflow runner, in which case
> this may not be the right solution (and may not be a sufficient change).
> ```
>
> Follow-up questions that may overlap:
>
>    - This is the only issue related to Dataflow updates with the Row type
>    that I could find, but are there more internal ones, and if so, can
>    they be made public so that we can work on them?
>    - I know the Row type is “Experimental”, but I have only seen discussion
>    that DATETIME should be removed before that tag can be dropped. What
>    other issues or blockers, if any, remain before that can happen?
>    - Is this the totally wrong approach/wrong place for this discussion?
>    - Should SchemaOptions and field descriptions be included when creating
>    the UUID? I don’t think they should be, so that descriptions and options
>    can still be updated (just my naive opinion, open to change).
>    - Perhaps there are greater plans for schemas. Can this (or something
>    similar) be merged as a stopgap?
>
>
> This is my first message to the mailing list, so I apologize if any info
> is missing.
>
> Thanks,
> Cameron
