Hi all.

It seems that we've reached an agreement. We'll rename the "full" scan.mode
to "latest-full" and the "compacted" scan.mode to "compacted-full".

I've created a ticket about this [1] and will work on it soon.

[1] https://issues.apache.org/jira/browse/FLINK-30410
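
A minimal sketch of the agreed semantics (Solution 3 in the quoted thread),
assuming the runtime mode picks the default range and the "-full" modes
override it. The names `ScanPlan` and `resolve_scan` and the mode string
"from-snapshot-id" are hypothetical illustrations, not Table Store APIs:

```python
from dataclasses import dataclass

# Hypothetical names for illustration only; not the Table Store API.
# "-full" modes map to the position whose full snapshot is always read.
FORCED_FULL = {"latest-full": "latest", "compacted-full": "compacted"}
# Plain positions; "from-snapshot-id" is an assumed spelling.
POSITIONS = {"latest", "from-timestamp", "from-snapshot-id"}


@dataclass(frozen=True)
class ScanPlan:
    position: str         # where the scan starts
    read_snapshot: bool   # read the full snapshot at that position first
    follow_changes: bool  # keep reading incremental changes afterwards


def resolve_scan(scan_mode: str, streaming: bool) -> ScanPlan:
    if scan_mode in FORCED_FULL:
        # Full range is forced regardless of the runtime mode.
        return ScanPlan(FORCED_FULL[scan_mode], True, streaming)
    if scan_mode in POSITIONS:
        # Default range: full in batch, incremental-only in streaming.
        return ScanPlan(scan_mode, not streaming, streaming)
    raise ValueError(f"unknown scan.mode: {scan_mode!r}")
```

Under these assumptions, a streaming job with "from-timestamp" reads only
changes after the timestamp, while the same job with "latest-full" reads the
latest snapshot first and then follows new changes.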

On Mon, Dec 12, 2022 at 4:00 PM Shammon FY <zjur...@gmail.com> wrote:

> Hi Jingsong, Caizhi
>
> Thank you for your reply.
>
> I found that what really confused me was the starting position from which
> a streaming job reads data. As @Caizhi mentioned, in Kafka, streaming jobs
> only read incremental data, and that definition is very clear. In Table
> Store, streaming jobs can also read the full data first, so we need to
> give that mode special consideration.
>
> So I really agree with @Jingsong's "Solution 3"; it looks good to me. We
> define that streaming and batch jobs have different read modes by default,
> and they read data from the starting position defined by scan.mode.
> Personally, I think this is easy for users to understand. For read modes
> beyond the default, for example a full read in a streaming job, adding a
> suffix such as "full" to scan.mode looks nice. In this way, the
> definition will be clearer, and the read mode will be unified through
> scan.mode. Thanks
>
>
> Best,
> Shammon
>
>
> On Mon, Dec 12, 2022 at 2:49 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Thanks Shammon for bringing up the discussion.
>>
>> My opinion is to combine everything into scan.mode so that we don't
>> have orthogonal options.
>>
>> You mention two disadvantages of this solution.
>>
>> *1. The behaviors of some StartupModes in Stream and Batch jobs are
>> inconsistent*
>> This is acceptable from my point of view, because streaming jobs and
>> batch jobs are different after all. Streaming jobs should monitor new
>> changes, while batch jobs should only see the records that exist when the
>> job starts. This is the behavior users expect.
>>
>> *2. StartupMode does not define all data reading modes.*
>> Not all reading modes are useful. For example, you mention a reading mode
>> where the user first wants to read the snapshot at a timestamp and then
>> read all incremental changes. Could you explain in what scenario a user
>> would need this?
>>
>> About the solution you proposed, I think the biggest problem is the
>> default value, that is, whether we should read full snapshots by default.
>>
>> When a user runs a streaming job with "from-timestamp", they expect to
>> read only the incremental changes after the timestamp, just as with the
>> "from-timestamp" starting mode of Kafka.
>>
>> However, when a user runs a streaming job without a starting timestamp,
>> they expect the result to be correct. To provide a correct result, we
>> have to produce the full snapshot first.
>>
>> So you can see that, for different streaming jobs, the default behavior
>> about whether to read full snapshots is different. It is hard to pick a
>> default value without disturbing the users.
>>
>> On Fri, Dec 9, 2022 at 5:33 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>
>>> Thanks Shammon!
>>>
>>> Your summary was very good and very detailed.
>>>
>>> I thought about it again.
>>>
>>> ## Solution 1
>>> Actually, according to what you said, there should be many modes in
>>> theory.
>>> - Runtime-mode: streaming or batch.
>>> - Range: full or incremental.
>>> - Position: Latest, timestamp, snapshot-id, compacted.
>>>
>>> Advantages: The breakdown is very detailed, and every action is very
>>> clear.
>>> Disadvantages: Orthogonality produces many combinations. Together with
>>> the runtime-mode (streaming or batch), there are 16 modes, many of
>>> which are meaningless. As you said, the default behavior is also a
>>> problem.
>>>
>>> ## Solution 2
>>> Currently [1]:
>>> - The environment determines the runtime-mode: streaming or batch.
>>> - The `scan.mode` determines the position.
>>> - No specific option determines `range`; it is determined by the
>>> runtime-mode. However, it is not completely determined by the
>>> runtime-mode: `full` and `compacted`, for example, also read the full
>>> range in streaming mode.
>>>
>>> Advantages: Simple. The default values of options are what we want for
>>> streaming and batch.
>>> Disadvantages:
>>> 1. The semantics of from-timestamp differ between streaming and
>>> batch.
>>> 2. `full` and `compacted` are special.
>>>
>>> ## Solution 3
>>>
>>> As I understand it, the core problem of Solution 2 is mainly
>>> disadvantage 2: `full` and `compacted` are special.
>>> How about:
>>> - the runtime mode determines whether to read incremental only or full
>>> data.
>>> - `scan.mode` contains: Latest, timestamp, snapshot-id.
>>> The default is full in batch mode and incremental in stream mode.
>>>
>>> However, we have two other choices for `scan.mode`: `latest-full` and
>>> `compacted-full`. Regardless of the runtime-mode, these two choices
>>> force a full-range read.
>>>
>>> I think solution 3 is a compromise solution. It can also ensure the
>>> availability of default values. Conceptually, it can at least explain
>>> the current options.
>>>
>>> What do you think?
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/configuration/
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Thu, Dec 8, 2022 at 4:11 PM Shammon FY <zjur...@gmail.com> wrote:
>>> >
>>> > Hi devs:
>>> >
>>> > I'm an engineer from ByteDance, and here I'd like to discuss "Scan
>>> mode in Table Store for Flink Stream and Batch job".
>>> >
>>> > Users can execute Flink Stream and Batch jobs on Table Store. In Table
>>> Store 0.2 there are two items which determine how the Stream and Batch jobs'
>>> sources read data: StartupMode and config in Options.
>>> > 1. StartupMode
>>> >   a) DEFAULT. Determines actual startup mode according to other table
>>> properties. If "scan.timestamp-millis" is set, the actual startup mode
>>> will be "from-timestamp" mode. Otherwise, the actual startup mode will be
>>> "full".
>>> >   b) FULL. For streaming sources, read the latest snapshot on the
>>> table upon first startup, and continue to read the latest changes. For
>>> batch sources, just consume the latest snapshot but do not read new changes.
>>> >   c) LATEST. For streaming sources, continuously reads the latest
>>> changes without reading a snapshot at the beginning. For batch sources,
>>> behaves the same as the "full" startup mode.
>>> >   d) FROM_TIMESTAMP. For streaming sources, continuously reads changes
>>> starting from timestamp specified by "scan.timestamp-millis", without
>>> reading a snapshot at the beginning. For batch sources, read a snapshot at
>>> timestamp specified by "scan.timestamp-millis" but do not read new
>>> changes.
>>> > 2. Config in Options
>>> >   a) scan.timestamp-millis, log.scan.timestamp-millis. Optional
>>> timestamp used in case of "from-timestamp" scan mode.
>>> >   b) read.compacted. Read the latest compact snapshots only.
>>> >
>>> > After discussing with @Jingsong Li and @Caizhi Weng, we found that the
>>> config in Options and StartupMode are not orthogonal. Consider, for example,
>>> read.compacted together with FROM_TIMESTAMP mode and its behavior in Stream
>>> and Batch sources. We want to improve StartupMode to unify the data reading
>>> mode of Stream and Batch jobs, and add the following StartupMode item:
>>> > COMPACTED: For streaming sources, read a snapshot after the latest
>>> compaction on the table upon first startup, and continue to read the latest
>>> changes. For batch sources, just read a snapshot after the latest
>>> compaction but do not read new changes.
>>> > The advantage is that for Stream and Batch jobs, we only need to
>>> determine their behavior through StartupMode, but we also found two main
>>> problems:
>>> > 1. The behaviors of some StartupModes in Stream and Batch jobs are
>>> inconsistent, which may cause user misunderstanding, such as
>>> FROM_TIMESTAMP: streaming job reads incremental data, while batch job reads
>>> full data.
>>> > 2. StartupMode does not define all data reading modes. For example,
>>> streaming jobs read snapshots according to timestamp, and then read
>>> incremental data.
>>> >
>>> > To support all data reading modes in Table Store such as time travel,
>>> we try to divide data reading into two orthogonal dimensions: data reading
>>> range and startup position.
>>> > 1. Data reading range
>>> >   a) Incremental, read delta data.
>>> >   b) Full, read a snapshot first, then read the incremental data
>>> depending on the job type (streaming jobs only).
>>> > 2. Startup position
>>> >   a) Latest, read the latest snapshot.
>>> >   b) From-timestamp, read a snapshot according to the timestamp.
>>> >   c) From-snapshot-id, read a snapshot according to the snapshot id.
>>> >   d) Compacted, read the latest compacted snapshot.
>>> >
>>> > Then there're two questions:
>>> >
>>> > Q1: Does it need to divide into two configuration items, or combine
>>> and unify them in StartupMode?
>>> > If combined in StartupMode, they can be unified into eight modes. The
>>> advantage is that through StartupMode, we can clearly explain the behavior of
>>> stream and batch jobs, but there are many defined items, some of which are
>>> meaningless.
>>> > If split into two configurations, users may need to understand the
>>> different meanings of the two configurations. After combination, stream and
>>> batch jobs will have different processing logic.
>>> >
>>> > Q2: In Table Store 0.2, there are some conflicting definitions in
>>> Stream and Batch. Does Table Store 0.3 need to be fully compatible or
>>> directly implemented according to the new definitions? E.g. behavior in
>>> FROM_TIMESTAMP mode.
>>> > If 0.3 is fully compatible with the definition of Table Store 0.2, users
>>> can directly execute 0.2 queries on 0.3, but the semantic conflicts in the
>>> original definition will always exist in later versions.
>>> > If only limited compatibility is provided and the default behavior is
>>> implemented according to the new definition, a query that succeeds on 0.2
>>> may fail on 0.3.
>>> >
>>> > Looking forward to your feedback, thanks.
>>> >
>>> > Best,
>>> > Shammon
>>> >
>>> >
>>> >
>>>
>>
