Hi Arvid,

Thanks a lot for the detailed reply.

Just to clarify, I don't plan to ask user to implement
KafkaRecordDeserializationSchema#isEndOfStream(T). My current idea is to
not add any public API, but expect users to re-use the
existing Collector::close() API inside
KafkaRecordDeserializationSchema::deserialize(...). And if a message with
the user-specified pattern has arrived, the user can invoke
Collector::close() which signals Flink to stop reading from the
corresponding source split.

Here are a few clarifications in response to the discussion:

1) The boundedness of the source and execution.runtime-mode would not be
affected by this proposal. Users can keep using the existing setting
without or without the dynamic EOF.
2) The dynamic EOF works independently of the
stop stoppingOffsetsInitializer. When reading from Kafka Source, users can
rely on just the dynamic EOF without specifying stoppingOffsetsInitializer.
And if users specify both dynamic EOF and stoppingOffsetsInitializer, the
job stops reading from the source split when either condition is met.
3) Suppose users can specify the dynamic EOF in
KafkaRecordDeserializationSchema::deserialize(...), then users have access
to the entire ConsumerRecord. This approach could address Ayush's use-case.
4) Suppose we choose to do it in
KafkaRecordDeserializationSchema::deserialize(...), then the dynamic EOF
happens inside the RecordEmitter. Yes we will need to be able to close the
split.
5) For the majority of users who do not want dynamic EOF, those users can
keep using the existing out-of-the-box support for Avro/Json/Protobuf. For
advanced users who want dynamic EOF, those users anyway need to encode the
dynamic EOF logic in a method similar to  KafkaRecordDeserializationSchema
(with access to the raw message). Adding the dynamic EOF support would not
make their life harder.

Based on the discussion so far, it looks like there are two approaches
mentioned so far:

1) Let users call Collector::close() API inside
KafkaRecordDeserializationSchema::deserialize(...) to signal the EOF.

2) Add the API KafkaSourceBuilder::setBoundedStopCursor(StopCursor), where
StopCursor subsumes all existing functionalities of
the stoppingOffsetsInitializer. And StopCursor::shouldStop needs to take
both the raw and the deserialized message.

It seems that the second approach involves much more API change than the
first work (including deprecation of some existing APIs).

Regarding the first approach, could you help explain why "close is the
completely wrong method for that"? My understanding is the close() method
indicates that the caller no longer needs to read from this source split
and the associated network resource could be released. Why is it wrong for
a user to call this method?


