Hi, Илья.
Thanks for your opinions!

Your are right, and in fact, in addition to the different fields numbers,
the names may also be different.
Currently, we can also support inconsistent schema, which was discussed in
the previous design,
for example, we can provide a `schema.fields.mappings` parameter.

If we have different schema like below:
true batch fields is: a, f1, c, f3
true streaming fields is: f0, b, f2 (lack of 1 field)


1.about inconsistent field names

If user ddl is: f0, f1, f2
`schema.fields.mappings`='[{"f0":"a", f2":"C"},{"f1":"b"}]'

then in hybrid table source, we generate child batch schema is: a, f1, c,
streaming schema is: f0, b, f2 and pass them to final child table source.
(note: we not use batch f3 field, just skip is ok)

2.about inconsistent field numbers

If user ddl is: f0, f1, f2, f3
`schema.fields.mappings`='[{"f0":"a", f2":"C"},{"f1":"b"}]'

then in hybrid table source, we generate child batch schema is: a, f1, c,
f3,
streaming schema has 2 options to set:

1. set f0, b, f2, f3 and pass them to final child table source. (if child
source format is k-v mode, f3 will be null)

2. add an option, e.g.`schema.virtual-fields`='[[],["f3"]]' means
streaming's field f3 is not existed.
then hybrid table source set null for streaming's field f3 actively and
just pass f0, b, f2 to child source to call real data.

In a word, we can use `schema.fields.mappings` to deal with inconsistent
filed name
and pass more fields to child source to get null to deal with inconsistent
field numbers(or add a `schema.virtual-fields` option).

But in order to maintain consistency with the current DataStream api,
we currently support the case where the batch and streaming schemas are
consistent.
I will update the POC pr then you can re-run your case. WDYT?


Best Regards,
Ran Tao



Илья Соин <ilya.soin...@gmail.com> 于2023年5月11日周四 03:12写道:

