Another possible name is "connector=upsert-kafka". I think this can solve Timo's
concern about the word "compacted".
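
For illustration, here is a sketch of what the DDL could look like under this
name. It simply reuses the options of the "kafka-compacted" example quoted
further down in this thread and swaps the connector name; the exact option set
is an assumption, not a settled design:

```sql
-- Sketch only: 'upsert-kafka' is the name proposed above, and the option list
-- is assumed to match the 'kafka-compacted' example quoted later in this thread.
CREATE TABLE pageviews_per_region (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);
```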

Materialize also uses the "ENVELOPE UPSERT" [1] keyword to identify such Kafka
sources.
I think "upsert" is a well-known term, widely used in many systems,
and it matches how we handle the Kafka messages.

What do you think?

Best,
Jark

[1]:
https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic




On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com> wrote:

> Good validation messages can't solve the broken user experience, especially
> since such an update mode option will implicitly make half of the current Kafka
> options invalid or meaningless.
>
> Best,
> Kurt
>
>
> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com> wrote:
>
> > Hi Timo, Seth,
> >
> > The default value "inserting" of "mode" might not be suitable,
> > because "debezium-json" emits changelog messages which include updates.
> >
> > On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <s...@ververica.com> wrote:
> >
> > > +1 for supporting upsert results into Kafka.
> > >
> > > I have no comments on the implementation details.
> > >
> > > As far as configuration goes, I tend to favor Timo's option where we add a
> > > "mode" property to the existing Kafka table with default value "inserting".
> > > If the mode is set to "updating" then the validation changes to the new
> > > requirements. I personally find it more intuitive than a separate
> > > connector; my fear is users won't understand it's the same physical Kafka
> > > sink under the hood, and it will lead to other confusion, like: does it offer
> > > the same persistence guarantees? I think we are capable of adding good
> > > validation messaging that solves Jark's and Kurt's concerns.
> > >
> > >
> > > On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <twal...@apache.org> wrote:
> > >
> > > > Hi Jark,
> > > >
> > > > "calling it "kafka-compacted" can even remind users to enable log
> > > > compaction"
> > > >
> > > > But sometimes users like to store a lineage of changes in their topics,
> > > > independent of any ktable/kstream interpretation.
> > > >
> > > > I'll let the majority decide on this topic so as not to block this
> > > > effort further. But we might find a better name like:
> > > >
> > > > connector = kafka
> > > > mode = updating/inserting
> > > >
> > > > OR
> > > >
> > > > connector = kafka-updating
> > > >
> > > > ...
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > >
> > > >
> > > >
> > > > On 22.10.20 15:24, Jark Wu wrote:
> > > > > Hi Timo,
> > > > >
> > > > > Thanks for your opinions.
> > > > >
> > > > > 1) Implementation
> > > > > We will have a stateful operator to generate INSERT and UPDATE_BEFORE.
> > > > > This operator is keyed by the primary key (used as the shuffle key) and
> > > > > placed after the source operator.
> > > > > The implementation of this operator is very similar to the existing
> > > > > `DeduplicateKeepLastRowFunction`.
> > > > > The operator will register a value state, using the primary key fields
> > > > > as keys.
> > > > > When the value state is empty under the current key, we will emit INSERT
> > > > > for the input row.
> > > > > When the value state is not empty under the current key, we will emit
> > > > > UPDATE_BEFORE using the row in state,
> > > > > and emit UPDATE_AFTER using the input row.
> > > > > When the input row is DELETE, we will clear the state and emit a DELETE row.
> > > > >
> > > > > 2) new option vs new connector
> > > > >> We recently simplified the table options to a minimum amount of
> > > > >> characters to be as concise as possible in the DDL.
> > > > > I think this is the reason why we want to introduce a new connector,
> > > > > because we can simplify the options in DDL.
> > > > > For example, if using a new option, the DDL may look like this:
> > > > >
> > > > > CREATE TABLE users (
> > > > >    user_id BIGINT,
> > > > >    user_name STRING,
> > > > >    user_level STRING,
> > > > >    region STRING,
> > > > >    PRIMARY KEY (user_id) NOT ENFORCED
> > > > > ) WITH (
> > > > >    'connector' = 'kafka',
> > > > >    'model' = 'table',
> > > > >    'topic' = 'pageviews_per_region',
> > > > >    'properties.bootstrap.servers' = '...',
> > > > >    'properties.group.id' = 'testGroup',
> > > > >    'scan.startup.mode' = 'earliest',
> > > > >    'key.format' = 'csv',
> > > > >    'key.fields' = 'user_id',
> > > > >    'value.format' = 'avro',
> > > > >    'sink.partitioner' = 'hash'
> > > > > );
> > > > >
> > > > > If using a new connector, we can have different default values for the
> > > > > options and remove unnecessary options;
> > > > > the DDL can then look like this, which is much more concise:
> > > > >
> > > > > CREATE TABLE pageviews_per_region (
> > > > >    user_id BIGINT,
> > > > >    user_name STRING,
> > > > >    user_level STRING,
> > > > >    region STRING,
> > > > >    PRIMARY KEY (user_id) NOT ENFORCED
> > > > > ) WITH (
> > > > >    'connector' = 'kafka-compacted',
> > > > >    'topic' = 'pageviews_per_region',
> > > > >    'properties.bootstrap.servers' = '...',
> > > > >    'key.format' = 'csv',
> > > > >    'value.format' = 'avro'
> > > > > );
> > > > >
> > > > >> When people read `connector=kafka-compacted` they might not know that it
> > > > >> has ktable semantics. You don't need to enable log compaction in order
> > > > >> to use a KTable as far as I know.
> > > > > We don't need to let users know it has ktable semantics; as Konstantin
> > > > > mentioned, this may carry more implicit
> > > > > meaning than we want to imply here. I agree users don't need to enable log
> > > > > compaction, but from the production perspective,
> > > > > log compaction should always be enabled if it is used for this purpose.
> > > > > Calling it "kafka-compacted" can even remind users to enable log compaction.
> > > > >
> > > > > I don't agree with introducing a "model = table/stream" option, or
> > > > > "connector=kafka-table",
> > > > > because this means we are introducing the Table vs Stream concept from KSQL.
> > > > > However, we don't have such a top-level concept in Flink SQL now; this will
> > > > > further confuse users.
> > > > > In Flink SQL, everything is a STREAM; the differences are whether it is
> > > > > bounded or unbounded,
> > > > > and whether it is insert-only or changelog.
> > > > >
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > >
> > > > > On Thu, 22 Oct 2020 at 20:39, Timo Walther <twal...@apache.org> wrote:
> > > > >
> > > > >> Hi Shengkai, Hi Jark,
> > > > >>
> > > > >> thanks for this great proposal. It is time to finally connect the
> > > > >> changelog processor with a compacted Kafka topic.
> > > > >>
> > > > >> "The operator will produce INSERT rows, or additionally generate
> > > > >> UPDATE_BEFORE rows for the previous image, or produce DELETE rows with
> > > > >> all columns filled with values."
> > > > >>
> > > > >> Could you elaborate a bit on the implementation details in the FLIP? How
> > > > >> are UPDATE_BEFOREs generated? How much state is required to perform
> > > > >> this operation?
> > > > >>
> > > > >> From a conceptual and semantic point of view, I'm fine with the
> > > > >> proposal. But I would like to share my opinion about how we expose this
> > > > >> feature:
> > > > >>
> > > > >> ktable vs kafka-compacted
> > > > >>
> > > > >> I'm against having an additional connector like `ktable` or
> > > > >> `kafka-compacted`. We recently simplified the table options to a minimum
> > > > >> amount of characters to be as concise as possible in the DDL. Therefore,
> > > > >> I would keep the `connector=kafka` and introduce an additional option,
> > > > >> because a user wants to read "from Kafka", and the "how" should be
> > > > >> determined in the lower options.
> > > > >>
> > > > >> When people read `connector=ktable` they might not know that this is
> > > > >> Kafka. Or they wonder where `kstream` is?
> > > > >>
> > > > >> When people read `connector=kafka-compacted` they might not know that it
> > > > >> has ktable semantics. You don't need to enable log compaction in order
> > > > >> to use a KTable as far as I know. Log compaction and table semantics are
> > > > >> orthogonal topics.
> > > > >>
> > > > >> In the end we will need 3 types of information when declaring a Kafka
> > > > >> connector:
> > > > >>
> > > > >> CREATE TABLE ... WITH (
> > > > >>     connector=kafka        -- Some information about the connector
> > > > >>     end-offset = XXXX      -- Some information about the boundedness
> > > > >>     model = table/stream   -- Some information about interpretation
> > > > >> )
> > > > >>
> > > > >>
> > > > >> We can still apply all the constraints mentioned in the FLIP when
> > > > >> `model` is set to `table`.
> > > > >>
> > > > >> What do you think?
> > > > >>
> > > > >> Regards,
> > > > >> Timo
> > > > >>
> > > > >>
> > > > >> On 21.10.20 14:19, Jark Wu wrote:
> > > > >>> Hi,
> > > > >>>
> > > > >>> IMO, if we are going to mix them in one connector,
> > > > >>> 1) either users need to set some options to a specific value explicitly,
> > > > >>> e.g. "scan.startup.mode=earliest", "sink.partitioner=hash", etc.
> > > > >>> This makes the connector awkward to use. Users may have to fix options one
> > > > >>> by one according to the exception.
> > > > >>> Besides, in the future, it is still possible to use
> > > > >>> "sink.partitioner=fixed" (reduce network cost) if users are aware of
> > > > >>> the partition routing;
> > > > >>> however, it's error-prone to have "fixed" as default for compacted mode.
> > > > >>>
> > > > >>> 2) or make those options have a different default value when
> > > > >>> "compacted=true".
> > > > >>> This would be more confusing and unpredictable if the default value of
> > > > >>> options changes according to other options.
> > > > >>> What happens if we have a third mode in the future?
> > > > >>>
> > > > >>> In terms of usage and options, it's very different from the
> > > > >>> original "kafka" connector.
> > > > >>> It would be handier to use and less error-prone if we separate them into two
> > > > >>> connectors.
> > > > >>> In the implementation layer, we can reuse code as much as possible.
> > > > >>>
> > > > >>> Therefore, I'm still +1 to have a new connector.
> > > > >>> The "kafka-compacted" name sounds good to me.
> > > > >>>
> > > > >>> Best,
> > > > >>> Jark
> > > > >>>
> > > > >>>
> > > > >>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <kna...@apache.org> wrote:
> > > > >>>
> > > > >>>> Hi Kurt, Hi Shengkai,
> > > > >>>>
> > > > >>>> thanks for answering my questions and the additional clarifications. I
> > > > >>>> don't have a strong opinion on whether to extend the "kafka" connector or
> > > > >>>> to introduce a new connector. So, from my perspective feel free to go with
> > > > >>>> a separate connector. If we do introduce a new connector I wouldn't call it
> > > > >>>> "ktable" for the aforementioned reasons (in addition, we might suggest that
> > > > >>>> there is also a "kstreams" connector for symmetry reasons). I don't have a
> > > > >>>> good alternative name, though, maybe "kafka-compacted" or
> > > > >>>> "compacted-kafka".
> > > > >>>>
> > > > >>>> Thanks,
> > > > >>>>
> > > > >>>> Konstantin
> > > > >>>>
> > > > >>>>
> > > > >>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <ykt...@gmail.com> wrote:
> > > > >>>>
> > > > >>>>> Hi all,
> > > > >>>>>
> > > > >>>>> I want to describe the discussion process which drove us to this
> > > > >>>>> conclusion; this might make some of
> > > > >>>>> the design choices easier to understand and keep everyone on the same page.
> > > > >>>>>
> > > > >>>>> Back to the motivation: what functionality do we want to provide in the
> > > > >>>>> first place? We got a lot of feedback and
> > > > >>>>> questions from mailing lists that people want to write Not-Insert-Only
> > > > >>>>> messages into Kafka. They might be
> > > > >>>>> intentional or by accident, e.g. from a non-windowed aggregate query or
> > > > >>>>> a non-windowed left outer join. And
> > > > >>>>> some users from the KSQL world also asked why Flink didn't leverage the
> > > > >>>>> Key concept of every Kafka topic
> > > > >>>>> and treat Kafka as a dynamically changing keyed table.
> > > > >>>>>
> > > > >>>>> To work with Kafka better, we were thinking of extending the functionality of
> > > > >>>>> the current Kafka connector by letting it
> > > > >>>>> accept updates and deletions. But due to the limitations of Kafka, the
> > > > >>>>> update has to be "update by key", aka a table
> > > > >>>>> with a primary key.
> > > > >>>>>
> > > > >>>>> This introduces a couple of conflicts with the current Kafka table's options:
> > > > >>>>> 1. key.fields: as said above, we need the Kafka table to have the primary
> > > > >>>>> key constraint. But users can also configure
> > > > >>>>> key.fields freely, which might cause friction. (Sure, we can do some sanity
> > > > >>>>> checks on this, but that also creates friction.)
> > > > >>>>> 2. sink.partitioner: to make the semantics right, we need to make sure all
> > > > >>>>> the updates on the same key are written to
> > > > >>>>> the same Kafka partition, so we should force the use of a hash-by-key
> > > > >>>>> partitioner inside such a table. Again, this conflicts
> > > > >>>>> and creates friction with current user options.
> > > > >>>>>
> > > > >>>>> The above things are solvable, though not perfectly or in the most
> > > > >>>>> user-friendly way.
> > > > >>>>>
> > > > >>>>> Let's take a look at the reading side. The keyed Kafka table contains two
> > > > >>>>> kinds of messages: upsert or deletion. What upsert
> > > > >>>>> means is "If the key doesn't exist yet, it's an insert record. Otherwise
> > > > >>>>> it's an update record". For the sake of correctness or
> > > > >>>>> simplicity, the Flink SQL engine also needs such information. If we
> > > > >>>>> interpret all messages as "update record", some queries or
> > > > >>>>> operators may not work properly. It's weird to see an update record when you
> > > > >>>>> haven't seen the insert record before.
> > > > >>>>>
> > > > >>>>> So what Flink should do, after reading the records from such a table,
> > > > >>>>> is create a state to record which messages have
> > > > >>>>> been seen and then generate the correct row type correspondingly. This kind
> > > > >>>>> of couples the state and the data of the message
> > > > >>>>> queue, and it also creates conflicts with the current Kafka connector.
> > > > >>>>>
> > > > >>>>> Think about what happens if users suspend a running job (which contains some
> > > > >>>>> reading state now), and then change the start offset of the reader.
> > > > >>>>> Changing the reading offset actually changes the whole story of
> > > > >>>>> "which records should be insert messages and which records
> > > > >>>>> should be update messages". And it will also make Flink deal with
> > > > >>>>> another weird situation: it might receive a deletion
> > > > >>>>> for a non-existent message.
> > > > >>>>>
> > > > >>>>> We were unsatisfied with all the friction and conflicts it would create if
> > > > >>>>> we added the "upsert & deletion" support to the current Kafka
> > > > >>>>> connector. And later we began to realize that we shouldn't treat it as a
> > > > >>>>> normal message queue, but should treat it as a changing keyed
> > > > >>>>> table. We should always be able to get the whole data of such a table (by
> > > > >>>>> disabling the start offset option) and we can also read the
> > > > >>>>> changelog out of such a table. It's like an HBase table with binlog support
> > > > >>>>> but without random access capability (which can be fulfilled
> > > > >>>>> by Flink's state).
> > > > >>>>>
> > > > >>>>> So our intention was, instead of telling and persuading users what kind of
> > > > >>>>> options they should or should not use by extending
> > > > >>>>> the current Kafka connector when enabling upsert support, to actually create
> > > > >>>>> a whole new and different connector that has totally
> > > > >>>>> different abstractions in the SQL layer and should be treated totally
> > > > >>>>> differently from the current Kafka connector.
> > > > >>>>>
> > > > >>>>> Hope this can clarify some of the concerns.
> > > > >>>>>
> > > > >>>>> Best,
> > > > >>>>> Kurt
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > > > >>>>>
> > > > >>>>>> Hi devs,
> > > > >>>>>>
> > > > >>>>>> As many people are still confused about the differences in option behaviour
> > > > >>>>>> between the Kafka connector and the KTable connector, Jark and I have
> > > > >>>>>> listed the differences in the doc [1].
> > > > >>>>>>
> > > > >>>>>> Best,
> > > > >>>>>> Shengkai
> > > > >>>>>>
> > > > >>>>>> [1]
> > > > >>>>>> https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> > > > >>>>>>
> > > > >>>>>> On Tue, Oct 20, 2020 at 12:05 PM, Shengkai Fang <fskm...@gmail.com> wrote:
> > > > >>>>>>
> > > > >>>>>>> Hi Konstantin,
> > > > >>>>>>>
> > > > >>>>>>> Thanks for your reply.
> > > > >>>>>>>
> > > > >>>>>>>> It uses the "kafka" connector and does not specify a primary key.
> > > > >>>>>>> The dimensional table `users` is a ktable connector and we can specify
> > > > >>>>>>> the pk on the KTable.
> > > > >>>>>>>
> > > > >>>>>>>> Will it be possible to use a "ktable" as a dimensional table in FLIP-132
> > > > >>>>>>> Yes. We can specify the watermark on the KTable and it can be used as a
> > > > >>>>>>> dimension table in temporal join.
> > > > >>>>>>>
> > > > >>>>>>>> Introduce a new connector vs introduce a new property
> > > > >>>>>>> The main reason behind this is that the KTable connector has almost no
> > > > >>>>>>> options in common with the Kafka connector. The options that can be reused by
> > > > >>>>>>> the KTable connector are 'topic', 'properties.bootstrap.servers' and
> > > > >>>>>>> 'value.fields-include'. We can't set a cdc format for 'key.format' and
> > > > >>>>>>> 'value.format' in the KTable connector now, which is available in the Kafka
> > > > >>>>>>> connector. Considering the difference between the options we can use, it's
> > > > >>>>>>> more suitable to introduce another connector rather than a property.
> > > > >>>>>>>
> > > > >>>>>>> We are also fine with using "compacted-kafka" as the name of the new
> > > > >>>>>>> connector. What do you think?
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Shengkai
> > > > >>>>>>>
> > > > >>>>>>> On Mon, Oct 19, 2020 at 10:15 PM, Konstantin Knauf <kna...@apache.org> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Shengkai,
> > > > >>>>>>>>
> > > > >>>>>>>> Thank you for driving this effort. I believe this is a very important feature
> > > > >>>>>>>> for many users who use Kafka and Flink SQL together. A few questions and
> > > > >>>>>>>> thoughts:
> > > > >>>>>>>>
> > > > >>>>>>>> * Is your example "Use KTable as a reference/dimension table" correct? It
> > > > >>>>>>>> uses the "kafka" connector and does not specify a primary key.
> > > > >>>>>>>>
> > > > >>>>>>>> * Will it be possible to use a "ktable" table directly as a dimensional
> > > > >>>>>>>> table in temporal join (*based on event time*) (FLIP-132)? This is not
> > > > >>>>>>>> completely clear to me from the FLIP.
> > > > >>>>>>>>
> > > > >>>>>>>> * I'd personally prefer not to introduce a new connector and instead to
> > > > >>>>>>>> extend the Kafka connector. We could add an additional property
> > > > >>>>>>>> "compacted" = "true"|"false". If it is set to "true", we can add additional
> > > > >>>>>>>> validation logic (e.g. "scan.startup.mode" can not be set, primary key
> > > > >>>>>>>> required, etc.). If we stick to a separate connector I'd not call it
> > > > >>>>>>>> "ktable", but rather "compacted-kafka" or similar. KTable seems to carry
> > > > >>>>>>>> more implicit meaning than we want to imply here.
> > > > >>>>>>>>
> > > > >>>>>>>> * I agree that this is not a bounded source. If we want to support a
> > > > >>>>>>>> bounded mode, this is an orthogonal concern that also applies to other
> > > > >>>>>>>> unbounded sources.
> > > > >>>>>>>>
> > > > >>>>>>>> Best,
> > > > >>>>>>>>
> > > > >>>>>>>> Konstantin
> > > > >>>>>>>>
> > > > >>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <imj...@gmail.com> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Hi Danny,
> > > > >>>>>>>>>
> > > > >>>>>>>>> First of all, we didn't introduce any concepts from KSQL (e.g. Stream vs
> > > > >>>>>>>>> Table notion).
> > > > >>>>>>>>> This new connector will produce a changelog stream, so it's still a dynamic
> > > > >>>>>>>>> table and doesn't conflict with Flink core concepts.
> > > > >>>>>>>>>
> > > > >>>>>>>>> The "ktable" is just a connector name, we can also call it
> > > > >>>>>>>>> "compacted-kafka" or something else.
> > > > >>>>>>>>> Calling it "ktable" is just so that KSQL users can migrate to Flink SQL
> > > > >>>>>>>>> easily.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Regarding why we introduce a new connector vs a new property in the existing
> > > > >>>>>>>>> Kafka connector:
> > > > >>>>>>>>>
> > > > >>>>>>>>> I think the main reason is that we want to have a clear separation for
> > > > >>>>>>>>> these two use cases, because they are very different.
> > > > >>>>>>>>> We also listed reasons in the FLIP, including:
> > > > >>>>>>>>>
> > > > >>>>>>>>> 1) It's hard to explain what the behavior is when users specify the start
> > > > >>>>>>>>>    offset from a middle position (e.g. how to process delete events for
> > > > >>>>>>>>>    non-existent records).
> > > > >>>>>>>>>    It's dangerous if users do that. So we don't provide the offset option
> > > > >>>>>>>>>    in the new connector at the moment.
> > > > >>>>>>>>> 2) It's a different perspective/abstraction on the same Kafka topic (append
> > > > >>>>>>>>>    vs. upsert). It would be easier to understand if we can separate them
> > > > >>>>>>>>>    instead of mixing them in one connector. The new connector requires
> > > > >>>>>>>>>    a hash sink partitioner, a declared primary key, and a regular format.
> > > > >>>>>>>>>    If we mix them in one connector, it might be confusing how to use the
> > > > >>>>>>>>>    options correctly.
> > > > >>>>>>>>> 3) The semantics of the KTable connector are just the same as KTable in Kafka
> > > > >>>>>>>>>    Streams. So it's very handy for Kafka Streams and KSQL users.
> > > > >>>>>>>>>    We have seen several questions on the mailing list asking how to model
> > > > >>>>>>>>>    a KTable and how to join a KTable in Flink SQL.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Best,
> > > > >>>>>>>>> Jark
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu <imj...@gmail.com> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hi Jingsong,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> As the FLIP describes, "KTable connector produces a changelog stream,
> > > > >>>>>>>>>> where each data record represents an update or delete event.".
> > > > >>>>>>>>>> Therefore, a ktable source is an unbounded stream source. Selecting a
> > > > >>>>>>>>>> ktable source is similar to selecting a kafka source with debezium-json
> > > > >>>>>>>>>> format,
> > > > >>>>>>>>>> in that it never ends and the results are continuously updated.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> It's possible to have a bounded ktable source in the future, for example,
> > > > >>>>>>>>>> by adding an option 'bounded=true' or 'end-offset=xxx'.
> > > > >>>>>>>>>> In this way, the ktable will produce a bounded changelog stream.
> > > > >>>>>>>>>> So I think this can be a compatible feature in the future.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I don't think we should associate it with ksql related concepts. Actually, we
> > > > >>>>>>>>>> didn't introduce any concepts from KSQL (e.g. Stream vs Table notion).
> > > > >>>>>>>>>> The "ktable" is just a connector name, we can also call it
> > > > >>>>>>>>>> "compacted-kafka" or something else.
> > > > >>>>>>>>>> Calling it "ktable" is just so that KSQL users can migrate to Flink SQL
> > > > >>>>>>>>>> easily.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Regarding the "value.fields-include", this is an option introduced in
> > > > >>>>>>>>>> FLIP-107 for the Kafka connector.
> > > > >>>>>>>>>> I think we should keep the same behavior as the Kafka connector. I'm not
> > > > >>>>>>>>>> sure what the default behavior of KSQL is.
> > > > >>>>>>>>>> But I guess it also stores the keys in the value, judging from this example
> > > > >>>>>>>>>> in the docs (see the "users_original" table) [1].
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Best,
> > > > >>>>>>>>>> Jark
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> [1]:
> > > > >>>>>>>>>> https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Mon, 19 Oct 2020 at 18:17, Danny Chan <yuzhao....@gmail.com> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> The concept seems to conflict with the Flink abstraction “dynamic table”;
> > > > >>>>>>>>>>> in Flink we see both “stream” and “table” as a dynamic table.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> I think we should first make clear how to express stream and table
> > > > >>>>>>>>>>> specific features on one “dynamic table”.
> > > > >>>>>>>>>>> It is more natural for KSQL because KSQL takes stream and table as
> > > > >>>>>>>>>>> different abstractions for representing collections. In KSQL, only a table is
> > > > >>>>>>>>>>> mutable and can have a primary key.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Does this connector belong to the “table” scope or the “stream” scope?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Some of the concepts (such as the primary key on stream) should be
> > > > >>>>>>>>>>> suitable for all the connectors, not just Kafka. Shouldn’t this be an
> > > > >>>>>>>>>>> extension of the existing Kafka connector instead of a totally new connector?
> > > > >>>>>>>>>>> What about the other connectors?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Because this touches the core abstraction of Flink, we had better have a
> > > > >>>>>>>>>>> top-down overall design; following KSQL directly is not the answer.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> P.S. For the source
> > > > >>>>>>>>>>>> Shouldn’t this be an extension of existing Kafka connector instead of a
> > > > >>>>>>>>>>>> totally new connector ?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> How could we achieve that (e.g. set up the parallelism correctly)?
> > > > >>>>>>>>>>> Best,
> > > > >>>>>>>>>>> Danny Chan
> > > > >>>>>>>>>>> On Oct 19, 2020, 5:17 PM +0800, Jingsong Li <jingsongl...@gmail.com> wrote:
> > > > >>>>>>>>>>>> Thanks Shengkai for your proposal.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> +1 for this feature.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Future Work: Support bounded KTable source
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> I don't think it should be future work; I think it is one of the
> > > > >>>>>>>>>>>> important concepts of this FLIP. We need to understand it now.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Intuitively, a ktable in my opinion is a bounded table rather than a
> > > > >>>>>>>>>>>> stream, so select should produce a bounded table by default.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> I think we can list Kafka related knowledge, because the word `ktable` is
> > > > >>>>>>>>>>>> easy to associate with ksql related concepts. (If possible, it's better to
> > > > >>>>>>>>>>>> unify with it)
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> What do you think?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> value.fields-include
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> What about the default behavior of KSQL?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>> Jingsong
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Hi, devs.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Jark and I want to start a new FLIP to introduce the KTable connector.
> > > > >>>>>>>>>>>>> KTable is short for "Kafka Table"; it also has the same semantics as
> > > > >>>>>>>>>>>>> the KTable notion in Kafka Streams.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> FLIP-149:
> > > > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Currently many users have expressed their need for upsert Kafka on the
> > > > >>>>>>>>>>>>> mailing lists and in issues. The KTable connector has several benefits
> > > > >>>>>>>>>>>>> for users:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1. Users are able to interpret a compacted Kafka topic as an upsert stream
> > > > >>>>>>>>>>>>> in Apache Flink, and are also able to write a changelog stream to Kafka
> > > > >>>>>>>>>>>>> (into a compacted topic).
> > > > >>>>>>>>>>>>> 2. As a part of the real time pipeline, users can store join or aggregate
> > > > >>>>>>>>>>>>> results (which may contain updates) into a Kafka topic for further
> > > > >>>>>>>>>>>>> calculation.
> > > > >>>>>>>>>>>>> 3. The semantics of the KTable connector are just the same as KTable in
> > > > >>>>>>>>>>>>> Kafka Streams. So it's very handy for Kafka Streams and KSQL users. We have
> > > > >>>>>>>>>>>>> seen several questions on the mailing list asking how to model a KTable
> > > > >>>>>>>>>>>>> and how to join a KTable in Flink SQL.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> We hope it can expand the usage of Flink with Kafka.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I'm looking forward to your feedback.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>> Shengkai
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> --
> > > > >>>>>>>>>>>> Best, Jingsong Lee
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> --
> > > > >>>>>>>>
> > > > >>>>>>>> Konstantin Knauf
> > > > >>>>>>>>
> > > > >>>>>>>> https://twitter.com/snntrable
> > > > >>>>>>>>
> > > > >>>>>>>> https://github.com/knaufk
> > > > >>>>>>>>
