Hi Dale,

> Aren’t we already fairly dependent on the schema remaining consistent, 
> because otherwise we’d need to update the table schema as well?

No, because the schema can be updated with optional fields and,
depending on the compatibility mode, Flink will just consume or
produce nulls in that case.

> I’m not sure what you mean here, sorry. Are you thinking about issues if you 
> needed to mix-and-match with both formatters at the same time? (Rather than 
> just using the Avro formatter as I was describing)

Flink doesn't distinguish between a table being a source or a sink. If
you change the Avro format to support reading Schema Registry encoded
Avro, you would also change how it writes. However, in order to write
the proper Schema Registry Avro format, you need to have the magic
byte included.
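
For reference, a minimal Python sketch of that framing, assuming the
usual Confluent wire format (one magic byte of 0, then a 4-byte
big-endian schema ID, then the plain Avro binary). The names frame and
unframe are made up for illustration and are not part of Flink or any
Confluent library:

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every Confluent-framed message
HEADER = struct.Struct(">bI")  # magic byte + 4-byte big-endian schema ID


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix plain Avro binary with the 5-byte Confluent header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, plain Avro payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER.size:]
```

Reading without a registry only needs unframe and can throw the ID
away; writing needs a real ID from somewhere, which is the part a
plain Avro format cannot supply by itself.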

I think the entire point of the Schema Registry Avro messages is that
there is a tight coupling to a Schema Registry service; that's the
point of the format. Opening it up to alternative processing opens a
potential Pandora's box of issues: (de)serialization errors, issues
with schema evolution checks as a consumer or a producer, etc. I don't
see much value for the Flink project in going in that direction, which
would only be supporting edge cases anyway.
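
To illustrate the deserialization risk, here is a rough sketch with a
hand-rolled encoder for just two Avro primitives (long and string),
not the Avro library. Avro binary carries no field names or tags, so a
reader that does not know the writer schema decodes whatever bytes
come next. The record layout (id, name) is hypothetical:

```python
def encode_long(n: int) -> bytes:
    """Avro long: zigzag encoding followed by a base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def decode_long(buf: bytes, pos: int):
    """Return (value, next position) for a long starting at pos."""
    shift = 0
    z = 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos


def encode_string(s: str) -> bytes:
    """Avro string: byte length as a long, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def decode_string(buf: bytes, pos: int):
    """Return (text, next position) for a string starting at pos."""
    length, pos = decode_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


# Writer uses an evolved schema: record {id: long, name: string}
payload = encode_long(3) + encode_string("Bob")

# Reader still assumes the old schema: record {name: string}
name, _ = decode_string(payload, 0)
print(repr(name))  # '\x06Bo' instead of 'Bob'
```

No exception is raised here; the reader simply gets garbage, and the
schema ID in the header is the only thing that would have told it a
different writer schema was in use.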

Best regards,

Martijn

On Wed, Nov 1, 2023 at 10:36 PM Dale Lane <dale.l...@uk.ibm.com> wrote:
>
> Thanks for the pointer to FLINK-33045 - I hadn’t spotted that. That sounds 
> like it’d address one possible issue (where someone using Flink shouldn’t be, 
> or perhaps doesn’t have access/permission to, register new schemas).
>
> I should be clear that I absolutely agree that using a schema registry is 
> optimal. It should be the norm – it should be the default, preferred and 
> recommended option.
>
> However, I think that there may still be times where the schema registry 
> isn’t available.
>
> Maybe you’re using a mirrored copy of the topic on another kafka cluster and 
> don’t have the original Kafka cluster’s schema registry available. Maybe 
> networking restrictions mean that where you are running Flink doesn’t have 
> connectivity to the schema registry. Maybe the developer using Flink doesn’t 
> have permission for or access to the schema registry. Maybe the schema 
> registry is currently unavailable. Maybe the developer using Flink is 
> developing their Flink job offline, disconnected from the environment where 
> the schema registry is running (ahead of later deploying their finished 
> Flink job where it will have access to the schema registry).
>
> It is in such circumstances that I think the approach the avro formatter 
> offers is a useful fallback. Through the table schema, it lets you specify 
> the shape of your data, allowing you to process it without requiring an 
> external dependency.
>
> It seems to me that making it impossible to process Confluent Avro-encoded 
> messages without access to an additional external component is too strict a 
> restriction (as much as there are completely valid reasons for it to be a 
> recommendation).
>
> And, with a very small modification to the avro formatter, it’s a restriction 
> we could remove.
>
> Kind regards
>
> Dale
>
>
>
> From: Ryan Skraba <ryan.skr...@aiven.io.INVALID>
> Date: Monday, 30 October 2023 at 16:42
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Re: [DISCUSS] Confluent Avro support without Schema 
> Registry access
> Hello!  I took a look at FLINK-33045, which is somewhat related: In
> that improvement, the author wants to control who registers schemas.
> The Flink job would know the Avro schema to use, and would look up the
> ID to use in framing the Avro binary.  It uses but never changes the
> schema registry.
>
> Here it sounds like you want nearly the same thing with one more step:
> if the Flink job is configured with the schema to use, it could also
> be pre-configured with the ID that the schema registry knows.
> Technically, it could be configured with a *set* of schemas mapped to
> their IDs when the job starts, but I imagine this would be pretty
> clunky.
>
> I'm curious if you can share what customer use cases wouldn't want
> access to the schema registry!  One of the reasons it exists is to
> prevent systems from writing unreadable or corrupted data to a Kafka
> topic (or other messaging system) -- which I think is what Martijn is
> asking about.  There's unlikely to be a performance gain from hiding it.
>
> Thanks for bringing this up for discussion!  Ryan
>
> [FLINK-33045]: https://issues.apache.org/jira/browse/FLINK-33045
> [Single Object Encoding]:
> https://avro.apache.org/docs/1.11.1/specification/_print/#single-object-encoding-specification
>
> On Fri, Oct 27, 2023 at 3:13 PM Dale Lane <dale.l...@uk.ibm.com> wrote:
> >
> > > if you strip the magic byte, and the schema has
> > > evolved when you're consuming it from Flink,
> > > you can end up with deserialization errors given
> > > that a field might have been deleted/added/
> > > changed etc.
> >
> > Aren’t we already fairly dependent on the schema remaining consistent, 
> > because otherwise we’d need to update the table schema as well?
> >
> > > it wouldn't work when you actually want to
> > > write avro-confluent, because that requires a
> > > check when producing if you're still being compliant.
> >
> > I’m not sure what you mean here, sorry. Are you thinking about issues if 
> > you needed to mix-and-match with both formatters at the same time? (Rather 
> > than just using the Avro formatter as I was describing)
> >
> > Kind regards
> >
> > Dale
> >
> >
> >
> > From: Martijn Visser <martijnvis...@apache.org>
> > Date: Friday, 27 October 2023 at 14:03
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: [EXTERNAL] Re: [DISCUSS] Confluent Avro support without Schema 
> > Registry access
> > Hi Dale,
> >
> > I'm struggling to understand in what cases you want to read data
> > serialized in connection with Confluent Schema Registry, but can't get
> > access to the Schema Registry service. It seems like a rather exotic
> > situation and it defeats the purpose of using a Schema Registry in the
> > first place? I also doubt that it's actually really useful: if you
> > strip the magic byte, and the schema has evolved when you're consuming
> > it from Flink, you can end up with deserialization errors given that a
> > field might have been deleted/added/changed etc. Also, it wouldn't
> > work when you actually want to write avro-confluent, because that
> > requires a check when producing if you're still being compliant.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Fri, Oct 27, 2023 at 2:53 PM Dale Lane <dale.l...@uk.ibm.com> wrote:
> > >
> > > TLDR:
> > > We currently require a connection to a Confluent Schema Registry to be 
> > > able to work with Confluent Avro data. With a small modification to the 
> > > Avro formatter, I think we could also offer the ability to process this 
> > > type of data without requiring access to the schema registry.
> > >
> > > What would people think of such an enhancement?
> > >
> > > -----
> > >
> > > When working with Avro data, there are two formats available to us: avro 
> > > and avro-confluent.
> > >
> > > avro
> > > Data it supports: Avro records
> > > Approach: You specify a table schema and it derives an appropriate Avro 
> > > schema from this.
> > >
> > > avro-confluent
> > > Data it supports: Confluent’s variant[1] of the Avro encoding
> > > Approach: You provide connection details (URL, credentials, 
> > > keystore/truststore, schema lookup strategy, etc.) for retrieving an 
> > > appropriate schema from the Confluent Schema Registry.
> > >
> > > What this means is if you have Confluent Avro data[2] that you want to 
> > > use in Flink, you currently have to use the avro-confluent format, and 
> > > that means you need to provide Flink with access to your Schema Registry.
> > >
> > > I think there will be times where you may not want, or may not be able, 
> > > to provide Flink with direct access to a Schema Registry. In such cases, 
> > > it would be useful to support the same behaviour that the avro format 
> > > does (i.e. allow you to explicitly specify a table schema)
> > >
> > > This could be achieved with a very minor modification to the avro 
> > > formatter.
> > >
> > > > For reading records, we could add an option to the formatter to indicate 
> > > > when records will be Confluent Avro. If that option is set, we just need 
> > > > the formatter to skip the 5-byte header (the magic byte and the 4-byte 
> > > > schema ID); it can then use the remaining bytes with a regular Avro 
> > > > decoder as it does today – the existing implementation would be 
> > > > essentially unchanged.
> > >
> > > > For writing records, something similar would work: an option to the 
> > > > formatter to indicate when to write records using Confluent Avro. We 
> > > > would need a way to specify what ID value to use for the first bytes [3]. 
> > > > (After that, the record can be encoded with a regular Avro encoder as it 
> > > > does today – the rest of the implementation would be unchanged).
> > >
> > >
> > > -----
> > > [1] – This is the same as regular Avro, but prefixing the payload with 
> > > extra bytes that identify which schema to use, to allow an appropriate 
> > > schema to be retrieved from a schema registry.
> > >
> > > [2] – Records that were serialized by 
> > > io.confluent.kafka.serializers.KafkaAvroSerializer and could be read by 
> > > io.confluent.kafka.serializers.KafkaAvroDeserializer.
> > >
> > > [3] – Either by making them fixed options for that formatter, or by 
> > > allowing it to be specified from something in the record.
> > >
> > > Unless otherwise stated above:
> > >
> > > IBM United Kingdom Limited
> > > Registered in England and Wales with number 741598
> > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
