Hi Arvid,

Thanks for the suggestion! Sorry for the late reply. I just finished
investigating the PulsarSource/StopCursor as you suggested. Please see my
reply inline.

On Sun, Dec 19, 2021 at 6:53 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Dong,
>
> I see your point. The main issue with dynamic EOF is that we can't run in
> batch mode. That may be desired in the case of Ayush but there may be other
> use cases where it's not.
>

Could you help explain why we can not dynamically stop reading from a
source in batch mode?

My understanding is that when a message with the particular pattern
(specified by the user) is encountered, we can have the source operator
emit the high-watermark in such a way as if the particular partition of
this source has reached EOF. And this must have worked since users have
been using KafkaDeserializationSchema::isEndOfStream with the
legacy FlinkKafkaConsumer. Did I miss something here?

Additionally, it's quite a bit of code if you'd implement a
> KafkaRecordDeserializationSchema from scratch. There is also no obvious way
> on how to use it from Table/SQL.
>

Hmm.. users already need to provide a KafkaRecordDeserializationSchema
via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
harder for user to implement KafkaRecordDeserializationSchema?

Regarding "how to use it from Table/SQL", support we allow user to encode
this dynamic EOF logic inside KafkaRecordDeserializationSchema (e.g. call
Collector::close() if the message content matches a user-specified
pattern), then effect of this change is same as if the partition has
reached EOF, and Table/SQL can handle this effect as they are doing now
without any extra change. Does this make sense?


>
> I think we should get inspired on how PulsarSource is solving it. They
> have an orthogonal interface StopCursor (we could call it StopCondition)
> [1]. It has some default values (I wonder if we could implement them as
> enums for easier Table integration).
>

It appears that StopCursor::shouldStop(...) takes a raw Message. While user
could implement the dynamic EOF logic in this method, I am worried that
this approach would lead to inferior performance due to double message
deserialization.

The reason is that the user's logic will likely depend on the de-serialized
message (as opposed to the raw byte in the
org.apache.pulsar.client.api.Message.getData()). In this case, users will
need to deserialize the message inside StopCursor::shouldStop(...) first
and then the message would be de-serialized again by
the PulsarDeserializationSchema, which is specified via
the PulsarSourceBuilder::setDeserializationSchema.

In comparison, messages can be deserialized only once if we allow users to
specify the dynamic EOF logic inside
KafkaRecordDeserializationSchema/PulsarDeserializationSchema.


> Ideally, this interface would subsume OffsetsInitializer on stopping side.
> I think it was not wise to use OffsetsInitializer also for stop offsets as
> things like OffsetResetStrategy do not make any sense.
>

Do you mean that you prefer to replace
KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?

Without digging into detail whether this replacement is feasible, I
agree StopCursor seems to be cleaner than OffsetsInitializer. On the other
hand, if we don't plan to put the dynamic EOF logic inside StopCursor (e.g.
due to the double serialization issue described above), I guess it is
probably simpler to separate this from the discussion of the dynamic EOF?


>
> Compared to Pulsar, I like the PartitionOffsetsRetriever to avoid having
> to hand in the KafkaClient (as we do in Pulsar).
>

Do you mean that you prefer to remove KafkaClient from
PartitionOffsetsRetrieverImpl?

I agree it is cleaner to let PartitionOffsetsRetrieverImpl use adminClient
only without using KafkaClient. On the other hand, it seems that there is
no performance/correctness concern with the existing approach? Is this
issue related to the discussion of dynamic EOF?


