Hi Xintong, Thanks for your reply.
> Does this mean if we want to support (KeyedStream, BroadcastStream) -> (KeyedStream), we must make sure that no data can be output upon processing records from the input BroadcastStream? That's probably a reasonable limitation. I think so, this is the restriction that has to be imposed in order to avoid re-partition(i.e. shuffle). If one just want to get a keyed-stream and don't care about the data distribution, then explicit KeyBy partitioning works as expected. > The problem is would this limitation be too implicit for the users to understand. Since we can't check for this limitation at compile time, if we were to add support for this case, we would have to introduce additional runtime checks to ensure program correctness. For now, I'm inclined not to support it, as it's hard for users to understand this restriction unless we have something better. And we can always add it later if we do realize there's a strong demand for it. > 1. I'd suggest renaming the method with timestamp to something like `collectAndOverwriteTimestamp`. That might help users understand that they don't always need to call this method, unless they explicitly want to overwrite the timestamp. Make sense, I have updated this FLIP toward this new method name. > 2. While this method provides a way to set timestamps, how would users read timestamps from the records? Ah, good point. I will introduce a new method to get the timestamp of the current record in RuntimeContext. Best regards, Weijie Xintong Song <tonysong...@gmail.com> 于2024年1月30日周二 14:04写道: > Just trying to understand. > > > Is there a particular reason we do not support a > > `TwoInputProcessFunction` to combine a KeyedStream with a > > BroadcastStream to result in a KeyedStream? There seems to be a valid > > use case where a KeyedStream is enriched with a BroadcastStream and > > returns a Stream that is partitioned in the same way. > > > > The key point here is that if the returned stream is a KeyedStream, we > > require that the partition of input and output be the same. As for the > > data on the broadcast edge, it will be broadcast to all parallelism, we > > cannot keep the data partition consistent. For example, if a specific > > record is sent to both SubTask1 and SubTask2, after processing, the > > partition index calculated by the new KeySelector is `1`, then the data > > distribution of SubTask2 has obviously changed. > > > Does this mean if we want to support (KeyedStream, BroadcastStream) -> > (KeyedStream), we must make sure that no data can be output upon processing > records from the input BroadcastStream? That's probably a reasonable > limitation. The problem is would this limitation be too implicit for the > users to understand. > > > > I noticed that there are two `collect` methods in the Collector, > > > > one with a timestamp and one without. Could you elaborate on the > > differences between them? Additionally, in what use case would one use > > the method that includes the timestamp? > > > > > > That's a good question, and it's mostly used with time-related operators > > such as Window. First, we want to give the process function the ability > to > > reset timestamps, which makes it more flexible than the original > > API. Second, we don't want to take the timestamp extraction > > operator/function as a base primitive, it's more like a high-level > > extension. Therefore, the framework must provide this functionality. > > > > > 1. I'd suggest renaming the method with timestamp to something like > `collectAndOverwriteTimestamp`. That might help users understand that they > don't always need to call this method, unless they explicitly want to > overwrite the timestamp. > > 2. While this method provides a way to set timestamps, how would users read > timestamps from the records? > > > Best, > > Xintong > > > > On Tue, Jan 30, 2024 at 12:45 PM weijie guo <guoweijieres...@gmail.com> > wrote: > > > Hi Xuannan, > > > > Thank you for your attention. > > > > > In the partitioning section, it says that "broadcast can only be > > used as a side-input of other Inputs." Could you clarify what is meant > > by "side-input"? If I understand correctly, it refer to one of the > > inputs of the `TwoInputStreamProcessFunction`. If that is the case, > > the term "side-input" may not be accurate. > > > > Yes, you got it right! I have rewrote this sentence to avoid > > misunderstanding. > > > > > Is there a particular reason we do not support a > > `TwoInputProcessFunction` to combine a KeyedStream with a > > BroadcastStream to result in a KeyedStream? There seems to be a valid > > use case where a KeyedStream is enriched with a BroadcastStream and > > returns a Stream that is partitioned in the same way. > > > > The key point here is that if the returned stream is a KeyedStream, we > > require that the partition of input and output be the same. As for the > > data on the broadcast edge, it will be broadcast to all parallelism, we > > cannot keep the data partition consistent. For example, if a specific > > record is sent to both SubTask1 and SubTask2, after processing, the > > partition index calculated by the new KeySelector is `1`, then the data > > distribution of SubTask2 has obviously changed. > > > > > 3. There appears to be a typo in the example code. The > > `SingleStreamProcessFunction` should probably be > > `OneInputStreamProcessFunction`. > > > > Yes, good catch. I have updated this FLIP. > > > > > 4. How do we set the global configuration for the > > ExecutionEnvironment? Currently, we have the > > StreamExecutionEnvironment.getExecutionEnvironment(Configuration) > > method to provide the global configuration in the API. > > > > This is because we don't want to allow set config programmatically in the > > new API, everything best comes from configuration files. However, this > may > > be too ideal, and the specific details need to be considered and > discussed > > in more detail, and I propose to devote a new sub-FLIP to this issue > later. > > We can easily provide the `getExecutionEnvironment(Configuration)` or > > `withConfiguration(Configuration)` method later. > > > > > I noticed that there are two `collect` methods in the Collector, > > one with a timestamp and one without. Could you elaborate on the > > differences between them? Additionally, in what use case would one use > > the method that includes the timestamp? > > > > That's a good question, and it's mostly used with time-related operators > > such as Window. First, we want to give the process function the ability > to > > reset timestamps, which makes it more flexible than the original > > API. Second, we don't want to take the timestamp extraction > > operator/function as a base primitive, it's more like a high-level > > extension. Therefore, the framework must provide this functionality. > > > > > > Best regards, > > > > Weijie > > > > > > weijie guo <guoweijieres...@gmail.com> 于2024年1月30日周二 11:45写道: > > > > > Hi Yunfeng, > > > > > > Thank you for your attention > > > > > > > 1. Will we provide any API to support choosing which input to consume > > > between the two inputs of TwoInputStreamProcessFunction? It would be > > > helpful in online machine learning cases, where a process function > > > needs to receive the first machine learning model before it can start > > > predictions on input data. Similar requirements might also exist in > > > Flink CEP, where a rule set needs to be consumed by the process > > > function first before it can start matching the event stream against > > > CEP patterns. > > > > > > Good point! I think we can provide a `nextInputSelection()` method for > > > `TwoInputStreamProcessFunction`. It returns a ·First/Second· enum that > > > determines which Input the mailbox thread will read next. But I'm > > > considering putting it in the sub-FLIP related to Join, since features > > like > > > HashJoin have a more specific need for this. > > > > > > > A typo might exist in the current FLIP describing the API to > > > generate a global stream, as I can see either global() or coalesce() > > > in different places of the FLIP. These two methods might need to be > > > unified into one method. > > > > > > Good catch! I have updated this FLIP to fix this typo. > > > > > > > The order of parameters in the current ProcessFunction is (record, > > > context, output), while this FLIP proposes to change the order into > > > (record, output, context). Is there any reason to make this change? > > > > > > No, it's just the order we decide. But please note that there is no > > > relationship between the two ProcessFunction's anyway. I think it's > okay > > to > > > use our own order of parameters in new API. > > > > > > 4. Why does this FLIP propose to use connectAndProcess() instead of > > > connect() (+ keyBy()) + process()? The latter looks simpler to me. > > > > > > > I actually also considered this way at first, but it would have to > > > introduce some concepts like ConnectedStreams. But we hope that streams > > > will be more clearly defined in the DataStream API, otherwise we will > end > > > up going the same way as the original API, which you have to understand > > > `JoinedStreams/ConnectedStreams` and so on. > > > > > > > > > > > > Best regards, > > > > > > Weijie > > > > > > > > > weijie guo <guoweijieres...@gmail.com> 于2024年1月30日周二 11:20写道: > > > > > >> Hi Wencong: > > >> > > >> Thank you for your attention > > >> > > >> > Q1. Other DataStream types are converted into > > >> Non-Keyed DataStreams by using a "shuffle" operation > > >> to convert Input into output. Does this "shuffle" include the > > >> various repartition operations (rebalance/rescale/shuffle) > > >> from DataStream V1? > > >> > > >> Yes, The name `shuffle` is used only to represent the transformation > of > > >> an arbitrary stream into a non-keyed partitioned stream and does not > > >> restrict how the data is partitioned. > > >> > > >> > > >> > Q2. Why is the design for TwoOutputStreamProcessFunction, > > >> when dealing with a KeyedStream, only outputting combinations > > >> of (Keyed + Keyed) and (Non-Keyed + Non-Keyed)? > > >> > > >> In theory, we could only provide functions that return Non-Keyed > > streams. > > >> If you do want a KeyedStream, you explicitly convert it to a > KeyedStream > > >> via keyBy. However, because sometimes data is processed without > changing > > >> the partition, we choose to provide an additional KeyedStream > > counterpart > > >> to reduce the shuffle overhead. We didn't introduce the non-keyed + > > >> keyed combo here simply because it's not very common, and if we really > > see > > >> a lot of users asking for it later on, it's easy to support it then. > > >> > > >> > > >> Best regards, > > >> > > >> Weijie > > >> > > >> > > >> Xuannan Su <suxuanna...@gmail.com> 于2024年1月29日周一 18:28写道: > > >> > > >>> Hi Weijie, > > >>> > > >>> Thank you for driving the design of the new DataStream API. I have a > > >>> few questions regarding the FLIP: > > >>> > > >>> 1. In the partitioning section, it says that "broadcast can only be > > >>> used as a side-input of other Inputs." Could you clarify what is > meant > > >>> by "side-input"? If I understand correctly, it refer to one of the > > >>> inputs of the `TwoInputStreamProcessFunction`. If that is the case, > > >>> the term "side-input" may not be accurate. > > >>> > > >>> 2. Is there a particular reason we do not support a > > >>> `TwoInputProcessFunction` to combine a KeyedStream with a > > >>> BroadcastStream to result in a KeyedStream? There seems to be a valid > > >>> use case where a KeyedStream is enriched with a BroadcastStream and > > >>> returns a Stream that is partitioned in the same way. > > >>> > > >>> 3. There appears to be a typo in the example code. The > > >>> `SingleStreamProcessFunction` should probably be > > >>> `OneInputStreamProcessFunction`. > > >>> > > >>> 4. How do we set the global configuration for the > > >>> ExecutionEnvironment? Currently, we have the > > >>> StreamExecutionEnvironment.getExecutionEnvironment(Configuration) > > >>> method to provide the global configuration in the API. > > >>> > > >>> 5. I noticed that there are two `collect` methods in the Collector, > > >>> one with a timestamp and one without. Could you elaborate on the > > >>> differences between them? Additionally, in what use case would one > use > > >>> the method that includes the timestamp? > > >>> > > >>> Best regards, > > >>> Xuannan > > >>> > > >>> > > >>> > > >>> On Fri, Jan 26, 2024 at 2:21 PM Yunfeng Zhou > > >>> <flink.zhouyunf...@gmail.com> wrote: > > >>> > > > >>> > Hi Weijie, > > >>> > > > >>> > Thanks for raising discussions about the new DataStream API. I > have a > > >>> > few questions about the content of the FLIP. > > >>> > > > >>> > 1. Will we provide any API to support choosing which input to > consume > > >>> > between the two inputs of TwoInputStreamProcessFunction? It would > be > > >>> > helpful in online machine learning cases, where a process function > > >>> > needs to receive the first machine learning model before it can > start > > >>> > predictions on input data. Similar requirements might also exist in > > >>> > Flink CEP, where a rule set needs to be consumed by the process > > >>> > function first before it can start matching the event stream > against > > >>> > CEP patterns. > > >>> > > > >>> > 2. A typo might exist in the current FLIP describing the API to > > >>> > generate a global stream, as I can see either global() or > coalesce() > > >>> > in different places of the FLIP. These two methods might need to be > > >>> > unified into one method. > > >>> > > > >>> > 3. The order of parameters in the current ProcessFunction is > (record, > > >>> > context, output), while this FLIP proposes to change the order into > > >>> > (record, output, context). Is there any reason to make this change? > > >>> > > > >>> > 4. Why does this FLIP propose to use connectAndProcess() instead of > > >>> > connect() (+ keyBy()) + process()? The latter looks simpler to me. > > >>> > > > >>> > Looking forward to discussing these questions with you. > > >>> > > > >>> > Best regards, > > >>> > Yunfeng Zhou > > >>> > > > >>> > On Tue, Dec 26, 2023 at 2:44 PM weijie guo < > > guoweijieres...@gmail.com> > > >>> wrote: > > >>> > > > > >>> > > Hi devs, > > >>> > > > > >>> > > > > >>> > > I'd like to start a discussion about FLIP-409: DataStream V2 > > Building > > >>> > > Blocks: DataStream, Partitioning and ProcessFunction [1]. > > >>> > > > > >>> > > > > >>> > > As the first sub-FLIP for DataStream API V2, we'd like to discuss > > and > > >>> > > try to answer some of the most fundamental questions in stream > > >>> > > processing: > > >>> > > > > >>> > > 1. What kinds of data streams do we have? > > >>> > > 2. How to partition data over the streams? > > >>> > > 3. How to define a processing on the data stream? > > >>> > > > > >>> > > The answer to these questions involve three core concepts: > > >>> DataStream, > > >>> > > Partitioning and ProcessFunction. In this FLIP, we will discuss > the > > >>> > > definitions and related API primitives of these concepts in > detail. > > >>> > > > > >>> > > > > >>> > > You can find more details in FLIP-409 [1]. This sub-FLIP is at > the > > >>> > > heart of the entire DataStream API V2, and its relationship with > > >>> other > > >>> > > sub-FLIPs can be found in the umbrella FLIP [2]. > > >>> > > > > >>> > > > > >>> > > Looking forward to hearing from you, thanks! > > >>> > > > > >>> > > > > >>> > > Best regards, > > >>> > > > > >>> > > Weijie > > >>> > > > > >>> > > > > >>> > > > > >>> > > [1] > > >>> > > > > >>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction > > >>> > > > > >>> > > [2] > > >>> > > > > >>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2 > > >>> > > >> > > >