Hi Dong,

Could you help explain why we can not dynamically stop reading from a
> source in batch mode?
>
We can but we cannot easily determine if the source is supposed to run in
batch or streaming. A user would need to implement a special
KafkaRecordDeserializationSchema and still provide an OffsetInitializer for
the end offset to trigger batch mode.

How are both concepts supposed to interact? Are we only stopping if any of
the concept state that this is the end?

We could ofc offer some KafkaSourceBuilde#setBounded() without parameters
so that a user can implement a special KafkaRecordDeserializationSchema and
notify the builder but this looks awkward to me and is quite error-prone:
When a user uses setBounded without overwriting isEndOfStream, the
application would never emit anything.

My understanding is that when a message with the particular pattern
> (specified by the user) is encountered, we can have the source operator
> emit the high-watermark in such a way as if the particular partition of
> this source has reached EOF. And this must have worked since users have
> been using KafkaDeserializationSchema::isEndOfStream with the
> legacy FlinkKafkaConsumer. Did I miss something here?
>
Yes batch mode is different from bounded streaming. [1] We can only fully
leverage a statically bounded source by statically defining it as such with
the FLIP-27 Source interface. [2]

Hmm.. users already need to provide a KafkaRecordDeserializationSchema
> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
> harder for user to implement KafkaRecordDeserializationSchema?
>
Users mostly use the factory methods that adapt to Flink's
DeserializationSchema. We should also offer a builder similarly to
KafkaRecordSerializationSchemaBuilder.

Regarding "how to use it from Table/SQL", support we allow user to encode
> this dynamic EOF logic inside KafkaRecordDeserializationSchema.