> I hope I gave some pointers.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java#L41-L41
>
> On Fri, Dec 17, 2021 at 9:00 AM Dong Lin <lindon...@gmail.com> wrote:
>
>> Yep,  dynamic schema change could be a good solution for the particular
>> use-case mentioned by Ayush.
>>
>> On the other hand, I have heard of valid use-cases where we want to stop
>> the job based on a control message. For example, let's say we have a Flink
>> job that keeps processing stock transaction data fetched from Kafka in real
>> time. Suppose the stock market closes at 4pm, we probably want the Flink
>> job to stop after it has processed all the transaction data of that day,
>> instead of running it for the whole day, in order to save CPU cost.
>>
>> As of Flink 1.13, users can achieve this goal by sending a special
>> message to the Kafka topic, and encode logic in the deserializer such that
>> Flink job stops when this message is observed. IMO, this seems like a
>> reasonable approach to support the above use-case.
>>
>> One possible approach to keep supporting this use-case in Flink 1.15 is
>> to allow user to signal the "end of stream" by calling
>> Collector::close(...) in KafkaRecordDeserializationSchema::deserialize(..).
>>
>> What do you think?
>>
>>
>>
>> On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Wouldn't it be better to ask the Iceberg maintainers to support dynamic
>>> schema change?
>>>
>>> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <lindon...@gmail.com> wrote:
>>>
>>>> Hi Ayush,
>>>>
>>>> Your use-case should be supported.  Sorry, we don't have a good way to
>>>> support this in Flink 1.14.
>>>>
>>>> I am going to propose a FLIP to fix it in Flink 1.15.
>>>>
>>>> Thanks,
>>>> Dong
>>>>
>>>>
>>>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <ayush.chau...@zomato.com>
>>>> wrote:
>>>>
>>>>> My usecase is that as soon as the avro message version is changed, I
>>>>> want to reload the job graph so that I can update the downstream iceberg
>>>>> table.
>>>>>
>>>>> Iceberg FlinkSink take table schema during the job start and cannot be
>>>>> updated during runtime. So, I want to trigger graceful shutdown and 
>>>>> restart
>>>>> the job.
>>>>>
>>>>> Can I reload the job graph to achieve that?
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Hi Ayush,
>>>>>>
>>>>>> DeserializationSchema.isEndOfStream was only ever supported by Kafka.
>>>>>> For new Kafka source, the recommended way is to use the bounded mode like
>>>>>> this
>>>>>>
>>>>>> KafkaSource<PartitionAndValue> source =
>>>>>>         KafkaSource.<PartitionAndValue>builder()
>>>>>> ...
>>>>>>                 .setStartingOffsets(OffsetsInitializer.earliest())
>>>>>>                 .setBounded(OffsetsInitializer.latest())
>>>>>>                 .build();
>>>>>>
>>>>>> You can implement your own OffsetsInitializer or use a provided one.
>>>>>>
>>>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ruanhang1...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> There is no way to end the kafka stream from the deserializer.
>>>>>>>
>>>>>>> When would you want to end the stream? Could you explain why you
>>>>>>> need to end the kafka stream without using the offset?
>>>>>>>
>>>>>>> Ayush Chauhan <ayush.chau...@zomato.com> 于2021年12月8日周三 15:29写道:
>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <metrob...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Ayush,
>>>>>>>>>
>>>>>>>>> I couldn't find the documentation you've mentioned. Can you send
>>>>>>>>> me a link to it?
>>>>>>>>>
>>>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>>>>>>>>> ayush.chau...@zomato.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Can you please let me know the alternatives of isEndOfStream() as
>>>>>>>>>> now according to docs this method will no longer be used to 
>>>>>>>>>> determine the
>>>>>>>>>> end of the stream.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>  Ayush Chauhan
>>>>>>>>>>  Data Platform
>>>>>>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This email is intended only for the person or the entity to whom
>>>>>>>>>> it is addressed. If you are not the intended recipient, please 
>>>>>>>>>> delete this
>>>>>>>>>> email and contact the sender.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>  Ayush Chauhan
>>>>>>>>  Data Platform
>>>>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>>>>
>>>>>>>>
>>>>>>>> This email is intended only for the person or the entity to whom it
>>>>>>>> is addressed. If you are not the intended recipient, please delete this
>>>>>>>> email and contact the sender.
>>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>>  Ayush Chauhan
>>>>>  Data Platform
>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>
>>>>>
>>>>> This email is intended only for the person or the entity to whom it is
>>>>> addressed. If you are not the intended recipient, please delete this email
>>>>> and contact the sender.
>>>>>
>>>>

Reply via email to