> Hi devs,
>
> I think for this approach to work, the internal record schema generated by
> Flink must be exactly the same for batch and stream records, because at
> runtime Flink will use the same serializer to send them downstream.
> However, it’s not always the case, because in batch mode Flink’s optimizer
> may realize that some fields are never actually used, so the records will
> not contain those fields. Such optimizations may not be done in the
> streaming mode, so records coming from the realtime source will have more
> fields. In that case, after switching to the realtime source, the job will
> fail, because record serializer expects records with the batch schema, but
> instead receives records with more fields and doesn’t know how to serialize
> them.
>
> Consider the following DDL:
> CREATE TABLE hybrid_table
> (
>     trade ROW(
>         `openTime` BIGINT,
>         `closeTime` BIGINT),
>     server  STRING,
>     tradeTime as to_timestamp(from_unixtime(trade.openTime)),
>     WATERMARK FOR tradeTime AS tradeTime - INTERVAL '1' MINUTE
> )
>     WITH (
>         'connector' = 'hybrid',
>         'source-identifiers' = 'historical,realtime',
>         'historical.connector' = 'filesystem',
>         'historical.path' = 's3://path.to.daa',
>         'historical.format' = 'json',
>         'realtime.connector' = 'kafka',
>         'realtime.topic' = 'trades',
>         'realtime.properties.bootstrap.servers' = '...',
>         'realtime.properties.group.id <
> http://realtime.properties.group.id/>' = 'flink.tv <http://flink.tv/>',
>         'realtime.format' = 'json',
>         'realtime.scan.startup.mode' = 'earliest-offset'
>         )
> This query will fail:
>
> select server from hybrid_table
>
> But this query will work:
>
> select * from hybrid_table
>
> In the first query internal records in the batch source will only have 2
> fields: server and trade. But in the streaming source they will have all
> the fields described in the schema. When switching to the realtime source,
> the job fails because record serializer expects records with the same
> schema as in the batch source. The IllegalArgumentException happens here <
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java#L74>,
> saying “Row arity: 4, but serializer arity: 2"
>
> The second example works because all fields are accessed in the SQL query,
> so Flink doesn’t remove any of them from the internal records in batch, and
> record schemas in batch and streaming match exactly.
>
> __
> Best regards,
> Ilya Soin
>
> On 2023/05/09 07:09:53 Ran Tao wrote:
> > Hi, devs.
> >
> > I don't know if you have any other considerations for this FLIP. All
> > discussions are welcome.
> > If there are no other opinions in the near days, I will try to initiate a
> > vote. thank you all.
> >
> >
> > Best Regards,
> > Ran Tao
> >
> >
> > Ran Tao <ch...@gmail.com> 于2023年4月10日周一 15:33写道:
> >
> > > Hi, devs. I want to reopen this discussion because some questions have
> > > been solved or need more discussions.
> > >
> > > In the previous discussion, there were some questions and problems.
> > >
> > > @Timo
> > > 1.about option prefix, we decide to use identifiers. e.g.
> > >
> > > ```
> > > create table hybrid_source(
> > >  f0 varchar,
> > >  f1 varchar,
> > >  f2 bigint
> > > ) with(
> > >  'connector'='hybrid',
> > >  'source-identifiers'='historical,realtime',
> > >  'historical.connector'='filesystem'
> > >  'historical.path' = '/tmp/a.csv',
> > >  'historical.format' = 'csv',
> > >  'realtime.connector'='kafka',
> > >  'realtime.topic' = 'test',
> > >  'realtime.properties.bootstrap.servers' = 'localhost:9092',
> > >  'realtime.properties.group.id' = 'test',
> > >  'realtime.scan.startup.mode' = 'earliest-offset',
> > >  'realtime.format' = 'csv'
> > > );
> > > ```
> > >
> > > @Martijn Visser <ma...@apache.org>
> > > 1.table api usage
> > >
> > > I updated the FLIP about table api usage.
> > >
> > > 2.how dynamic switched start timestamp works?
> > >
> > > In this FLIP, we introduce 2 interfaces to support it.
> > > If we open switched-start-position-enabled try to use dynamic switched
> > > start timestamp, then first source split numerator needs to
> > > implement SupportsGetEndTimestamp, next source needs to
> > > implement SupportsSwitchedStartTimestamp.
> > > We use SupportsGetEndTimestamp and SupportsSwitchedStartTimestamp to
> get
> > > the previous bounded source end timestamp and apply it to the next
> > > streaming source.
> > >
> > > @John Roesler
> > > 1.source handoff
> > >
> > > We both support Fixed-Start-Position And Switched-start-Position. The
> > > default is Fixed-Start-Position. Use option
> switched-start-position-enabled
> > > to control it.
> > > In Fixed-Start-Position, the next streaming source uses its own startup
> > > strategy, e.g. in kafka, we use predefined kafka scan.startup.mode in
> user
> > > sql.
> > > In Switched-start-Position, this is the same question as `how dynamic
> > > switched start timestamp works` from @Martijn above. We offer
> > > SupportsGetEndTimestamp interface to extract first source split
> enumerator
> > > endTimestamp
> > > and pass it to the next source. and Next source uses
> > > SupportsSwitchedStartTimestamp to apply it.
> > >
> > > 2.more child sources
> > >
> > > Yes, this is consistent with the hybrid source datastream api, there
> is no
> > > limit on the number of children sources.
> > > e.g. this is a 3 source case below.
> > >
> > > ```
> > > create table hybrid_source(
> > >  f0 varchar,
> > >  f1 varchar,
> > >  f2 bigint
> > > ) with(
> > >  'connector'='hybrid',
> > >  'source-identifiers'='historical01,historical02,realtime',
> > >  'historical01.connector'='filesystem'
> > >  'historical01.path' = '/tmp/a.csv',
> > >  'historical01.format' = 'csv',
> > >  'historical02.connector'='filesystem'
> > >  'historical02.path' = '/tmp/a.csv',
> > >  'historical02.format' = 'csv',
> > >  'realtime.connector'='kafka',
> > >  'realtime.topic' = 'test',
> > >  'realtime.properties.bootstrap.servers' = 'localhost:9092',
> > >  'realtime.properties.group.id' = 'testGroup',
> > >  'realtime.scan.startup.mode' = 'earliest-offset',
> > >  'realtime.format' = 'csv'
> > > );
> > > ```
> > >
> > > more details can be found at [1] & [2].
> > > Looking forward to your more concerns and opinions.
> > >
> > > 1.
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225
> > > 2.https://github.com/apache/flink/pull/21841
> > >
> > > Best Regards,
> > > Ran Tao
> > >
> > > Ran Tao <ch...@gmail.com> 于2022年12月15日周四 16:02写道:
> > >
> > >> Hi guys. HybridSource is a good feature, but now released version did
> not
> > >> support table & sql api for a long time.
> > >>
> > >> I have wrote a discussed FLIP.
> > >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225
> > >>
> > >> Sorry for my unclear subject of previous email,  so here i have copied
> > >> the respond from the Timo and sent this email.  look forward to your
> > >> comments.
> > >>
> > >> ```
> > >> Hi Ran,
> > >>
> > >> Thanks for proposing a FLIP. Btw according to the process, the subject
> > >> of this email should be `[DISCUSS] FLIP-278: Hybrid Source Connector`
> so
> > >> that people can identify this discussion as a FLIP discussion.
> > >>
> > >> Supporting the hybrid source for SQL was a long-standing issue on our
> > >> roadmap. Happy to give feedback here:
> > >>
> > >> 1) Options
> > >>
> > >> Coming up with stable long-term options should be a shared effort.
> > >> Having an index as a key could cause unintended side effects if the
> > >> index is not correctly chosen, I would suggest we use IDs instead.
> > >>
> > >> What do you think about the following structure?
> > >>
> > >> CREATE TABLE ... WITH (
> > >>    'sources'='historical;realtime',   -- Config option of type string
> list
> > >>    'historical.connector' = 'filesystem',
> > >>    'historical.path' = '/tmp/a.csv',
> > >>    'historcal.format' = 'csv',
> > >>    'realtime.path' = '/tmp/b.csv',
> > >>    'realtime.format' = 'csv'"
> > >> )
> > >>
> > >> I would limit the IDs to simple [a-z0-9_] identifiers. Once we support
> > >> metadata columns, we can also propagate these IDs easily.
> > >>
> > >> 2) Schema field mappings
> > >>
> > >> The FLIP mentions `schema-field-mappings` could you elaborate on this
> in
> > >> the document?
> > >>
> > >> 3) Start position strategies
> > >>
> > >> Have you thought about how we can represent start position strategies.
> > >> The FLIP is very minimal but it would be nice to at least hear some
> > >> opinions on this topic. Maybe we can come up with some general
> strategy
> > >> that makes the most common use case possible in the near future.
> > >>
> > >> Thanks,
> > >> Timo
> > >> ```
> > >>
> > >> --
> > >> Best Regards,
> > >> Ran Tao
> > >> https://github.com/chucheng92
> > >>
> > >
> >
>
>

Reply via email to