Hi Dong,

Thanks for the proposal! The FLIP is already in good shape. I have a few nit questions.
1. It is a little bit odd that the note saying some features have been moved
to FLIP-331 appears right after the motivation, because at that point readers
do not yet have the context to know which features are meant. I would suggest
moving the note to the beginning of the "Public interfaces" section.

2. It is also a little bit odd to describe all behaviour changes first but
then focus on only a single feature, i.e. how to implement
internalSorterSupported. TBH, I was lost while reading the "Public
interfaces" section. Maybe change the FLIP title? Another option could be to
write a short summary of all features and point out that this FLIP will only
focus on the internalSorterSupported feature, with the others covered in
FLIP-331. WDYT?

3. There seems to be a typo at "4) Checkpoint and failover strategy -> Mixed
mode": "If any task fails when isBacklog=false, this task is restarted to
re-process its input from the beginning." (isBacklog=false should presumably
be isBacklog=true.)

Best regards,
Jing

On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com> wrote:

> Hi Piotr,
>
> Thanks for your comments! Please see my reply inline.
>
> On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> wrote:
>
> > Hi Dong,
> >
> > I have a couple of questions.
> >
> > Could you explain why those properties
> >
> > @Nullable private Boolean isOutputOnEOF = null;
> > @Nullable private Boolean isOutputOnCheckpoint = null;
> > @Nullable private Boolean isInternalSorterSupported = null;
> >
> > must be `@Nullable`, instead of having the default value set to `false`?
>
> By initializing these private variables in OperatorAttributesBuilder as
> null, we can implement `OperatorAttributesBuilder#build()` in such a way
> that it can print DEBUG-level logging to say "isOutputOnCheckpoint is not
> explicitly set". This can help users/SREs debug performance issues (or the
> lack of an expected optimization) caused by operators not explicitly
> setting the right operator attribute.
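[Editor's note: to make the null-vs-false distinction concrete, here is a
simplified, self-contained sketch of the builder pattern Dong describes. The
class and method names mirror the FLIP's proposal, but this is illustrative
code, not the actual Flink implementation; in particular, a real
implementation would use an SLF4J DEBUG log rather than System.err.]

```java
// Hypothetical, simplified sketch (not the actual Flink code): @Nullable
// builder fields let build() tell "never set" apart from "explicitly set to
// false" and log the former for debugging, while the built OperatorAttributes
// still defaults every unset attribute to false.
public class OperatorAttributesSketch {

    /** Immutable attributes object; unset fields have already been defaulted to false. */
    public static final class OperatorAttributes {
        private final boolean outputOnEOF;
        private final boolean outputOnCheckpoint;
        private final boolean internalSorterSupported;

        OperatorAttributes(boolean eof, boolean cp, boolean sorter) {
            this.outputOnEOF = eof;
            this.outputOnCheckpoint = cp;
            this.internalSorterSupported = sorter;
        }

        public boolean isOutputOnEOF() { return outputOnEOF; }
        public boolean isOutputOnCheckpoint() { return outputOnCheckpoint; }
        public boolean isInternalSorterSupported() { return internalSorterSupported; }
    }

    public static final class OperatorAttributesBuilder {
        // null means "not explicitly set by the operator implementation".
        private Boolean isOutputOnEOF = null;
        private Boolean isOutputOnCheckpoint = null;
        private Boolean isInternalSorterSupported = null;

        public OperatorAttributesBuilder setOutputOnEOF(boolean v) { isOutputOnEOF = v; return this; }
        public OperatorAttributesBuilder setOutputOnCheckpoint(boolean v) { isOutputOnCheckpoint = v; return this; }
        public OperatorAttributesBuilder setInternalSorterSupported(boolean v) { isInternalSorterSupported = v; return this; }

        public OperatorAttributes build() {
            return new OperatorAttributes(
                    defaultToFalse(isOutputOnEOF, "isOutputOnEOF"),
                    defaultToFalse(isOutputOnCheckpoint, "isOutputOnCheckpoint"),
                    defaultToFalse(isInternalSorterSupported, "isInternalSorterSupported"));
        }

        private static boolean defaultToFalse(Boolean value, String name) {
            if (value == null) {
                // Stand-in for the DEBUG-level log statement described in the reply.
                System.err.println("DEBUG: " + name + " is not explicitly set; defaulting to false");
                return false;
            }
            return value;
        }
    }

    public static void main(String[] args) {
        OperatorAttributes defaults = new OperatorAttributesBuilder().build();
        // All unset attributes default to false (and each was logged as unset).
        System.out.println(defaults.isOutputOnCheckpoint());  // false

        OperatorAttributes explicit =
                new OperatorAttributesBuilder().setOutputOnCheckpoint(true).build();
        System.out.println(explicit.isOutputOnCheckpoint());  // true
    }
}
```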
> For example, we might want a job to always use the longer checkpointing
> interval (i.e. execution.checkpointing.interval-during-backlog) if all
> running operators have isOutputOnCheckpoint==false, and use the short
> checkpointing interval otherwise. If a user has explicitly configured
> execution.checkpointing.interval-during-backlog but the two-phase commit
> sink library has not been upgraded to set isOutputOnCheckpoint=true, then
> the job will end up using the long checkpointing interval, and it will be
> useful to figure out what is going wrong in this case by checking the log.
>
> Note that the default values of these fields of the OperatorAttributes
> instance built by OperatorAttributesBuilder will still be false. The
> following is mentioned in the Javadoc of
> `OperatorAttributesBuilder#build()`:
>
> /**
>  * If any operator attribute is null, we will log it at DEBUG level and
>  * use the following default values:
>  * - isOutputOnEOF defaults to false
>  * - isOutputOnCheckpoint defaults to false
>  * - isInternalSorterSupported defaults to false
>  */
>
> > Second question: have you thought about cases where someone is
> > either bootstrapping from a streaming source like Kafka,
> > or simply trying to catch up after a long period of downtime in a purely
> > streaming job? Generally speaking, cases where the
> > user doesn't care about latency in the catch-up phase, regardless of
> > whether the source is bounded or unbounded, but wants to process
> > the data as fast as possible, and then switch dynamically to real-time
> > processing?
>
> Yes, I have thought about this. We should allow this job to effectively
> run in batch mode when the job is in the catch-up phase. FLIP-327 is
> actually an important step toward addressing this use case.
>
> In order to address the above use case, all we need is a way for a source
> operator (e.g. Kafka) to tell the Flink runtime (via IsProcessingBacklog)
> whether it is in the catch-up phase.
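[Editor's note: the interval-selection rule Dong describes earlier in this
reply (use the long interval-during-backlog only if no running operator needs
to output on checkpoints) could be sketched as below. The class and method
names are illustrative assumptions, not Flink's actual scheduler API; only
the config semantics come from the discussion.]

```java
import java.time.Duration;
import java.util.List;

// Hypothetical helper illustrating the checkpoint-interval selection rule:
// during backlog processing, the longer interval-during-backlog may be used
// only if no running operator declares isOutputOnCheckpoint=true; otherwise
// (or outside the backlog phase) the regular, shorter interval applies.
public class CheckpointIntervalChooser {

    public static Duration chooseInterval(
            List<Boolean> operatorsOutputOnCheckpoint,  // one flag per running operator
            Duration regularInterval,                   // execution.checkpointing.interval
            Duration intervalDuringBacklog,             // execution.checkpointing.interval-during-backlog
            boolean isProcessingBacklog) {
        if (!isProcessingBacklog) {
            // Real-time phase: always use the short, regular interval.
            return regularInterval;
        }
        // If any operator must emit output on each checkpoint (e.g. a
        // two-phase-commit sink), keep checkpointing frequently even
        // while processing the backlog.
        boolean anyOutputsOnCheckpoint =
                operatorsOutputOnCheckpoint.stream().anyMatch(b -> b);
        return anyOutputsOnCheckpoint ? regularInterval : intervalDuringBacklog;
    }
}
```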
> Since every Kafka message has an event timestamp, we can allow users to
> specify a job-level config such as backlog-watermark-lag-threshold, and
> consider a Kafka source to have IsProcessingBacklog=true if system_time -
> watermark > backlog-watermark-lag-threshold. This effectively allows us to
> determine whether Kafka is in the catch-up phase.
>
> Once we have this capability (I plan to work on this in FLIP-328), we can
> directly use the features proposed in FLIP-325 and FLIP-327 to optimize
> the above use case.
>
> What do you think?
>
> Best,
> Dong
>
> > Best,
> > Piotrek
> >
> > On Sun, Jul 2, 2023 at 4:15 PM Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > I am opening this thread to discuss FLIP-327: Support stream-batch
> > > unified operator to improve job throughput when processing backlog
> > > data. The design doc can be found at
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
> > >
> > > This FLIP enables a Flink job to initially operate in batch mode,
> > > achieving high throughput while processing records that do not require
> > > low processing latency. Subsequently, the job can seamlessly transition
> > > to stream mode for processing real-time records with low latency.
> > > Importantly, the same state can be utilized before and after this mode
> > > switch, making it particularly valuable when users wish to bootstrap
> > > the job's state using historical data.
> > >
> > > We would greatly appreciate any comments or feedback you may have on
> > > this proposal.
> > >
> > > Cheers,
> > > Dong
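[Editor's note: the watermark-lag rule in Dong's reply above (system_time -
watermark > backlog-watermark-lag-threshold) could be sketched as below. The
config name backlog-watermark-lag-threshold comes from the discussion and was
still to be designed in FLIP-328; the class and method names here are
hypothetical, not Flink's actual API.]

```java
import java.time.Duration;

// Hypothetical sketch of the backlog signal: a source is considered to be
// processing backlog when its watermark lags the current system time by more
// than a configured threshold.
public class BacklogDetector {

    private final Duration lagThreshold;  // backlog-watermark-lag-threshold

    public BacklogDetector(Duration lagThreshold) {
        this.lagThreshold = lagThreshold;
    }

    /** IsProcessingBacklog = (system_time - watermark) > backlog-watermark-lag-threshold. */
    public boolean isProcessingBacklog(long systemTimeMillis, long watermarkMillis) {
        return systemTimeMillis - watermarkMillis > lagThreshold.toMillis();
    }
}
```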