Hi Stephen,

I don't think you should compare the DataType with the AvroSchema directly.
They serve different purposes and sometimes cannot be mapped in both
directions.

As of now, the following conversions are needed in the Flink Avro format:
1. Avro Schema -> Flink Table Schema (DataType). This is required when
registering the Flink table.
2. Flink Table Schema (DataType) -> Avro Schema. After projection pushdown,
possibly only some of the fields need to be read from the Avro record, so
the Flink Avro format needs to generate an Avro reader schema from the
projected fields represented in the DataType.
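To make conversion 2 concrete, here is a small illustrative sketch of deriving a reader schema from the projected fields. This is a toy model in plain Python over dict-based Avro schemas, not the actual flink-avro code, which works on Flink's DataType:

```python
# Toy model: derive an Avro reader schema from projected field names.
# This mimics what the Flink Avro format has to do after projection
# pushdown; names and shapes here are illustrative only.

def project_reader_schema(writer_schema, projected_fields):
    """Keep only the projected fields, preserving the writer's field order."""
    fields = [f for f in writer_schema["fields"] if f["name"] in projected_fields]
    missing = set(projected_fields) - {f["name"] for f in fields}
    if missing:
        raise ValueError(f"Fields not in writer schema: {missing}")
    return {
        "type": "record",
        "name": writer_schema["name"],
        "fields": fields,
    }

writer = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": None},
    ],
}

reader = project_reader_schema(writer, {"id", "email"})
print([f["name"] for f in reader["fields"]])  # ['id', 'email']
```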

The issue today is that when you convert AvroSchema_A to a DataType in step
1 and then convert that DataType back to an Avro schema, AvroSchema_B, the
two schemas may not be compatible. The idea is to use the original
AvroSchema_A as assistance in step 2, so that AvroSchema_A and AvroSchema_B
are compatible. In your case, the Avro schema stored in the schema registry
would be that original Avro schema, i.e. AvroSchema_A.
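As a hypothetical illustration of why the round trip is lossy (again plain Python over dict-based schemas, not the real AvroSchemaConverter logic): an Avro enum maps to a VARCHAR-like type on the Flink side, and a naive conversion back yields a plain "string", which no longer matches the original enum; passing the original schema as assistance restores the type:

```python
# Toy round trip: Avro enum -> Flink-like type -> Avro. Without the
# original schema the enum degrades to a plain "string"; with the original
# schema as assistance, the enum type survives the round trip.

COLOR_ENUM = {"type": "enum", "name": "Color", "symbols": ["RED", "GREEN"]}

def avro_to_flink(avro_type):
    # Both enum and string map to VARCHAR on the Flink side (lossy!).
    if isinstance(avro_type, dict) and avro_type["type"] == "enum":
        return "VARCHAR"
    if avro_type == "string":
        return "VARCHAR"
    if avro_type == "long":
        return "BIGINT"
    raise NotImplementedError(avro_type)

def flink_to_avro(flink_type, original=None):
    # A naive conversion cannot know whether VARCHAR came from an enum...
    if flink_type == "VARCHAR":
        # ...unless the original Avro type is supplied as assistance.
        return original if original is not None else "string"
    if flink_type == "BIGINT":
        return "long"
    raise NotImplementedError(flink_type)

flink_type = avro_to_flink(COLOR_ENUM)            # 'VARCHAR'
naive = flink_to_avro(flink_type)                 # plain 'string'
assisted = flink_to_avro(flink_type, COLOR_ENUM)  # original enum schema

print(naive == COLOR_ENUM)     # False: incompatible with the writer schema
print(assisted == COLOR_ENUM)  # True: compatible again
```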

Thanks,

Jiangjie (Becket) Qin

On Wed, Sep 6, 2023 at 8:32 PM 吴 stephen <rstep...@live.cn> wrote:

> Hi Becket,
> I notice that a new config will be introduced to the Avro format so that
> users can provide their own schema. Since users can provide their own
> schema, should the Avro format offer a validation utility that checks
> whether the input schema is compatible with the table columns?
>
> I'm modifying the Avro-Confluent format in my team and want to make it
> serialize/deserialize using the schema that exists in the schema registry
> instead of the schema generated from the DataType. I am thinking about how
> to compare the DataType from the DDL with the Avro schema. Since the
> AvroSchemaConverter can convert an Avro schema to a DataType, can the
> validation be as simple as checking whether the DataType from the DDL is
> equal to the DataType derived from the Avro schema? If not, may I ask for
> your opinion on the validation.
>
> I'm interested in the FLIP. If there's anything I can help with, please
> feel free to reach out to me.
>
> Best regards,
> Stephen
>
>
> > On Sep 5, 2023, at 3:15 PM, Becket Qin <becket....@gmail.com> wrote:
> >
> > Hi Jing,
> >
> > Thanks for the comments.
> >
> >> 1. "For the batch cases, currently the BulkFormat for DataStream is
> >> missing" - true, and there is another option to leverage
> >> StreamFormatAdapter[1]
> >>
> > StreamFormatAdapter is internal, and it requires a StreamFormat
> > implementation for Avro files, which does not exist either.
> >
> >> 2. "The following two interfaces should probably be marked as Public for
> >> now and Deprecated once we deprecate the InputFormat / OutputFormat" -
> >> would you like to share some background info on the deprecation of the
> >> InputFormat / OutputFormat? It is a little bit weird to me to mark APIs
> >> as public that are already known to be deprecated.
> >
> > InputFormat and OutputFormat are legacy APIs for SourceFunction and
> > SinkFunction. So when SourceFunction and SinkFunction are deprecated,
> > InputFormat and OutputFormat should also be deprecated accordingly. As
> > of now, technically speaking, we have not deprecated these two APIs. So
> > making them Public for now is just to fix the stability annotation,
> > because they are already used publicly by users.
> >
> >> 3. "Remove the PublicEvolving annotation for the following deprecated
> >> classes. It does not make sense for an API to be PublicEvolving and
> >> Deprecated at the same time" - this is very common in the Flink code
> >> base to have PublicEvolving and Deprecated at the same time. APIs that
> >> do not survive the PublicEvolving phase will be marked as deprecated in
> >> addition. Removing PublicEvolving in this case will break the Flink API
> >> graduation rule.
> >
> > Both PublicEvolving and Deprecated are statuses in the API lifecycle,
> > and they are by definition mutually exclusive. When an API is marked as
> > deprecated, either the functionality is going away completely, or
> > another API is replacing the deprecated one. In either case, it does not
> > make sense to evolve that API any more. Even though Flink has some APIs
> > marked with both PublicEvolving and Deprecated at the same time, that
> > does not make sense and needs to be fixed. If a PublicEvolving API is
> > deprecated, it should only be marked as Deprecated, just like a Public
> > API. I am not sure how this would violate the API graduation rule; can
> > you explain?
> >
> > By the way, there is another, orthogonal abuse of the Deprecated
> > annotation in the Flink code base. For private methods, we should not
> > mark them as deprecated and leave the existing code base using them
> > while introducing a new method. This is a bad practice that adds to
> > technical debt. Instead, a proper refactor should be done immediately in
> > the same patch to remove the private method and migrate all its usages
> > to the new method.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Fri, Sep 1, 2023 at 12:00 AM Jing Ge <j...@ververica.com.invalid>
> > wrote:
> >
> >> Hi Becket,
> >>
> >> It is a very useful proposal, thanks for driving it. +1. I'd like to ask
> >> some questions to make sure I understand your thoughts correctly:
> >>
> >> 1. "For the batch cases, currently the BulkFormat for DataStream is
> >> missing" - true, and there is another option to leverage
> >> StreamFormatAdapter[1]
> >> 2. "The following two interfaces should probably be marked as Public for
> >> now and Deprecated once we deprecate the InputFormat / OutputFormat" -
> >> would you like to share some background info on the deprecation of the
> >> InputFormat / OutputFormat? It is a little bit weird to me to mark APIs
> >> as public that are already known to be deprecated.
> >> 3. "Remove the PublicEvolving annotation for the following deprecated
> >> classes. It does not make sense for an API to be PublicEvolving and
> >> Deprecated at the same time" - this is very common in the Flink code
> >> base to have PublicEvolving and Deprecated at the same time. APIs that
> >> do not survive the PublicEvolving phase will be marked as deprecated in
> >> addition. Removing PublicEvolving in this case will break the Flink API
> >> graduation rule.
> >>
> >> Best regards,
> >> Jing
> >>
> >>
> >>
> >> [1]
> >> https://github.com/apache/flink/blob/1d1247d4ae6d4313f7d952c4b2d66351314c9432/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java#L61
> >>
> >> On Thu, Aug 31, 2023 at 4:16 PM Becket Qin <becket....@gmail.com>
> >> wrote:
> >>
> >>> Hi Ryan, thanks for the reply.
> >>>
> >>> Verifying the component with the schemas you have would be super
> >>> helpful.
> >>>
> >>> I think enum is actually a type that is generally useful. Although it
> >>> is not a part of ANSI SQL, MySQL and some other databases have this
> >>> type. BTW, the ENUM_STRING proposed in this FLIP is actually not a
> >>> type by itself. ENUM_STRING is just syntactic sugar which actually
> >>> creates a "new AtomicDataType(new VarCharType(Integer.MAX_VALUE),
> >>> ENUM_CLASS)". So we are not really introducing a new type here.
> >>> However, in order to make the VARCHAR to ENUM conversion work, the
> >>> ENUM class has to be considered as a ConversionClass of the VARCHAR
> >>> type, and a StringToEnum converter is required.
> >>>
> >>> And yes, AvroSchemaUtils should be annotated as @PublicEvolving.
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>>
> >>>
> >>> On Thu, Aug 31, 2023 at 5:22 PM Ryan Skraba
> >>> <ryan.skr...@aiven.io.invalid> wrote:
> >>>
> >>>> Hey -- I have a certain knowledge of Avro, and I'd be willing to help
> >>>> out with some of these enhancements, writing tests and reviewing.  I
> >>>> have a *lot* of Avro schemas available for validation!
> >>>>
> >>>> The FLIP looks pretty good and covers the possible cases pretty
> >>>> rigorously. I wasn't aware of some of the gaps you've pointed out
> >>>> here!
> >>>>
> >>>> How useful do you think the new ENUM_STRING DataType would be outside
> >>>> of the Avro use case?  It seems like a good enough addition that would
> >>>> solve the problem here.
> >>>>
> >>>> A small note: I assume the AvroSchemaUtils is meant to be annotated
> >>>> @PublicEvolving as well.
> >>>>
> >>>> All my best, Ryan
> >>>>
> >>>>
> >>>> On Tue, Aug 29, 2023 at 4:35 AM Becket Qin <becket....@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>> Hi folks,
> >>>>>
> >>>>> I would like to start the discussion about FLIP-358[1], which
> >>>>> proposes to clean up and enhance the Avro support in Flink. More
> >>>>> specifically, it proposes to:
> >>>>>
> >>>>> 1. Make it clear what the public APIs in the flink-avro component are.
> >>>>> 2. Fix a few buggy cases in flink-avro.
> >>>>> 3. Add more supported Avro use cases out of the box.
> >>>>>
> >>>>> Feedback is welcome!
> >>>>>
> >>>>> Thanks
> >>>>>
> >>>>> Jiangjie (Becket) Qin
> >>>>>
> >>>>> [1]
> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-358%3A+flink-avro+enhancement+and+cleanup
> >>>
> >>
>
>
