Yep, dynamic schema change could be a good solution for the particular
use-case mentioned by Ayush.

On the other hand, I have heard of valid use-cases where we want to stop
the job based on a control message. For example, let's say we have a Flink
job that keeps processing stock transaction data fetched from Kafka in real
time. If the stock market closes at 4pm, we probably want the Flink job
to stop once it has processed all the transaction data of that day,
instead of keeping it running for the whole day, in order to save CPU cost.

As of Flink 1.13, users can achieve this goal by sending a special message
to the Kafka topic and encoding logic in the deserializer (via
isEndOfStream) such that the Flink job stops when this message is observed.
IMO, this seems like a reasonable approach to support the above use-case.
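
To make the pattern concrete, here is a minimal sketch that does not depend on Flink at all. It only models the idea: records are consumed in order, and a sentinel record stops consumption. The class name, the END_MARKER value and the consumeUntilEnd helper are all made up for illustration; only the isEndOfStream name is borrowed from the schema method discussed in this thread, and the choice not to emit the control message itself is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ControlMessageSketch {
    /** Hypothetical sentinel value; a real job would agree on its own marker. */
    static final String END_MARKER = "__END_OF_DAY__";

    /** Plays the role that isEndOfStream(...) plays in the deserializer. */
    static boolean isEndOfStream(String record) {
        return END_MARKER.equals(record);
    }

    /** Consumes records in order and stops at the first control message. */
    static List<String> consumeUntilEnd(Iterator<String> records) {
        List<String> processed = new ArrayList<>();
        while (records.hasNext()) {
            String record = records.next();
            if (isEndOfStream(record)) {
                break; // assumption: the control message is not emitted downstream
            }
            processed.add(record);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> topic = List.of("trade-1", "trade-2", END_MARKER, "trade-3");
        // Records after the marker are never read.
        System.out.println(consumeUntilEnd(topic.iterator())); // prints [trade-1, trade-2]
    }
}
```

In a real job the stream is partitioned, so each partition would presumably need its own marker; the sketch ignores that.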

One possible approach to keep supporting this use-case in Flink 1.15 is to
allow users to signal the "end of stream" by calling Collector::close(...)
in KafkaRecordDeserializationSchema::deserialize(..).
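
As a rough illustration of what that could look like from the user's side, here is a self-contained sketch. Nothing in it is existing or planned Flink API: SketchCollector is a hypothetical stand-in for a collector whose close() is interpreted as "end of stream", and the deserialize and run helpers and the marker value are likewise invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectorCloseSketch {
    /** Hypothetical stand-in collector; not Flink's Collector interface. */
    static final class SketchCollector {
        final List<String> out = new ArrayList<>();
        boolean closed = false;

        void collect(String value) {
            if (!closed) {
                out.add(value);
            }
        }

        /** In this sketch, closing the collector signals "end of stream". */
        void close() {
            closed = true;
        }
    }

    /** User-side deserialization logic: close on the control message. */
    static void deserialize(String raw, SketchCollector collector) {
        if ("__END_OF_DAY__".equals(raw)) { // hypothetical marker value
            collector.close();
            return;
        }
        collector.collect(raw.trim());
    }

    /** Stand-in for the source reader loop: stops once the collector is closed. */
    static List<String> run(List<String> rawRecords) {
        SketchCollector collector = new SketchCollector();
        for (String raw : rawRecords) {
            if (collector.closed) {
                break;
            }
            deserialize(raw, collector);
        }
        return collector.out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(" trade-1 ", "__END_OF_DAY__", "trade-2")));
    }
}
```

The point of this shape is that the stop decision stays inside deserialize(..), which is where the control message is first seen.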

What do you think?



On Fri, Dec 17, 2021 at 3:46 PM Arvid Heise <ar...@apache.org> wrote:

> Wouldn't it be better to ask the Iceberg maintainers to support dynamic
> schema change?
>
> On Fri, Dec 17, 2021 at 3:03 AM Dong Lin <lindon...@gmail.com> wrote:
>
>> Hi Ayush,
>>
>> Your use-case should be supported. Sorry, we don't have a good way to
>> support this in Flink 1.14.
>>
>> I am going to propose a FLIP to fix it in Flink 1.15.
>>
>> Thanks,
>> Dong
>>
>>
>> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan <ayush.chau...@zomato.com>
>> wrote:
>>
>>> My use-case is that as soon as the Avro message version changes, I
>>> want to reload the job graph so that I can update the downstream Iceberg
>>> table.
>>>
>>> Iceberg's FlinkSink takes the table schema at job start, and it cannot be
>>> updated at runtime. So, I want to trigger a graceful shutdown and restart
>>> the job.
>>>
>>> Can I reload the job graph to achieve that?
>>>
>>>
>>>
>>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Ayush,
>>>>
>>>> DeserializationSchema.isEndOfStream was only ever supported by Kafka.
>>>> For the new Kafka source, the recommended way is to use bounded mode,
>>>> like this:
>>>>
>>>> KafkaSource<PartitionAndValue> source =
>>>>         KafkaSource.<PartitionAndValue>builder()
>>>> ...
>>>>                 .setStartingOffsets(OffsetsInitializer.earliest())
>>>>                 .setBounded(OffsetsInitializer.latest())
>>>>                 .build();
>>>>
>>>> You can implement your own OffsetsInitializer or use a provided one.
>>>>
>>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan <ruanhang1...@gmail.com>
>>>> wrote:
>>>>
>>>>> There is no way to end the Kafka stream from the deserializer.
>>>>>
>>>>> When would you want to end the stream? Could you explain why you need
>>>>> to end the Kafka stream without using the offset?
>>>>>
>>>>> On Wed, Dec 8, 2021 at 15:29 Ayush Chauhan <ayush.chau...@zomato.com> wrote:
>>>>>
>>>>>>
>>>>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger <metrob...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Ayush,
>>>>>>>
>>>>>>> I couldn't find the documentation you've mentioned. Can you send me
>>>>>>> a link to it?
>>>>>>>
>>>>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>>>>>>> ayush.chau...@zomato.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Can you please let me know the alternatives to isEndOfStream(), as
>>>>>>>> according to the docs this method will no longer be used to
>>>>>>>> determine the end of the stream?
>>>>>>>>
>>>>>>>> --
>>>>>>>>  Ayush Chauhan
>>>>>>>>  Data Platform
>>>>>>>>  +91 9990747111
>>>>>>>>
>>>>>>>>
>>>>>>>> This email is intended only for the person or the entity to whom it
>>>>>>>> is addressed. If you are not the intended recipient, please delete this
>>>>>>>> email and contact the sender.
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>