I'm not sure if we can/should expose dynamic EOF in SQL but at the very
least we should properly support end offsets (as it's now possible). We
must avoid removing the current end offsets in favor of
KafkaRecordDeserializationSchema#isEndOfStream to enable Table/SQL to use
bounded Kafka sources.

e.g. call Collector::close() if the message content matches a
> user-specified pattern
>
No, close is the completely wrong method for that. This method should have
never been exposed to the user as it will close the network resources.
However, we need a fully functional network stack for proper shutdown.

It appears that StopCursor::shouldStop(...) takes a raw Message. While user
> could implement the dynamic EOF logic in this method, I am worried that
> this approach would lead to inferior performance due to double message
> deserialization.
>
That is a fair point. In case of Ayush, however, it's the only way to
determine that the pipeline should stop (you pretty much compare if the 5.
byte in the message has changed). If you deserialize into a SpecificRecord,
then the writer schema version is lost for isEndOfStream(T deserialized).

Another concern I have for
KafkaRecordDeserializationSchema#isEndOfStream(T) is where it is supposed
to be called then. If it's in the RecordEmitter, we need to extend the
RecordEmitter to support closing the split. If it's in the SplitReader, we
probably also need double-deserialization because of FLINK-25132 (the
record needs to be deserialized in the RecordEmitter). Maybe you can encode
it in the SplitState but this sounds rather cumbersome if it needs to be
done for all sources.

The reason is that the user's logic will likely depend on the de-serialized
> message (as opposed to the raw byte in the
> org.apache.pulsar.client.api.Message.getData()). In this case, users will
> need to deserialize the message inside StopCursor::shouldStop(...) first
> and then the message would be de-serialized again by
> the PulsarDeserializationSchema, which is specified via
> the PulsarSourceBuilder::setDeserializationSchema.
>
As written before, this is not the case of the specific user. Having the
raw message makes it much easier to determine a writer schema change. I'm
sure that there are cases, where you need to look into the data though. To
avoid double-deserialization, a better way may be to pass both the raw and
the deserialized message to `shouldStop` but then we should move the stop
logic to RecordEmitter as written before.

Do you mean that you prefer to replace
> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>
Ideally, yes. But that needs to be backward compatible as it's a
PublicEvolving interface.

I agree it is cleaner to let PartitionOffsetsRetrieverImpl use adminClient
> only without using KafkaClient. On the other hand, it seems that there is
> no performance/correctness concern with the existing approach? Is this
> issue related to the discussion of dynamic EOF?

I just meant that a user probably only needs access to the adminClient to
retrieve the offsets of a topic and that Kafka's PartitionOffsetsRetriever
nicely hides the client from the user. I'm sure a user can easily mess up
with the admin client in Pulsar (what happens if this is closed, is the
client internally used somewhere else?).


TL;DR
In general, I like the separation of concerns:
KafkaRecordDeserializationSchema is for deserializing and StopCondition (or
however we call it) is for stopping. Then a user can reuse pre-defined
KafkaRecordDeserializationSchema and mix in the stopping logic when needed
(remember this is a rare case).

In most cases, a user will use Avro/Json/Protobuf + schema registry, so if
we provide out-of-the-box support for these formats, the user doesn't need
to touch KafkaRecordDeserializationSchema at all. Then it would be nice to
have a single interface that determines when the source stops (e.g.
StopCondition) with pre-defined implementations (see factory methods in
OffsetsInitializer) for Table/SQL. We could even provide a predefined
strategy for schema changes when the schema registry is used.

If you already have use-cases that relies on the deserialized data, then
let's move the stopping logic to RecordEmitter. At this point, I'd propose
to pass the raw and deserialized data to the StopCondition.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-BatchandStreamingUnification


On Mon, Dec 27, 2021 at 5:01 PM Dong Lin <lindon...@gmail.com> wrote:

> Hi Arvid,
>
> Thanks for the suggestion! Sorry for the late reply. I just finished
> investigating the PulsarSource/StopCursor as you suggested. Please see my
> reply inline.
>
> On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Dong,
>>
>> I see your point. The main issue with dynamic EOF is that we can't run in
>> batch mode. That may be desired in the case of Ayush but there may be other
>> use cases where it's not.
>>
>
> Could you help explain why we can not dynamically stop reading from a
> source in batch mode?
>
> My understanding is that when a message with the particular pattern
> (specified by the user) is encountered, we can have the source operator
> emit the high-watermark in such a way as if the particular partition of
> this source has reached EOF. And this must have worked since users have
> been using KafkaDeserializationSchema::isEndOfStream with the
> legacy FlinkKafkaConsumer. Did I miss something here?
>
> Additionally, it's quite a bit of code if you'd implement a
>> KafkaRecordDeserializationSchema from scratch. There is also no obvious way
>> on how to use it from Table/SQL.
>>
>
> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
> harder for user to implement KafkaRecordDeserializationSchema?
>
> Regarding "how to use it from Table/SQL", support we allow user to encode
> this dynamic EOF logic inside KafkaRecordDeserializationSchema (e.g. call
> Collector::close() if the message content matches a user-specified
> pattern), then effect of this change is same as if the partition has
> reached EOF, and Table/SQL can handle this effect as they are doing now
> without any extra change. Does this make sense?
>
>
>>
>> I think we should get inspired on how PulsarSource is solving it. They
>> have an orthogonal interface StopCursor (we could call it StopCondition)
>> [1]. It has some default values (I wonder if we could implement them as
>> enums for easier Table integration).
>>
>
> It appears that StopCursor::shouldStop(...) takes a raw Message. While
> user could implement the dynamic EOF logic in this method, I am worried
> that this approach would lead to inferior performance due to double message
> deserialization.
>
> The reason is that the user's logic will likely depend on the
> de-serialized message (as opposed to the raw byte in the
> org.apache.pulsar.client.api.Message.getData()). In this case, users will
> need to deserialize the message inside StopCursor::shouldStop(...) first
> and then the message would be de-serialized again by
> the PulsarDeserializationSchema, which is specified via
> the PulsarSourceBuilder::setDeserializationSchema.
>
> In comparison, messages can be deserialized only once if we allow users to
> specify the dynamic EOF logic inside
> KafkaRecordDeserializationSchema/PulsarDeserializationSchema.
>
>
>> Ideally, this interface would subsume OffsetsInitializer on stopping
>> side. I think it was not wise to use OffsetsInitializer also for stop
>> offsets as things like OffsetResetStrategy do not make any sense.
>>
>
> Do you mean that you prefer to replace
> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>
> Without digging into detail whether this replacement is feasible, I
> agree StopCursor seems to be cleaner than OffsetsInitializer. On the other
> hand, if we don't plan to put the dynamic EOF logic inside StopCursor (e.g.
> due to the double serialization issue described above), I guess it is
> probably simpler to separate this from the discussion of the dynamic EOF?
>
>
>>
>> Compared to Pulsar, I like the PartitionOffsetsRetriever to avoid having
>> to hand in the KafkaClient (as we do in Pulsar).
>>
>
> Do you mean that you prefer to remove KafkaClient from
> PartitionOffsetsRetrieverImpl?
>
> I agree it is cleaner to let PartitionOffsetsRetrieverImpl use adminClient
> only without using KafkaClient. On the other hand, it seems that there is
> no performance/correctness concern with the existing approach? Is this
> issue related to the discussion of dynamic EOF?
>
>
>> I hope I gave some pointers.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41
>>
>> On Fri, Dec 17, 2021 at 9:00 AM Dong Lin <lindon...@gmail.com> wrote:
>>
>>> Yep,  dynamic schema change could be a good solution for the particular
>>> use-case mentioned by Ayush.
>>>
>>> On the other hand, I have heard of valid use-cases where we want to stop
>>> the job based on a control message. For example, let's say we have a Flink
>>> job that keeps processing stock transaction data fetched from Kafka in real
>>> time. Suppose the stock market closes at 4pm, we probably want the Flink
>>> job to stop after it has processed all the transaction data of that day,
>>> instead of running it for the whole day, in order to save CPU cost.
>>>
>>> As of Flink 1.13, users can achieve this goal by sending a special
>>> message to the Kafka topic, and encode logic in the deserializer such that
>>> Flink job stops when this message is observed. IMO, this seems like a
>>> reasonable approach to support the above use-case.
>>>
>>> One possible approach to keep supporting this use-case in Flink 1.15 is
>>> to allow user to signal the "end of stream" by calling
>>> Collector::close(...) in KafkaRecordDeserializationSchema::deserialize(..).
>>>
>>> What do you think?
>>>
>>>
>>>
>>> On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Wouldn't it be better to ask the Iceberg maintainers to support dynamic
>>>> schema change?
>>>>
>>>> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <lindon...@gmail.com> wrote:
>>>>
>>>>> Hi Ayush,
>>>>>
>>>>> Your use-case should be supported.  Sorry, we don't have a good way to
>>>>> support this in Flink 1.14.
>>>>>
>>>>> I am going to propose a FLIP to fix it in Flink 1.15.
>>>>>
>>>>> Thanks,
>>>>> Dong
>>>>>
>>>>>
>>>>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <ayush.chau...@zomato.com>
>>>>> wrote:
>>>>>
>>>>>> My usecase is that as soon as the avro message version is changed, I
>>>>>> want to reload the job graph so that I can update the downstream iceberg
>>>>>> table.
>>>>>>
>>>>>> Iceberg FlinkSink take table schema during the job start and cannot
>>>>>> be updated during runtime. So, I want to trigger graceful shutdown and
>>>>>> restart the job.
>>>>>>
>>>>>> Can I reload the job graph to achieve that?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Ayush,
>>>>>>>
>>>>>>> DeserializationSchema.isEndOfStream was only ever supported by
>>>>>>> Kafka. For new Kafka source, the recommended way is to use the bounded 
>>>>>>> mode
>>>>>>> like this
>>>>>>>
>>>>>>> KafkaSource<PartitionAndValue> source =
>>>>>>>         KafkaSource.<PartitionAndValue>builder()
>>>>>>> ...
>>>>>>>                 .setStartingOffsets(OffsetsInitializer.earliest())
>>>>>>>                 .setBounded(OffsetsInitializer.latest())
>>>>>>>                 .build();
>>>>>>>
>>>>>>> You can implement your own OffsetsInitializer or use a provided one.
>>>>>>>
>>>>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ruanhang1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> There is no way to end the kafka stream from the deserializer.
>>>>>>>>
>>>>>>>> When would you want to end the stream? Could you explain why you
>>>>>>>> need to end the kafka stream without using the offset?
>>>>>>>>
>>>>>>>> Ayush Chauhan <ayush.chau...@zomato.com> 于2021年12月8日周三 15:29写道:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <
>>>>>>>>> metrob...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Ayush,
>>>>>>>>>>
>>>>>>>>>> I couldn't find the documentation you've mentioned. Can you send
>>>>>>>>>> me a link to it?
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>>>>>>>>>> ayush.chau...@zomato.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Can you please let me know the alternatives of isEndOfStream()
>>>>>>>>>>> as now according to docs this method will no longer be used to 
>>>>>>>>>>> determine
>>>>>>>>>>> the end of the stream.
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>>  Ayush Chauhan
>>>>>>>>>>>  Data Platform
>>>>>>>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This email is intended only for the person or the entity to whom
>>>>>>>>>>> it is addressed. If you are not the intended recipient, please 
>>>>>>>>>>> delete this
>>>>>>>>>>> email and contact the sender.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>  Ayush Chauhan
>>>>>>>>>  Data Platform
>>>>>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This email is intended only for the person or the entity to whom
>>>>>>>>> it is addressed. If you are not the intended recipient, please delete 
>>>>>>>>> this
>>>>>>>>> email and contact the sender.
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>>  Ayush Chauhan
>>>>>>  Data Platform
>>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>>
>>>>>>
>>>>>> This email is intended only for the person or the entity to whom it
>>>>>> is addressed. If you are not the intended recipient, please delete this
>>>>>> email and contact the sender.
>>>>>>
>>>>>

Reply via email to