Hi all!

Thank you for the writeup of this feature. I like the general direction a
lot.

There are still some open questions and confusing details, which I think we
need to clarify first to make this feature really good.
Below are questions/suggestions on the FLIP:

Best,
Stephan

===============

*(1) Log Implementation*

I agree with Eron that we should not design this hardwired to Kafka. Let's
have the internal interfaces in place to make this open to other streaming
storage systems as well.
The config options seem to be designed in a way that is Kafka-exclusive.
Can we change this, for example, to something like:
  - storage.log.system=kafka
  - storage.log.kafka.properties.bootstrap.servers
  - storage.log.kafka.retention
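
Just to sketch what that openness could look like (purely illustrative;
"pulsar" is only a hypothetical example here, not a proposal):
  - storage.log.system=pulsar
  - storage.log.pulsar.properties.*  (passed through to that system's client)
  - storage.log.pulsar.retention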

*(2) Change Tracking*

I am not sure I understand this fully. When a batch query inserts without
change tracking, what happens then?
  - does it skip writing to the change log?
  - does it simply overwrite the managed table with the new result?
  - something different?

*(3) "table-storage.log.scan.startup.mode"*

Somehow the presence of this flag seems to break the abstraction of managed
tables.
Let's say someone creates a managed table that is computed via a query over
another managed table. It would need all the data from the previous table,
or it would be inconsistent.

What is the reason to have this setting? Support cases where one doesn't
need all past data (let's say only data from the previous month)? Exposing
this again somewhat destroys the nice "transparent out of the box"
behavior, because now users need to think again about the incremental
building of the tables. I think that case shows that we are missing better
handling of data retention (see the next point).

Also, this seems to be a per-query setting rather than a global setting, so
should this be part of the config with which the query that reads from the
table storage is submitted?

The names could also be improved a bit, I think; for example, we could call
it just "table-storage.log.scan" with the values "full", "latest", and
"from-timestamp".

*(4) Data retention*

I am wondering how and when data is ever cleaned up.
For example, when the table definition has a time attribute and a predicate
so that the managed table should only contain the data from the previous
month, how does old data get cleaned up? Only through deletes coming from
timers in the Flink SQL layer?

I think if we want this to be really good and efficient, we need to look at
dropping data during compaction. The compaction should know that it needs to
retain only data from watermark - 1 month or so. That is somewhat similar to
the optimization I also proposed for SQL in general: getting rid of timers
and only using TTL (and compaction filters) for data expiration. I think for
managed tables, this is even more crucial for performance.

But it would mean that we need a better model for inferring the required
data retention based on predicates over the time columns, rather than just a
fixed retention based on the watermark.
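
To make that inference concrete, a sketch (table and column names are made
up): if the managed table is populated by something like

  INSERT INTO recent_orders
  SELECT * FROM orders
  WHERE order_time > CURRENT_TIMESTAMP - INTERVAL '30' DAY;

then compaction could derive "retain rows with order_time >= watermark - 30
days" directly from that predicate, instead of relying on timer-driven
deletes from the SQL layer.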


*(5) Different formats for cases with PK and without PK*

The FLIP proposes Debezium-Avro for cases without a PK and just Avro for
cases with a PK.

Do we expect that some users will subscribe directly to the table changelog,
meaning they read directly from the topic via a Kafka consumer?
  - I would expect that this will happen, because users want to avoid
writing the log twice (once for Flink managed table queries, once for
external subscribers).
  - If this is publicly exposed, then the fact that it uses different
formats in different cases (PK or no PK) seems really confusing and not
intuitive for users.
  - Can the format be just Debezium-JSON in all cases?
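
For illustration, an external Kafka consumer would then always see records
of roughly this shape, regardless of whether a PK is declared (values made
up):

  {"before": null, "after": {"order_id": 42, "amount": 10}, "op": "c"}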

*(6) Different consistency guarantees with PK and without PK*

Is this purely an internal implementation detail, or will users see a
difference? My understanding is that users will see a difference.
Having that difference happen implicitly when users add a PK declaration
seems very confusing to me. What about cases where the table has a PK
(because users want the data in Kafka that way) but users still want
transactional consistency?

If we need the "low-latency eventual consistency" mode with PKs, I would
suggest making this a separate mode that users can choose to activate if
they want.
We can restrict it to cases that have a PK, but not automatically change
the behavior when a PK is declared.
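
For example (the option name here is purely illustrative, not a concrete
proposal):

  CREATE TABLE t (id BIGINT, v STRING, PRIMARY KEY (id) NOT ENFORCED)
    WITH ('log.consistency' = 'eventual');

Without that option, the behavior would stay transactional, even with a PK
declared.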

*(7) Eventual Consistency Mode vs. Faster Checkpoints*

The eventual consistency mode with PK seems mainly there to get lower
latency for the changelog. What latencies are we looking for here?
There is also the work on generalized incremental checkpoints, which should
get the latency down to a few seconds; would that be good enough?

The current Upsert Kafka Source (which would be used with the PK eventual
consistency mode) has a big inefficiency in that it needs to retain all
records in state to convert the input into changelog records. That is also a
high price to pay for that mode.

*(8) Concurrent Write / Locking*

I don't fully understand what the plan is.

  - What is the locking philosophy in general? What operations are mutually
exclusive (probably multiple insert statements), and what can happen at the
same time (only query and compaction)?
  - What exactly is the optimistic locking behavior here? When can something
that failed succeed on the next retry? Can a compaction fail while there is
an insert statement? How can it then succeed when retried? Or can the
compaction only interfere with the insert statement in special situations?

  - What is the lock scoped to? A single query (via job ID)?
  - What happens if a job that holds a lock (for example, as an entry in the
Hive Metastore database) crashes and is manually restarted as a new job
(from the latest checkpoint)? Does that need manual admin intervention to
release the lock (manually updating the HMS database)?

*(9) Miscellaneous:*

Language/typos:
  - "A single bucket can only be written by a single parallelism" => "A
single bucket can only be written by a single parallel task"

Maybe call the "GenericCatalog" "FlinkInternalCatalog" instead, or just
"FlinkCatalog"?
