Hi Stephan and Timo, thanks very much for your replies. I will try to
reply to you one by one, split across multiple emails.

First, as I understand it, from the perspective of a database (MySQL)
and its storage, consider what streaming and batch are:
- Batch is the snapshot; it contains the full set of records.
- Streaming is the binlog; the log contains “events” that describe
changes to table data.

The binlog helps us do two things:
1. As a write-ahead log, it helps recover a snapshot in case of
failure, which can reduce transaction commit latency.
2. Downstream businesses can use the binlog for database synchronization
(or active/standby synchronization), or consume the binlog directly for
further computation.
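
To make the snapshot/binlog relationship concrete, here is a minimal
Python sketch. It is not the proposed implementation; the event shape
(op, key, value), the change-type markers and the function name
apply_changelog are made up for illustration. It only shows that
replaying change events over an old snapshot yields the new snapshot.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical event shape: (op, key, value), where op is "+I" (insert),
# "+U" (update) or "-D" (delete). These names are assumptions.
ChangeEvent = Tuple[str, str, Optional[int]]


def apply_changelog(snapshot: Dict[str, Optional[int]],
                    events: Iterable[ChangeEvent]) -> Dict[str, Optional[int]]:
    """Replay change events on top of a snapshot and return the new snapshot."""
    result = dict(snapshot)  # copy, so the input snapshot is not mutated
    for op, key, value in events:
        if op in ("+I", "+U"):
            result[key] = value
        elif op == "-D":
            result.pop(key, None)
        else:
            raise ValueError(f"unknown change type: {op}")
    return result


old_snapshot = {"a": 1, "b": 2}
binlog = [("+U", "a", 10), ("-D", "b", None), ("+I", "c", 3)]
new_snapshot = apply_changelog(old_snapshot, binlog)
```

In this sketch the batch view is new_snapshot, and the streaming view is
the binlog list itself.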

So back to your comments:

> (1) Log Implementation

I agree with you about log.system and kafka.bootstrap.servers, but I
think log.retention is a common concept and we should hide Kafka here.

> (4) Data retention: how and when data is ever cleaned up

- The data of the snapshot never expires; users need to delete
partitions themselves if needed.
- The expiration time of the log is unlimited by default, but it can
be configured. In fact, we only need the latest log by default,
because we have saved the previous snapshot.

> (3) "table-storage.log.scan.startup.mode"

I agree with you. By default, of course, we read the snapshot first and
then the log changes. However, there are many scenarios that require the
two other options, such as:
- A monitoring scenario: I only need to compute over the latest data.
The old data is outdated and useless to me.
- Today, the data before 8 o'clock has already been computed. I don't
need the previous data. I just start consuming from 8 o'clock, and then
add a filter condition later.

This seems to bring some complexity, but a large number of users
actually have scenarios similar to the two above.
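
As a rough illustration of the three startup behaviours, here is a small
Python sketch. The mode names follow the "full" / "latest" /
"from-timestamp" values suggested in this thread; the record layout
(timestamp, key, value), the function name scan and its parameters are
assumptions for illustration only, not the proposed API.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical log record: (timestamp, key, value).
LogRecord = Tuple[int, str, int]


def scan(snapshot: Dict[str, int],
         snapshot_ts: int,
         log: List[LogRecord],
         mode: str = "full",
         start_ts: Optional[int] = None,
         job_start_ts: Optional[int] = None) -> List[Tuple[str, int]]:
    """Return the records a reader would see for the given startup mode."""
    if mode == "full":
        # Snapshot first, then only the log changes not yet in the snapshot.
        records = list(snapshot.items())
        records += [(k, v) for ts, k, v in log if ts > snapshot_ts]
    elif mode == "latest":
        # Monitoring case: ignore everything written before the job started.
        if job_start_ts is None:
            raise ValueError("job_start_ts is required for mode 'latest'")
        records = [(k, v) for ts, k, v in log if ts > job_start_ts]
    elif mode == "from-timestamp":
        # "Start from 8 o'clock" case: skip the snapshot, read log from start_ts.
        if start_ts is None:
            raise ValueError("start_ts is required for mode 'from-timestamp'")
        records = [(k, v) for ts, k, v in log if ts >= start_ts]
    else:
        raise ValueError(f"unknown mode: {mode}")
    return records


snapshot = {"a": 1}          # covers all changes up to timestamp 7
log = [(6, "a", 1), (8, "b", 2), (9, "c", 3)]

full = scan(snapshot, 7, log, mode="full")
from_eight = scan(snapshot, 7, log, mode="from-timestamp", start_ts=8)
latest = scan(snapshot, 7, log, mode="latest", job_start_ts=8)
```

Only the "full" mode touches the snapshot here; the other two read
nothing but the log.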

Actually, even if the user uses "the snapshot first and then log
changes", most scenarios still read incremental data, but this
increment is defined by the user. For example, the user can stream
read with the following SQL:
SELECT * FROM T WHERE dt >= '2021-11-15';
Here "dt" is the partition field of the user-defined table. In this
way, the job only consumes the data starting from today. The storage
takes care of the read, the partition pruning, and the switch to the log.
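
A minimal Python sketch of that read path, under stated assumptions: the
snapshot is modelled as a dict from partition value to rows, the log
only holds changes made after the snapshot, and the names stream_read
and min_dt are hypothetical. It is meant to show the idea of pruning
partitions and then switching to the log, not the actual storage code.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, int]  # (dt, value); hypothetical row layout


def stream_read(partitions: Dict[str, List[Row]],
                log: List[Row],
                min_dt: str) -> List[Row]:
    """Read snapshot partitions with dt >= min_dt, then continue with the log."""
    out: List[Row] = []
    # Partition pruning: whole partitions below min_dt are skipped unread.
    # ISO dates (YYYY-MM-DD) compare correctly as plain strings.
    for dt in sorted(partitions):
        if dt >= min_dt:
            out.extend(partitions[dt])
    # Switch to the log: the same predicate is applied as a row filter.
    out.extend(row for row in log if row[0] >= min_dt)
    return out


partitions = {
    "2021-11-14": [("2021-11-14", 1)],
    "2021-11-15": [("2021-11-15", 2)],
}
log = [("2021-11-15", 3), ("2021-11-16", 4)]
rows = stream_read(partitions, log, "2021-11-15")
```

The partition for 2021-11-14 is never opened, which is the "increment
defined by the user" mentioned above.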

> Also, this seems to be a per-query setting, more than a global setting, so 
> should this be part of the config with which the query is submitted that 
> reads from the table-storage?

Global settings only affect the CREATE TABLE DDL. After the DDL, the
table options are stored in the Catalog, and users can use dynamic table
options to configure reading per query.

> The names could also be improved a bit

+1

Best,
Jingsong


On Sat, Nov 13, 2021 at 1:31 AM Timo Walther <twal...@apache.org> wrote:
>
> Hi everyone,
>
> sorry for the delay in joining this thread. I went through the FLIP and
> have some comments (maybe overlapping with Stephan's comments, which I
> haven't read yet):
>
> a. > More importantly, in order to solve the cognitive bar...
>
> It would be great if we can add not only `Receive any type of changelog`
> but also `Receive any type of datatype`.
>
>
> b. > COMPACT [...] Compact table for high performance query. Launch a
> job to rewrite files.
>
> Please clarify whether this is a synchronous or asynchronous operation
> in the API? So far all DDL was synchronous. And only DML asynchronous.
>
> c. > 'change-tracking' = 'false'
>
> I find this option a bit confusing. Even in batch scenarios we have a
> changelog, only with insert-only changes. Can you elaborate? Wouldn't
> 'exclude-from-log-store' or 'exclude-log-store' or 'log.disabled' be
> more accurate?
>
> d. > DESCRIBE DETAIL TABLE
>
> This seems very uncommon for SQL. How about `DESCRIBE TABLE EXTENDED`?
>
> e. > Set checkpoint interval to 1 min if checkpoint is not enabled when
> the planner detects a sink to built-in dynamic table.
>
> This sounds like too much magic to me. It will be super hard to debug
> why suddenly checkpointing is enabled. If a user has not configured the
> checkpointing yet, it could lead to unintended behavior without a proper
> checkpoint storage. It would be better to either throw an exception or
> just have weaker consistency guarantees in this case.
>
> f. > GenericCatalog
>
> Why do we need an additional marker interface without any methods in
> there? It only further complicates the catalog interfaces. Isn't a
> `Catalog#supportesTableStorage` enough? Also, we just introduced
> `CatalogBaseTable.TableKind` for exactly such new features. We can add a
> new table kind. A managed table can set this for a `CatalogTable`.
>
> g. > enrichOptions(Context context)
>
> Why is this method returning a Map<String, String>? Shouldn't the caller
> assume that all options enriched via `CatalogTable.copy` should have
> been applied by `enrichOptions`?
>
>
> h. Partitioning and Event-time:
>
> Have you considered supporting semantics similar to
> `sink.partition-commit.trigger` based on `partition-time`? It could be
> beneficial to have the partitions committed by watermarks as well. My
> biggest concern is how we can enable watermarking end-to-end using a
> file store (I think for log store this should not be a problem?).
>
> Looking forward to your feedback.
>
> Regards,
> Timo
>
>
> On 12.11.21 16:35, Stephan Ewen wrote:
> > Hi all!
> >
> > Thank you for the writeup of this feature. I like the general direction a
> > lot.
> >
> > There are some open questions and confusing details still, which I think we
> > need to clarify first to make this feature really good.
> > Below are questions/suggestions on the FLIP:
> >
> > Best,
> > Stephan
> >
> > ===============
> >
> > *(1) Log Implementation*
> >
> > I agree with Eron that we should not design this hardwired to Kafka. Let's
> > have the internal interfaces in place to make this open to other streaming
> > storage systems as well.
> > The config options seem to be designed in a way that is Kafka-exclusive.
> > Can we change this, for example to something like
> >    - storage.log.system=kafka
> >    - storage.log.kafka.properties.bootstrap.servers
> >    - storage.log.kafka.retention
> >
> > *(2) Change Tracking*
> >
> > I am not sure I understand this fully. When a batch query inserts without
> > change tracking what happens then?
> >    - does it skip writing to the change log?
> >    - does it simply overwrite the managed table with the new result?
> >    - something different?
> >
> > *(3) "table-storage.log.scan.startup.mode"*
> >
> > Somehow the presence of this flag seems to break the abstraction of managed
> > tables.
> > Let's say someone creates a managed table that is computed via a query over
> > another managed table. It would need all the data from the previous table,
> > or it would be inconsistent.
> >
> > What is the reason to have this setting? Support cases where one doesn't
> > need all past data (let's say only data from the previous month)? Exposing
> > this again somewhat destroys the nice "transparent out of the box"
> > behavior, because now users need to think again about the incremental
> > building of the tables. I think that case shows that we miss a bit better
> > handling of data retention (see next point).
> >
> > Also, this seems to be a per-query setting, more than a global setting, so
> > should this be part of the config with which the query is submitted that
> > reads from the table-storage?
> >
> > The names could also be improved a bit, I think, for example we could call
> > it just  "table-storage.log.scan" with values "full", "latest",
> > "from-timestamp".
> >
> > *(4) Data retention*
> >
> > I am wondering how and when data is ever cleaned up.
> > For example, when the table definition has a time attribute and predicate
> > so that the managed table should only contain the data from the previous
> > month. How does old data get cleaned up? Only through deletes coming from
> > timers in the Flink SQL layer?
> >
> > I think if we want this to be really good and efficient, we need to look at
> > dropping data during the compaction. The compaction should know it needs to
> > retain only data from WaterMark - 1 month or so. That is somewhat similar
> > to the optimization I proposed also for SQL in general, to get rid of
> > timers and only use TTL (and compaction filters) for data expiration. I
> > think for managed tables, this is even more crucial for performance.
> >
> > But it would mean that we need to have a better model for inferring
> > required data retention based on predicates over the time columns, and not
> > simply just have fixed retention based on the watermark.
> >
> >
> > *(5) Different formats for cases with PK and without PK*
> >
> > The FLIP proposes Debezium-Avro for cases without a PK and just Avro for
> > cases with PK.
> >
> > Do we expect that some users directly subscribe to the Table Changelog,
> > meaning directly read via a Kafka Consumer from the topic?
> >    - I would expect that this will happen, because users want to avoid
> > writing the log twice (one for Flink managed table queries, one for
> > external subscribers).
> >    - If this is publicly exposed, then the fact that it uses different
> > formats in different cases (PK or no PK) seems really confusing and not
> > intuitive for users.
> >    - Can the format be just Debezium-JSON in all cases?
> >
> > *(6) Different consistency guarantees with PK and without PK*
> >
> > Is this purely an internal implementation detail, or will users see a
> > difference? My understanding is that users see a difference.
> > Having that difference implicitly happen when users add a PK reference
> > seems very confusing to me. What about cases where the table has a PK
> > (because users want the data in Kafka that way) but want transactional
> > consistency?
> >
> > If we need the "low-latency eventual consistency" mode with PKs, I would
> > suggest making this a separate mode that users can choose to activate if
> > they want.
> > We can restrict it to cases that have a PK, but not automatically change
> > the behavior when a PK is declared.
> >
> > *(7) Eventual Consistency Mode vs. Faster Checkpoints*
> >
> > The eventual consistency mode with PK seems mainly there to get lower
> > latency for the changelog. What latencies are we looking for here?
> > There is also the work on generalized incremental checkpoints, which should
> > get the latency down to a few seconds, would that be good enough?
> >
> > The current Upsert Kafka Source (which would be used with PK eventual
> > consistency mode) has a big inefficiency in the way it needs to retain all
> > state to convert the records to changelog records. That is also a high
> > price to pay for that mode.
> >
> > *(8) Concurrent Write / Locking*
> >
> > I don't fully understand what the plan is.
> >
> >    - What is the locking philosophy in general, what operations are mutually
> > exclusive (probably multiple insert statements), what can happen at the
> > same time (only query and compaction)?
> >    - what exactly is the optimistic locking behavior here, when can
> > something that failed succeed on the next retry? Can a compaction fail
> > while there is an insert statement? How can it then succeed when retried?
> > Or can the compaction only interfere with the insert statement in special
> > situations?
> >
> >    - what is the lock scoped to? a single query (via jobID)?
> >    - what happens if a system that holds (like an entry in the Hive
> > Metastore Database) a lock crashes and is manually restarted as a new job
> > (from the latest checkpoint)? does that need manual admin intervention to
> > release the lock (manually update the HMS Database)?
> >
> > *(9) Miscellaneous:*
> >
> > Language/typos:
> >    - "A single bucket can only be written by a single parallelism" => "A
> > single bucket can only be written by a single parallel task"
> >
> > Maybe call the "GenericCatalog" instead "FlinkInternalCatalog" or just
> > "FlinkCatalog"?
> >
>


--
Best, Jingsong Lee