On Tue, Dec 28, 2021 at 5:15 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Dong,
>
> Could you help explain why we can not dynamically stop reading from a
>> source in batch mode?
>>
> We can but we cannot easily determine if the source is supposed to run in
> batch or streaming. A user would need to implement a special
> KafkaRecordDeserializationSchema and still provide an OffsetInitializer for
> the end offset to trigger batch mode.
>
> How are both concepts supposed to interact? Are we only stopping if any of
> the concept state that this is the end?
>
> We could ofc offer some KafkaSourceBuilde#setBounded() without parameters
> so that a user can implement a special KafkaRecordDeserializationSchema and
> notify the builder but this looks awkward to me and is quite error-prone:
> When a user uses setBounded without overwriting isEndOfStream, the
> application would never emit anything.
>
> My understanding is that when a message with the particular pattern
>> (specified by the user) is encountered, we can have the source operator
>> emit the high-watermark in such a way as if the particular partition of
>> this source has reached EOF. And this must have worked since users have
>> been using KafkaDeserializationSchema::isEndOfStream with the
>> legacy FlinkKafkaConsumer. Did I miss something here?
>>
> Yes batch mode is different from bounded streaming. [1] We can only fully
> leverage a statically bounded source by statically defining it as such with
> the FLIP-27 Source interface. [2]
>
> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
>> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
>> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
>> harder for user to implement KafkaRecordDeserializationSchema?
>>
> Users mostly use the factory methods that adapt to Flink's
> DeserializationSchema. We should also offer a builder similarly to
> KafkaRecordSerializationSchemaBuilder.
>
> Regarding "how to use it from Table/SQL", support we allow user to encode
>> this dynamic EOF logic inside KafkaRecordDeserializationSchema.
>
> I'm not sure if we can/should expose dynamic EOF in SQL but at the very
> least we should properly support end offsets (as it's now possible). We
> must avoid removing the current end offsets in favor of
> KafkaRecordDeserializationSchema#isEndOfStream to enable Table/SQL to use
> bounded Kafka sources.
>
> e.g. call Collector::close() if the message content matches a
>> user-specified pattern
>>
> No, close is the completely wrong method for that. This method should have
> never been exposed to the user as it will close the network resources.
> However, we need a fully functional network stack for proper shutdown.
>
> It appears that StopCursor::shouldStop(...) takes a raw Message. While
>> user could implement the dynamic EOF logic in this method, I am worried
>> that this approach would lead to inferior performance due to double message
>> deserialization.
>>
> That is a fair point. In case of Ayush, however, it's the only way to
> determine that the pipeline should stop (you pretty much compare if the 5.
> byte in the message has changed). If you deserialize into a SpecificRecord,
> then the writer schema version is lost for isEndOfStream(T deserialized).
>
> Another concern I have for
> KafkaRecordDeserializationSchema#isEndOfStream(T) is where it is supposed
> to be called then. If it's in the RecordEmitter, we need to extend the
> RecordEmitter to support closing the split. If it's in the SplitReader, we
> probably also need double-deserialization because of FLINK-25132 (the
> record needs to be deserialized in the RecordEmitter). Maybe you can encode
> it in the SplitState but this sounds rather cumbersome if it needs to be
> done for all sources.
>
> The reason is that the user's logic will likely depend on the
>> de-serialized message (as opposed to the raw byte in the
>> org.apache.pulsar.client.api.Message.getData()). In this case, users will
>> need to deserialize the message inside StopCursor::shouldStop(...) first
>> and then the message would be de-serialized again by
>> the PulsarDeserializationSchema, which is specified via
>> the PulsarSourceBuilder::setDeserializationSchema.
>>
> As written before, this is not the case of the specific user. Having the
> raw message makes it much easier to determine a writer schema change. I'm
> sure that there are cases, where you need to look into the data though. To
> avoid double-deserialization, a better way may be to pass both the raw and
> the deserialized message to `shouldStop` but then we should move the stop
> logic to RecordEmitter as written before.
>
> Do you mean that you prefer to replace
>> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
>> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>>
> Ideally, yes. But that needs to be backward compatible as it's a
> PublicEvolving interface.
>
> I agree it is cleaner to let PartitionOffsetsRetrieverImpl use adminClient
>> only without using KafkaClient. On the other hand, it seems that there is
>> no performance/correctness concern with the existing approach? Is this
>> issue related to the discussion of dynamic EOF?
>
> I just meant that a user probably only needs access to the adminClient to
> retrieve the offsets of a topic and that Kafka's PartitionOffsetsRetriever
> nicely hides the client from the user. I'm sure a user can easily mess up
> with the admin client in Pulsar (what happens if this is closed, is the
> client internally used somewhere else?).
>
>
> TL;DR
> In general, I like the separation of concerns:
> KafkaRecordDeserializationSchema is for deserializing and StopCondition (or
> however we call it) is for stopping. Then a user can reuse pre-defined
> KafkaRecordDeserializationSchema and mix in the stopping logic when needed
> (remember this is a rare case).
>
> In most cases, a user will use Avro/Json/Protobuf + schema registry, so if
> we provide out-of-the-box support for these formats, the user doesn't need
> to touch KafkaRecordDeserializationSchema at all. Then it would be nice to
> have a single interface that determines when the source stops (e.g.
> StopCondition) with pre-defined implementations (see factory methods in
> OffsetsInitializer) for Table/SQL. We could even provide a predefined
> strategy for schema changes when the schema registry is used.
>
> If you already have use-cases that relies on the deserialized data, then
> let's move the stopping logic to RecordEmitter. At this point, I'd propose
> to pass the raw and deserialized data to the StopCondition.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-BatchandStreamingUnification
>
>
> On Mon, Dec 27, 2021 at 5:01 PM Dong Lin <lindon...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> Thanks for the suggestion! Sorry for the late reply. I just finished
>> investigating the PulsarSource/StopCursor as you suggested. Please see
>> my reply inline.
>>
>> On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Dong,
>>>
>>> I see your point. The main issue with dynamic EOF is that we can't run
>>> in batch mode. That may be desired in the case of Ayush but there may be
>>> other use cases where it's not.
>>>
>>
>> Could you help explain why we can not dynamically stop reading from a
>> source in batch mode?
>>
>> My understanding is that when a message with the particular pattern
>> (specified by the user) is encountered, we can have the source operator
>> emit the high-watermark in such a way as if the particular partition of
>> this source has reached EOF. And this must have worked since users have
>> been using KafkaDeserializationSchema::isEndOfStream with the
>> legacy FlinkKafkaConsumer. Did I miss something here?
>>
>> Additionally, it's quite a bit of code if you'd implement a
>>> KafkaRecordDeserializationSchema from scratch. There is also no obvious way
>>> on how to use it from Table/SQL.
>>>
>>
>> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
>> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
>> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
>> harder for user to implement KafkaRecordDeserializationSchema?
>>
>> Regarding "how to use it from Table/SQL", support we allow user to encode
>> this dynamic EOF logic inside KafkaRecordDeserializationSchema (e.g. call
>> Collector::close() if the message content matches a user-specified
>> pattern), then effect of this change is same as if the partition has
>> reached EOF, and Table/SQL can handle this effect as they are doing now
>> without any extra change. Does this make sense?
>>
>>
>>>
>>> I think we should get inspired on how PulsarSource is solving it. They
>>> have an orthogonal interface StopCursor (we could call it StopCondition)
>>> [1]. It has some default values (I wonder if we could implement them as
>>> enums for easier Table integration).
>>>
>>
>> It appears that StopCursor::shouldStop(...) takes a raw Message. While
>> user could implement the dynamic EOF logic in this method, I am worried
>> that this approach would lead to inferior performance due to double message
>> deserialization.
>>
>> The reason is that the user's logic will likely depend on the
>> de-serialized message (as opposed to the raw byte in the
>> org.apache.pulsar.client.api.Message.getData()). In this case, users will
>> need to deserialize the message inside StopCursor::shouldStop(...) first
>> and then the message would be de-serialized again by
>> the PulsarDeserializationSchema, which is specified via
>> the PulsarSourceBuilder::setDeserializationSchema.
>>
>> In comparison, messages can be deserialized only once if we allow users
>> to specify the dynamic EOF logic inside
>> KafkaRecordDeserializationSchema/PulsarDeserializationSchema.
>>
>>
>>> Ideally, this interface would subsume OffsetsInitializer on stopping
>>> side. I think it was not wise to use OffsetsInitializer also for stop
>>> offsets as things like OffsetResetStrategy do not make any sense.
>>>
>>
>> Do you mean that you prefer to replace
>> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
>> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>>
>> Without digging into detail whether this replacement is feasible, I
>> agree StopCursor seems to be cleaner than OffsetsInitializer. On the other
>> hand, if we don't plan to put the dynamic EOF logic inside StopCursor (e.g.
>> due to the double serialization issue described above), I guess it is
>> probably simpler to separate this from the discussion of the dynamic EOF?
>>
>>
>>>
>>> Compared to Pulsar, I like the PartitionOffsetsRetriever to avoid having
>>> to hand in the KafkaClient (as we do in Pulsar).
>>>
>>
>> Do you mean that you prefer to remove KafkaClient from
>> PartitionOffsetsRetrieverImpl?
>>
>> I agree it is cleaner to let PartitionOffsetsRetrieverImpl
>> use adminClient only without using KafkaClient. On the other hand, it seems
>> that there is no performance/correctness concern with the existing
>> approach? Is this issue related to the discussion of dynamic EOF?
>>
>>
>>> I hope I gave some pointers.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41
>>>
>>> On Fri, Dec 17, 2021 at 9:00 AM Dong Lin <lindon...@gmail.com> wrote:
>>>
>>>> Yep,  dynamic schema change could be a good solution for the particular
>>>> use-case mentioned by Ayush.
>>>>
>>>> On the other hand, I have heard of valid use-cases where we want to
>>>> stop the job based on a control message. For example, let's say we have a
>>>> Flink job that keeps processing stock transaction data fetched from Kafka
>>>> in real time. Suppose the stock market closes at 4pm, we probably want the
>>>> Flink job to stop after it has processed all the transaction data of that
>>>> day, instead of running it for the whole day, in order to save CPU cost.
>>>>
>>>> As of Flink 1.13, users can achieve this goal by sending a special
>>>> message to the Kafka topic, and encode logic in the deserializer such that
>>>> Flink job stops when this message is observed. IMO, this seems like a
>>>> reasonable approach to support the above use-case.
>>>>
>>>> One possible approach to keep supporting this use-case in Flink 1.15 is
>>>> to allow user to signal the "end of stream" by calling
>>>> Collector::close(...) in KafkaRecordDeserializationSchema::deserialize(..).
>>>>
>>>> What do you think?
>>>>
>>>>
>>>>
>>>> On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Wouldn't it be better to ask the Iceberg maintainers to support
>>>>> dynamic schema change?
>>>>>
>>>>> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <lindon...@gmail.com> wrote:
>>>>>
>>>>>> Hi Ayush,
>>>>>>
>>>>>> Your use-case should be supported.  Sorry, we don't have a good way
>>>>>> to support this in Flink 1.14.
>>>>>>
>>>>>> I am going to propose a FLIP to fix it in Flink 1.15.
>>>>>>
>>>>>> Thanks,
>>>>>> Dong
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <
>>>>>> ayush.chau...@zomato.com> wrote:
>>>>>>
>>>>>>> My usecase is that as soon as the avro message version is changed, I
>>>>>>> want to reload the job graph so that I can update the downstream iceberg
>>>>>>> table.
>>>>>>>
>>>>>>> Iceberg FlinkSink take table schema during the job start and cannot
>>>>>>> be updated during runtime. So, I want to trigger graceful shutdown and
>>>>>>> restart the job.
>>>>>>>
>>>>>>> Can I reload the job graph to achieve that?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Ayush,
>>>>>>>>
>>>>>>>> DeserializationSchema.isEndOfStream was only ever supported by
>>>>>>>> Kafka. For new Kafka source, the recommended way is to use the bounded 
>>>>>>>> mode
>>>>>>>> like this
>>>>>>>>
>>>>>>>> KafkaSource<PartitionAndValue> source =
>>>>>>>>         KafkaSource.<PartitionAndValue>builder()
>>>>>>>> ...
>>>>>>>>                 .setStartingOffsets(OffsetsInitializer.earliest())
>>>>>>>>                 .setBounded(OffsetsInitializer.latest())
>>>>>>>>                 .build();
>>>>>>>>
>>>>>>>> You can implement your own OffsetsInitializer or use a provided one.
>>>>>>>>
>>>>>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ruanhang1...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> There is no way to end the kafka stream from the deserializer.
>>>>>>>>>
>>>>>>>>> When would you want to end the stream? Could you explain why you
>>>>>>>>> need to end the kafka stream without using the offset?
>>>>>>>>>
>>>>>>>>> Ayush Chauhan <ayush.chau...@zomato.com> 于2021年12月8日周三 15:29写道:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <
>>>>>>>>>> metrob...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Ayush,
>>>>>>>>>>>
>>>>>>>>>>> I couldn't find the documentation you've mentioned. Can you send
>>>>>>>>>>> me a link to it?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>>>>>>>>>>> ayush.chau...@zomato.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Can you please let me know the alternatives of isEndOfStream()
>>>>>>>>>>>> as now according to docs this method will no longer be used to 
>>>>>>>>>>>> determine
>>>>>>>>>>>> the end of the stream.
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>>  Ayush Chauhan
>>>>>>>>>>>>  Data Platform
>>>>>>>>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> This email is intended only for the person or the entity to
>>>>>>>>>>>> whom it is addressed. If you are not the intended recipient, 
>>>>>>>>>>>> please delete
>>>>>>>>>>>> this email and contact the sender.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>  Ayush Chauhan
>>>>>>>>>>  Data Platform
>>>>>>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This email is intended only for the person or the entity to whom
>>>>>>>>>> it is addressed. If you are not the intended recipient, please 
>>>>>>>>>> delete this
>>>>>>>>>> email and contact the sender.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>  Ayush Chauhan
>>>>>>>  Data Platform
>>>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>>>
>>>>>>>
>>>>>>> This email is intended only for the person or the entity to whom it
>>>>>>> is addressed. If you are not the intended recipient, please delete this
>>>>>>> email and contact the sender.
>>>>>>>
>>>>>>

Reply via email to