Re: java.lang.Object serialization by Kryo

2024-11-08 Thread Kirill Ternovsky

Hello,

I've had good luck implementing `Value` to have Flink (1.19, not sure 
about earlier versions) use my custom serialization code for a 
particular type. More details in the docs here: 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#values


If you're stuck pushing around `java.lang.Object`, you can still wrap it 
in another type that implements `Value`.
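
For illustration, a minimal sketch of such a wrapper (not Kirill's actual code; 
`MyCodec` below is a hypothetical placeholder for whatever application-specific 
byte encoding you already have for the payload):

```
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;

import java.io.IOException;

public class ObjectWrapper implements Value {

    private Object payload;

    // Value types need a public no-arg constructor so Flink can instantiate them.
    public ObjectWrapper() {}

    public ObjectWrapper(Object payload) {
        this.payload = payload;
    }

    public Object getPayload() {
        return payload;
    }

    @Override
    public void write(DataOutputView out) throws IOException {
        byte[] bytes = MyCodec.encode(payload); // hypothetical custom encoding
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    @Override
    public void read(DataInputView in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        payload = MyCodec.decode(bytes); // hypothetical custom decoding
    }
}
```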


Best,

Kirill

On 11/8/24 12:01 AM, Kamal Mittal via user wrote:


Hello,

java.lang.Object is serialized by Kryo, as Flink treats it as a generic data 
type.


Is there any way to switch to some other serializer for a performance 
improvement? Also, can you please point me to some documentation for 
reference around it?


I profiled my application and found that Kryo is being used. I tried to 
disable it via disableGenericTypes(), and the application breaks at 
start-up with the error “Object type treated as generic type”.


Rgds,

Kamal


RE: [External] Re: Flink table materialization

2024-11-07 Thread Schwalbe Matthias
Hi Jacob,

It’s a little bit of guesswork …

The disappearing records remind me a bit of a peculiarity of Oracle: each 
statement (e.g. an INSERT) runs in an implicit transaction and hence needs to be 
committed.
In Flink, committing transactions happens together with the checkpoint cycle, i.e. 
checkpointing needs to be set up properly for your job.
I work mostly with the streaming API, not the Table API, but I guess the matter is 
just the same there.
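
A minimal sketch of what that setup could look like when the job is assembled in 
Java (in the SQL client, the equivalent is setting the 
execution.checkpointing.interval option); the 30-second interval is just an 
example value:

```
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CheckpointingSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Transactional sinks (e.g. JDBC in exactly-once mode) commit together with
        // checkpoints, so without periodic checkpoints the results may never be committed.
        env.enableCheckpointing(30_000L, CheckpointingMode.EXACTLY_ONCE);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // ... register tables and run INSERT INTO statements as usual.
    }
}
```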

For the database in use (instead of ‘default’), I think you can specify this in 
your JDBC connection string.

Hope that helps 😊

Thias



From: Jacob Rollings 
Sent: Friday, November 8, 2024 7:39 AM
To: user@flink.apache.org
Subject: [External] Re: Flink table materialization



After correcting the properties of the connector, I am now getting the error
shown in the attached screenshot.

I have also added the jars to the classpath when launching the Flink SQL CLI.

A full description of the use case is in the first email in this thread.

On Thu, Nov 7, 2024, 11:38 PM Jacob Rollings
<jacobrolling...@gmail.com> wrote:
Added attachment of the error message.

On Thu, Nov 7, 2024, 11:12 PM Jacob Rollings
<jacobrolling...@gmail.com> wrote:
Hi,

I want to make the tables created by Flink Table API/SQL durable and permanent. 
To achieve this, I am trying the following basic example using the JDBC Oracle 
connector. I have added both the Flink JDBC and Oracle JDBC drivers to the 
Flink lib directory. I am using the Flink SQL client to run the queries. While 
it successfully creates tables in the default-database, I don't see the actual 
tables in the Oracle database. Moreover, the tables created under the Flink 
default-database seem to exist only as long as the session is active.

What steps should I take to ensure that the in-memory tables I work with during 
my Flink job are permanently stored in the database?

The documentation mentions using Catalogs and Connectors to persist in-memory 
tables to a database. I want to store my tables and data in Oracle DB. However, 
I noticed that Flink supports only Hive, PostgreSQL, and MySQL catalogs. Does 
this mean my data will reside in Oracle while the metadata about the tables can 
be stored only in a Hive, PostgreSQL, or MySQL metastore?

The documentation on this topic seems to cover only basic concepts and lacks 
complete examples on how to achieve this. Any pointers or detailed guidance 
would be greatly appreciated.

Thanks.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.


Re: "Not all slot managed memory is freed"

2024-11-06 Thread Jad Naous
Thanks, Gabor! We switched back to the hashmap state backend for now. We'll
troubleshoot if we go back to rocksdb.
Jad


On Wed, Nov 6, 2024 at 6:51 AM Gabor Somogyi 
wrote:

> Hi Jad,
>
> There is no low-hanging fruit here if you really want to find this out.
> In such a case the memory manager tries to allocate and deallocate the total
> amount of memory it is prepared for.
> When not all of that memory is available, the operation will not succeed
> and you see the mentioned exception.
>
> I would suggest the following:
> * Presumably this is already fulfilled, but double-check that Java 8u72+ is used
> (as the exception message suggests)
> * Create a custom Flink image with additional log entries
> in UnsafeMemoryBudget
> * Remote debug with a breakpoint in UnsafeMemoryBudget
>
> All in all, one must find out why the available memory is not equal to the
> total memory in the memory manager.
>
> G
>
>
> On Tue, Oct 29, 2024 at 6:03 PM Jad Naous  wrote:
>
>> Hi Flink Community,
>> I'd really appreciate your help. We're trying to switch from using the
>> heap state backend to rocksdb, and have been encountering a warning "Not
>> all slot managed memory is freed at TaskSlot..." when the pipeline
>> restarts. Any pointers to troubleshoot this issue?
>> Many thanks!
>> Jad.
>>
>


Re: "Not all slot managed memory is freed"

2024-11-06 Thread Gabor Somogyi
Hi Jad,

There is no low-hanging fruit here if you really want to find this out.
In such a case the memory manager tries to allocate and deallocate the total
amount of memory it is prepared for.
When not all of that memory is available, the operation will not succeed
and you see the mentioned exception.

I would suggest the following:
* Presumably this is already fulfilled, but double-check that Java 8u72+ is used
(as the exception message suggests)
* Create a custom Flink image with additional log entries
in UnsafeMemoryBudget
* Remote debug with a breakpoint in UnsafeMemoryBudget

All in all, one must find out why the available memory is not equal to the
total memory in the memory manager.

G


On Tue, Oct 29, 2024 at 6:03 PM Jad Naous  wrote:

> Hi Flink Community,
> I'd really appreciate your help. We're trying to switch from using the
> heap state backend to rocksdb, and have been encountering a warning "Not
> all slot managed memory is freed at TaskSlot..." when the pipeline
> restarts. Any pointers to troubleshoot this issue?
> Many thanks!
> Jad.
>


Re: [ANNOUNCE] Apache Flink 2.0 Preview released

2024-11-06 Thread Benoit Tailhades
Clear, thank you Zakelly

On Wed, Nov 6, 2024 at 09:16, Zakelly Lan wrote:

> Hi Benoit,
>
> Please find the result here[1].
>
> The Nexmark repo[2] does not officially support the flink 2.0 preview
> version. However, we have made a PR[3] for this and once it is merged, we
> will offer a guide to run Nexmark Q20 with disaggregated state management.
>
>
> [1] https://github.com/ververica/ForSt/releases/tag/v0.1.2-beta
> [2] https://github.com/nexmark/nexmark
> [3] https://github.com/nexmark/nexmark/pull/62
>
>
> Best,
> Zakelly
>
>
> On Wed, Nov 6, 2024 at 12:12 AM Benoit Tailhades <
> benoit.tailha...@gmail.com> wrote:
>
>> Hello,
>>
>> Release note is talking about a complete end-to-end trial using Nexmark.
>> Where could this be found ?
>>
>> Thank you.
>>
>> On Mon, Nov 4, 2024 at 02:48, Enric Ott <243816...@qq.com> wrote:
>>
>>> Hello, Community:
>>>   Is there a complete benchmark for Apache Flink 2.0 Preview?
>>>   Thanks.
>>>
>>>
>>> -- Original message --
>>> *From:* "Enric Ott" <243816...@qq.com>;
>>> *Sent:* Wednesday, October 23, 2024, 6:03 PM
>>> *To:* "Xintong Song"; "dev"; "user"; "user-zh"; "announce";
>>> *Subject:* Re: [ANNOUNCE] Apache Flink 2.0 Preview released
>>>
>>> How do I import the source code (from GitHub) into IntelliJ IDEA? It seems
>>> that a project descriptor is missing.
>>>
>>>
>>> -- Original message --
>>> *From:* "Xintong Song" ;
>>> *Sent:* Wednesday, October 23, 2024, 5:26 PM
>>> *To:* "dev"; "user"; "user-zh"; "announce";
>>> *Subject:* [ANNOUNCE] Apache Flink 2.0 Preview released
>>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 2.0 Preview.
>>>
>>> Apache Flink® is an open-source unified stream and batch data processing
>>> framework for distributed, high-performing, always-available, and accurate
>>> data applications.
>>>
>>> This release is a preview of the upcoming Flink 2.0 release. The purpose
>>> is to facilitate early adaptation to the breaking changes for our users and
>>> partner projects (e.g., connectors), and to offer a sneak peek into the
>>> exciting new features while gathering feedback.
>>>
>>> Note: Flink 2.0 Preview is not a stable release and should not be used
>>> in production environments.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please checkout the release blog post for an overview of this release:
>>> https://flink.apache.org/2024/10/23/preview-release-of-apache-flink-2.0/
>>>
>>> The full release notes are available in jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12355070
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Best,
>>>
>>> Becket, Jark, Martijn and Xintong
>>>
>>>


Re: [DISCUSS] Is it a bug that the AdaptiveScheduler does not prioritize releasing TaskManagers during downscaling in Application mode?

2024-11-06 Thread Rui Fan
Thanks Yuepeng for the PR and starting this discussion!

And thanks Gyula and Yuanfeng for the input!

I also agree to fix this behaviour in the 1.x line.

The adaptive scheduler and rescaling API provide powerful capabilities to
increase or decrease parallelism.

As I understand it, the main benefit of decreasing parallelism is saving resources.
If decreasing parallelism can't save resources, why would users decrease it?
This is why I think releasing TM resources when decreasing parallelism is
a basic capability that the Adaptive Scheduler should have.

Please correct me if I missed anything, thanks~

Also, I believe it does not work as users expect, because this behaviour
has been reported multiple times in the Flink community, such as in
FLINK-33977[1], FLINK-35594[2], FLINK-35903[3] and the Slack channel[4].
And 1.20.x is an LTS version, so I agree to fix it in the 1.x line.

[1] https://issues.apache.org/jira/browse/FLINK-33977
[2] https://issues.apache.org/jira/browse/FLINK-35594
[3] https://issues.apache.org/jira/browse/FLINK-35903
[4] https://apache-flink.slack.com/archives/C03G7LJTS2G/p1729167222445569

Best,
Rui

On Wed, Nov 6, 2024 at 4:15 PM yuanfeng hu  wrote:

> > Is it considered an error if the adaptive scheduler fails to release the
> task manager during scaling?
>
> +1 . When we enable adaptive mode and perform scaling operations on tasks,
> a significant part of the goal is to reduce resource usage for the tasks.
> However, due to some logic in the adaptive scheduler's scheduling process,
> the task manager cannot be released, and the ultimate goal cannot be
> achieved. Therefore, I consider this to be a mistake.
>
> Additionally, many tasks are currently running in this mode and will
> continue to run for quite a long time (many users are in this situation).
> So whether or not it is considered a bug, I believe we need to fix it in
> the 1.x version.
>
> On Wed, Nov 6, 2024 at 14:32, Yuepeng Pan wrote:
>
> > Hi, community.
> >
> >
> >
> >
> > While working on ticket[1] we have received some lively discussion and
> > valuable feedback[2] (thanks to Matthias, Rui, Gyula, Maximilian, Tison,
> > etc.); the main questions are:
> >
> > When the job runs in an application cluster, could the default behavior
> > of the AdaptiveScheduler not actively releasing TaskManager resources
> > during downscaling be considered a bug?
> >
> > If so, should we fix it in Flink 1.x?
> >
> >
> >
> > I’d like to start a discussion to hear more comments about it to define
> > the next step and I have sorted out some information in the doc[3]
> > regarding this discussion for you.
> >
> >
> >
> > Looking forward to your comments and attention.
> >
> > Thank you.
> >
> > Best,
> > Yuepeng Pan
> >
> >
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-33977
> >
> > [2] https://github.com/apache/flink/pull/25218#issuecomment-2401913141
> >
> > [3]
> >
> https://docs.google.com/document/d/1Rwwl2aGVz9g5kUJFMP5GMlJwzEO_a-eo4gPf7gITpdw/edit?tab=t.0#heading=h.s4i4hehbbli5
> >
> >
> >
>
> --
> Best,
> Yuanfeng
>


Re: [ANNOUNCE] Apache Flink 2.0 Preview released

2024-11-06 Thread Zakelly Lan
Hi Benoit,

Please find the result here[1].

The Nexmark repo[2] does not officially support the flink 2.0 preview
version. However, we have made a PR[3] for this and once it is merged, we
will offer a guide to run Nexmark Q20 with disaggregated state management.


[1] https://github.com/ververica/ForSt/releases/tag/v0.1.2-beta
[2] https://github.com/nexmark/nexmark
[3] https://github.com/nexmark/nexmark/pull/62


Best,
Zakelly


On Wed, Nov 6, 2024 at 12:12 AM Benoit Tailhades 
wrote:

> Hello,
>
> Release note is talking about a complete end-to-end trial using Nexmark.
> Where could this be found ?
>
> Thank you.
>
> On Mon, Nov 4, 2024 at 02:48, Enric Ott <243816...@qq.com> wrote:
>
>> Hello, Community:
>>   Is there a complete benchmark for Apache Flink 2.0 Preview?
>>   Thanks.
>>
>>
>> -- Original message --
>> *From:* "Enric Ott" <243816...@qq.com>;
>> *Sent:* Wednesday, October 23, 2024, 6:03 PM
>> *To:* "Xintong Song"; "dev"; "user"; "user-zh"; "announce";
>> *Subject:* Re: [ANNOUNCE] Apache Flink 2.0 Preview released
>>
>> How do I import the source code (from GitHub) into IntelliJ IDEA? It seems that
>> a project descriptor is missing.
>>
>>
>> -- Original message --
>> *From:* "Xintong Song" ;
>> *Sent:* Wednesday, October 23, 2024, 5:26 PM
>> *To:* "dev"; "user"; "user-zh"; "announce";
>> *Subject:* [ANNOUNCE] Apache Flink 2.0 Preview released
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 2.0 Preview.
>>
>> Apache Flink® is an open-source unified stream and batch data processing
>> framework for distributed, high-performing, always-available, and accurate
>> data applications.
>>
>> This release is a preview of the upcoming Flink 2.0 release. The purpose
>> is to facilitate early adaptation to the breaking changes for our users and
>> partner projects (e.g., connectors), and to offer a sneak peek into the
>> exciting new features while gathering feedback.
>>
>> Note: Flink 2.0 Preview is not a stable release and should not be used in
>> production environments.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please checkout the release blog post for an overview of this release:
>> https://flink.apache.org/2024/10/23/preview-release-of-apache-flink-2.0/
>>
>> The full release notes are available in jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12355070
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Best,
>>
>> Becket, Jark, Martijn and Xintong
>>
>>


Re: [DISCUSS] Is it a bug that the AdaptiveScheduler does not prioritize releasing TaskManagers during downscaling in Application mode?

2024-11-06 Thread yuanfeng hu
> Is it considered an error if the adaptive scheduler fails to release the
task manager during scaling?

+1 . When we enable adaptive mode and perform scaling operations on tasks,
a significant part of the goal is to reduce resource usage for the tasks.
However, due to some logic in the adaptive scheduler's scheduling process,
the task manager cannot be released, and the ultimate goal cannot be
achieved. Therefore, I consider this to be a mistake.

Additionally, many tasks are currently running in this mode and will
continue to run for quite a long time (many users are in this situation).
So whether or not it is considered a bug, I believe we need to fix it in
the 1.x version.

On Wed, Nov 6, 2024 at 14:32, Yuepeng Pan wrote:

> Hi, community.
>
>
>
>
> While working on ticket[1] we have received some lively discussion and
> valuable feedback[2] (thanks to Matthias, Rui, Gyula, Maximilian, Tison,
> etc.); the main questions are:
>
> When the job runs in an application cluster, could the default behavior of
> the AdaptiveScheduler not actively releasing TaskManager resources during
> downscaling be considered a bug?
>
> If so, should we fix it in Flink 1.x?
>
>
>
> I’d like to start a discussion to hear more comments about it to define
> the next step and I have sorted out some information in the doc[3]
> regarding this discussion for you.
>
>
>
> Looking forward to your comments and attention.
>
> Thank you.
>
> Best,
> Yuepeng Pan
>
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-33977
>
> [2] https://github.com/apache/flink/pull/25218#issuecomment-2401913141
>
> [3]
> https://docs.google.com/document/d/1Rwwl2aGVz9g5kUJFMP5GMlJwzEO_a-eo4gPf7gITpdw/edit?tab=t.0#heading=h.s4i4hehbbli5
>
>
>

-- 
Best,
Yuanfeng


Re: [DISCUSS] Is it a bug that the AdaptiveScheduler does not prioritize releasing TaskManagers during downscaling in Application mode?

2024-11-05 Thread Gyula Fóra
Hey All!

The main purpose of the adaptive scheduler is to be able to adapt to
changing resource availability and requirements.
Originally it was designed to work based on resource availability (with
reactive scaling): when we have more resources we scale up, and when we have
fewer we scale down, so at that point the question of releasing TMs was not a
problem.

Now, with the resource requirements API added somewhat recently, the adaptive
scheduler actually adapts to job resource requirements too. This works well
when increasing the requirements; however, when decreasing the requirements
the TMs are not released in many cases, as highlighted in this thread.

I don't necessarily want to label it as a bug, but I feel that the behaviour
definitely does not align with the original intention of the adaptive
scheduler and how it works in other cases.

I think we should definitely fix this in the 1.x line too given how important
this feature is, and this behaviour can actually cause a lot of problems
for some users. 1.x will be around for quite some time still.

Cheers,
Gyula

On Wed, Nov 6, 2024 at 7:32 AM Yuepeng Pan  wrote:

> Hi, community.
>
>
> While working on ticket[1] we have received some lively discussion and
> valuable feedback[2] (thanks to Matthias, Rui, Gyula, Maximilian, Tison,
> etc.); the main questions are:
>
>    1.
>
>    When the job runs in an application cluster, could the default
>    behavior of the AdaptiveScheduler not actively releasing TaskManager
>    resources during downscaling be considered a bug?
>
>    2.
>
>    If so, should we fix it in Flink 1.x?
>
>
> I’d like to start a discussion to hear more comments about it to define
> the next step and I have sorted out some information in the doc[3]
> regarding this discussion for you.
>
> Looking forward to your comments and attention.
>
> Thank you.
>
> Best,
> Yuepeng Pan
>
> [1] https://issues.apache.org/jira/browse/FLINK-33977
>
> [2] https://github.com/apache/flink/pull/25218#issuecomment-2401913141
> [3]
> https://docs.google.com/document/d/1Rwwl2aGVz9g5kUJFMP5GMlJwzEO_a-eo4gPf7gITpdw/edit?tab=t.0#heading=h.s4i4hehbbli5
>
>
>


Re: [ANNOUNCE] Apache Flink 2.0 Preview released

2024-11-05 Thread Benoit Tailhades
Hello,

Release note is talking about a complete end-to-end trial using Nexmark.
Where could this be found ?

Thank you.

On Mon, Nov 4, 2024 at 02:48, Enric Ott <243816...@qq.com> wrote:

> Hello, Community:
>   Is there a complete benchmark for Apache Flink 2.0 Preview?
>   Thanks.
>
>
> -- Original message --
> *From:* "Enric Ott" <243816...@qq.com>;
> *Sent:* Wednesday, October 23, 2024, 6:03 PM
> *To:* "Xintong Song"; "dev"; "user"; "user-zh"; "announce";
> *Subject:* Re: [ANNOUNCE] Apache Flink 2.0 Preview released
>
> How do I import the source code (from GitHub) into IntelliJ IDEA? It seems that
> a project descriptor is missing.
>
>
> -- Original message --
> *From:* "Xintong Song" ;
> *Sent:* Wednesday, October 23, 2024, 5:26 PM
> *To:* "dev"; "user"; "user-zh"; "announce";
> *Subject:* [ANNOUNCE] Apache Flink 2.0 Preview released
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 2.0 Preview.
>
> Apache Flink® is an open-source unified stream and batch data processing
> framework for distributed, high-performing, always-available, and accurate
> data applications.
>
> This release is a preview of the upcoming Flink 2.0 release. The purpose
> is to facilitate early adaptation to the breaking changes for our users and
> partner projects (e.g., connectors), and to offer a sneak peek into the
> exciting new features while gathering feedback.
>
> Note: Flink 2.0 Preview is not a stable release and should not be used in
> production environments.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please checkout the release blog post for an overview of this release:
> https://flink.apache.org/2024/10/23/preview-release-of-apache-flink-2.0/
>
> The full release notes are available in jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12355070
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Best,
>
> Becket, Jark, Martijn and Xintong
>
>


Re: Re: FlinkSQL Hints - Broadcast table.

2024-11-04 Thread Shengkai Fang
I think you may need to dump the upsert-kafka data to some storage that
accepts CDC data, e.g. Paimon or Hudi, and then look up the data in that data
lake storage. Note that Flink SQL doesn't support event-time lookup joins.

Best,
Shengkai


Re: Re: FlinkSQL Hints - Broadcast table.

2024-11-04 Thread Guillermo Ortiz Fernández
We are trying to migrate a Kafka Streams application to Flink SQL. The Kafka
Streams app uses GlobalKTables to avoid shuffles for the lookup tables. Is there
any equivalent option in Flink?

On Mon, Nov 4, 2024 at 11:27, Guillermo Ortiz Fernández (<
guillermo.ortiz.f...@gmail.com>) wrote:

> The small table uses upsert-kafka, which doesn't support lookup joins; do you
> know of another possibility? Thanks.
>
> On Mon, Nov 4, 2024 at 11:02, Xuyang () wrote:
>
>> Additionally, does the lookup table with CACHE[1][2] meet your needs? If
>> so, you might need to use or implement a dimension table connector with
>> cache.
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-28415
>>
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/jdbc/#lookup-cache
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> On 2024-11-04 17:54:36, "Xuyang" wrote:
>>
>> Hi,
>>
>>The BROADCAST[1] join hint currently applies only to batch mode.
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#broadcast
>> [1]
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2024-11-04 17:06:59, "Guillermo Ortiz Fernández" <
>> guillermo.ortiz.f...@gmail.com> wrote:
>>
>> Hi,
>>
>> I'm running a simple query that joins two tables, where one table is much
>> larger than the other, with the second table being very small. I believe it
>> would be optimal to use a broadcast on the second table for the join. All
>> my tests are being done locally, with very little data in either table.
>> When I apply the hint to perform the broadcast and check the execution
>> plan, I see that, whether the hint is present or not, the join is done
>> using a hash shuffle. Does the hint not enforce the broadcast? Could it be
>> because I’m running it locally or because the tables contain very few
>> records?
>> I'm executing all tests from Flink SQL and the sql-client.
>>
>>
>> EXPLAIN  PLAN FOR
>>  SELECT /*+ BROADCAST(smalltable) */
>> bigtable.eventTimestamp,
>> bigtable.field1,
>> 
>> smalltable.technology
>> FROM bigtable
>> JOIN smalltable FOR SYSTEM_TIME AS OF EventTimestampLtz
>> ON bigtable.cgi = smalltable.cgi;
>>
>>
>>


Re: Re: FlinkSQL Hints - Broadcast table.

2024-11-04 Thread Guillermo Ortiz Fernández
The small table uses upsert-kafka, which doesn't support lookup joins; do you
know of another possibility? Thanks.

On Mon, Nov 4, 2024 at 11:02, Xuyang () wrote:

> Additionally, does the lookup table with CACHE[1][2] meet your needs? If
> so, you might need to use or implement a dimension table connector with
> cache.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-28415
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/jdbc/#lookup-cache
>
>
> --
> Best!
> Xuyang
>
>
> On 2024-11-04 17:54:36, "Xuyang" wrote:
>
> Hi,
>
>The BROADCAST[1] join hint currently applies only to batch mode.
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#broadcast
> [1]
>
>
> --
> Best!
> Xuyang
>
>
> At 2024-11-04 17:06:59, "Guillermo Ortiz Fernández" <
> guillermo.ortiz.f...@gmail.com> wrote:
>
> Hi,
>
> I'm running a simple query that joins two tables, where one table is much
> larger than the other, with the second table being very small. I believe it
> would be optimal to use a broadcast on the second table for the join. All
> my tests are being done locally, with very little data in either table.
> When I apply the hint to perform the broadcast and check the execution
> plan, I see that, whether the hint is present or not, the join is done
> using a hash shuffle. Does the hint not enforce the broadcast? Could it be
> because I’m running it locally or because the tables contain very few
> records?
> I'm executing all tests from Flink SQL and the sql-client.
>
>
> EXPLAIN  PLAN FOR
>  SELECT /*+ BROADCAST(smalltable) */
> bigtable.eventTimestamp,
> bigtable.field1,
> 
> smalltable.technology
> FROM bigtable
> JOIN smalltable FOR SYSTEM_TIME AS OF EventTimestampLtz
> ON bigtable.cgi = smalltable.cgi;
>
>
>


Re: Critical CVE-2024-47561 on Apache Avro

2024-11-03 Thread Chirag Dewan via user
Thanks Jim, appreciate the detailed response.
I don't allow my Flink jobs to parse the schema per se, but my client
application does that. It parses the schema and generates the POJOs which are
then used in the Flink job.
So I must upgrade the Avro version in my client application, and I don't want a
situation where classes are generated on one version and then serialized
and deserialized using different versions (although reading the release notes for Avro
from 1.11.1 to 1.11.4 suggests that should not be a problem either).
Thanks,
Chirag
On Thursday 31 October, 2024 at 08:35:17 am IST, Jim Hughes 
 wrote:  
 
 Hi Chirag,
How are you using Flink?  Do you allow users to pass in arbitrary Avro schemas 
to a Flink cluster?  

If not, then I don't think the CVE applies to you.  If so, then I'd imagine 
that replacing the Avro 1.11.3 jar with the 1.11.4 may be a suitable 
mitigation.  The fix in Apache Flink only changed the versions: 
https://github.com/apache/flink/commit/411c788cc25581be9801ba0980c3e4957c33bc80

The CVE description reads:

"Schema parsing in the Java SDK of Apache Avro 1.11.3 and previous versions 
allows bad actors to execute arbitrary code. Users are recommended to upgrade 
to version 1.11.4  or 1.12.0, which fix this issue."
Cheers,
Jim
On Wed, Oct 30, 2024 at 1:26 AM Chirag Dewan via user  
wrote:

 Any view on this? 

On Monday 28 October, 2024 at 04:16:17 pm IST, Chirag Dewan via user 
 wrote:  
 
Hi,
There is a critical CVE on Apache Avro - NVD - CVE-2024-47561.

Is there a released Flink version which has upgraded Avro to 1.11.4 or 1.12?
If not, is it safe to upgrade just Avro, keeping flink-avro on 1.16.3 (my
current Flink version)?

Appreciate any inputs.
Thanks,
Chirag


Re: Tentative Flink 2.0 release date?

2024-10-31 Thread Anil Dasari
Hi Yanquan,
Yes. Mainly, I am looking for JDK 11 support. Thank you for sharing the
details.

On Thu, Oct 31, 2024 at 4:26 AM Yanquan Lv  wrote:

> Hi, Anil.
> Flink can maintain compatibility with one previous version, so a bump to
> 1.19 might also meet your requirements.
> There are already PRs bumping to 1.19[1] and adapting to JDK 11[2] in
> the Flink CDC community; maybe you can refer to them for your modification.
>
> [1]https://github.com/apache/flink-cdc/pull/3660
> 
> [2]https://github.com/apache/flink-cdc/pull/3633
> 
>
>
>
> On Oct 28, 2024, at 22:46, Anil Dasari wrote:
>
> Thanks for the information.
> I created https://issues.apache.org/jira/browse/FLINK-36605
> 
> yesterday.
>
> The Flink repository includes Java 11 profiles. Are there specific Flink
> JAR files compiled for Java 11 that I can use? I’ve seen that there are
> Flink Docker images for Java 11 available for Flink 1.20. Is there a plan
> to upgrade to Flink 1.20?.
>
> I attempted to upgrade Flink CDC to version 1.20 (and Java 11), and it
> failed. Also, some of the imports still depend on Guava 31 (from the older
> Flink shaded JAR i.e jre31).
>
> Thanks
>
> On Mon, Oct 28, 2024 at 4:37 AM Yanquan Lv  wrote:
>
>> Hi, Anil.
>> In Flink 2.0, the deprecated APIs were removed, which requires connector
>> adaptation. It is expected that most external connectors will complete the
>> migration work in Flink 2.3.
>> The Flink CDC community hopes to bump this version and JDK 11 some time after
>> the release of Flink 2.0, and upgrading to Debezium 2.x
>> also requires a lot of adaptation work, so the expected upgrade time should
>> be around Flink 2.3 or one or two versions after that.
>>
>>
>> On Oct 23, 2024, at 13:40, Anil Dasari wrote:
>>
>> Hi,
>>
>> We are planning to explore Flink CDC for our CDC pipelines and have
>> quickly noticed that Flink CDC is still using DBZ 1.9.2.Final.
>>
>> DBZ 2.0.0.Final is a major release that requires JDK 11, while the latest
>> version, DBZ 3.0.0.Final, requires JDK 17. Currently, Flink CDC 3.2.0 is
>> using Flink 1.19 and JDK 1.8, but Flink only supports Java 11 from version
>> 2.0 - https://github.com/apache/flink/blob/master/pom.xml#L128
>> 
>> .
>>
>> Could you please share the tentative release timeline for Flink 2.0?
>> Thanks in advance.
>> Thanks
>>
>>
>>
>


Re: Tentative Flink 2.0 release date?

2024-10-31 Thread Yanquan Lv
Hi, Anil.
Flink can maintain compatibility with one previous version, so a bump to 1.19
might also meet your requirements.
There are already PRs bumping to 1.19[1] and adapting to JDK 11[2] in the
Flink CDC community; maybe you can refer to them for your modification.

[1]https://github.com/apache/flink-cdc/pull/3660
[2]https://github.com/apache/flink-cdc/pull/3633



> On Oct 28, 2024, at 22:46, Anil Dasari wrote:
> 
> Thanks for the information. 
> I created https://issues.apache.org/jira/browse/FLINK-36605 yesterday. 
> 
> The Flink repository includes Java 11 profiles. Are there specific Flink JAR 
> files compiled for Java 11 that I can use? I’ve seen that there are Flink 
> Docker images for Java 11 available for Flink 1.20. Is there a plan to 
> upgrade to Flink 1.20?.
> 
> I attempted to upgrade Flink CDC to version 1.20 (and Java 11), and it 
> failed. Also, some of the imports still depend on Guava 31 (from the older 
> Flink shaded JAR i.e jre31). 
> 
> Thanks
> 
> On Mon, Oct 28, 2024 at 4:37 AM Yanquan Lv  > wrote:
>> Hi, Anil.
>> In Flink 2.0, the deprecated APIs were removed, which requires connector
>> adaptation. It is expected that most external connectors will complete the
>> migration work in Flink 2.3.
>> The Flink CDC community hopes to bump this version and JDK 11 some time after
>> the release of Flink 2.0, and upgrading to Debezium 2.x
>> also requires a lot of adaptation work, so the expected upgrade time should
>> be around Flink 2.3 or one or two versions after that.
>> 
>> 
>>> On Oct 23, 2024, at 13:40, Anil Dasari wrote:
>>> 
>>> Hi,
>>> We are planning to explore Flink CDC for our CDC pipelines and have quickly 
>>> noticed that Flink CDC is still using DBZ 1.9.2.Final.
>>> 
>>> DBZ 2.0.0.Final is a major release that requires JDK 11, while the latest 
>>> version, DBZ 3.0.0.Final, requires JDK 17. Currently, Flink CDC 3.2.0 is 
>>> using Flink 1.19 and JDK 1.8, but Flink only supports Java 11 from version 
>>> 2.0 - https://github.com/apache/flink/blob/master/pom.xml#L128. 
>>> 
>>> Could you please share the tentative release timeline for Flink 2.0? Thanks 
>>> in advance.
>>> 
>>> Thanks
>> 



Re: Critical CVE-2024-47561 on Apache Avro

2024-10-30 Thread Jim Hughes via user
Hi Chirag,

How are you using Flink?  Do you allow users to pass in arbitrary Avro
schemas to a Flink cluster?

If not, then I don't think the CVE applies to you.  If so, then I'd imagine
that replacing the Avro 1.11.3 jar with the 1.11.4 may be a suitable
mitigation.  The fix in Apache Flink only changed the versions:
https://github.com/apache/flink/commit/411c788cc25581be9801ba0980c3e4957c33bc80

The CVE description reads:

"Schema parsing in the Java SDK of Apache Avro 1.11.3 and previous versions
allows bad actors to execute arbitrary code. Users are recommended to
upgrade to version 1.11.4  or 1.12.0, which fix this issue."

Cheers,

Jim

On Wed, Oct 30, 2024 at 1:26 AM Chirag Dewan via user 
wrote:

> Any view on this?
>
>
> On Monday 28 October, 2024 at 04:16:17 pm IST, Chirag Dewan via user <
> user@flink.apache.org> wrote:
>
>
> Hi,
>
> There is a critical CVE on Apache Avro - NVD - CVE-2024-47561
> 
>
> Is there a released Flink version which has upgraded Avro to 1.11.4 or
> 1.12?
>
> If not, is it safe to upgrade just AVRO, keeping flink-avro on 1.16.3 (my
> current Flink version).
>
> Appreciate any inputs.
>
> Thanks,
> Chirag
>
> NVD - CVE-2024-47561
>
> 
>
>
>


Re: Unittesting async functions with mocked fields

2024-10-30 Thread Alexey Novakov via user
Hi Burak,

I recommend not using Mock libraries with Scala as it does not really need
that. Just substitute the CaffeineHelper with another implementation for
tests.
You could create two implementations of the trait CacheHelper[V]:

trait CacheHelper[V] extends Serializable {
  def get(id: String): Option[V]

  def put(key: String)(value: V): Unit
}


class CaffeineHelper[V](cache: CaffeineCache[V]) extends CacheHelper[V] {

  def get(id: String): Option[V] = Try(cache.get(id).get).toOption

  def put(key: String)(value: V): Unit = { cache.put(key)(value); () }

}


class CacheHelperMock[V](map: mutable.Map[String, V]) extends CacheHelper[V] {

  def get(id: String): Option[V] = map.get(id)

  def put(key: String)(value: V): Unit = { map.put(key, value); () }

}

Then use the second one in your tests.

Also, I see that Flink complains about the serialization of scala.Option. It
seems the import "import org.apache.flink.api.scala.createTypeInformation"
or "import org.apache.flink.api.scala._" is missing in one of your
Scala classes/files.
In more detail, this could be a case where the Scala implicit type converter
was not found/available in the scope of the Flink job graph construction,
so Flink either falls back to the Kryo serialization framework or gives up
completely, as in your case.


Alexey

On Wed, Oct 30, 2024 at 3:38 PM Burak Dursunlar <
burak.dursun...@trendyol.com> wrote:

> I am struggling with writing unittests on scala flink application.
>
> For instance I have an async mapper like below. It takes a User object
> with id, and enriches with age:
>
> ```
>
> case class User(id: String)case class UserWithMeta(id: String, age: Long)
> class EnrichWithUserMeta extends RichAsyncFunction[User, UserWithMeta] {
>   protected var userMetadataCache: CaffeineHelper[Long] = _
>
>   override def open(parameters: Configuration): Unit = {
> userMetadataCache = CaffeineHelper.newCache[Long](10.seconds)
>   }
>
>   override def asyncInvoke(input: User, resultFuture: 
> ResultFuture[UserWithMeta]): Unit = {
> val age = getUserAge(input.id)
> resultFuture.complete(Seq(UserWithMeta(input.id, age)))
>   }
>
>   private def getUserAge(userId: String): Long = {
> userMetadataCache.get(userId) match {
>   case Some(age) => age
>   case None => { // Cache miss. Get it from DB.
> val age = getAgeFromDB(userId)
> userMetadataCache.put(userId)(age)
> age
>   }
> }
>   }
> }
>
> ```
>
> The above mapper has a cache instance of CaffeineHelper. CaffeineHelper is a 
> wrapper of a Caffeine caching library.
>
> Here is the CaffeineHelper.scala:
>
> ```
>
> trait CaffeineHelper[V] extends Serializable {
>   @transient protected var cache: CaffeineCache[V]
>
>   def get(id: String): Option[V] = Try(cache.get(id).get).toOption
>
>   def put(key: String)(value: V) = cache.put(key)(value)
> }
> object CaffeineHelper {
>   def newCache[V](d: Duration): CaffeineHelper[V] = {
> new CaffeineHelper[V] {
>   override protected var cache: CaffeineCache[V] = ??? // Actual 
> implementation.
> }
>   }
> }
>
> ```
>
> The main flink application that uses the async mapper above works fine. 
> However, when I try to write unittests mocked CaffeineHelper cannot be 
> serialized.
>
> ```
>
> package org.example
>
> import org.apache.flink.api.scala.createTypeInformation
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
> import org.apache.flink.streaming.api.scala.{AsyncDataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.test.util.MiniClusterWithClientResource
> import org.mockito.IdiomaticMockito.StubbingOps
> import org.mockito.Mockito.withSettings
> import org.mockito.mock.SerializableMode
> import org.scalatest.BeforeAndAfter
> import org.scalatest.flatspec.AnyFlatSpec
> import org.scalatest.matchers.should.Matchers
> import org.scalatestplus.mockito.MockitoSugar.mock
> import scalacache.caffeine.CaffeineCache
> import java.util.concurrent.TimeUnit
>
>
> class EnrichWithUserMetaTest extends AnyFlatSpec with Matchers with 
> BeforeAndAfter {
>   private val flinkCluster = new MiniClusterWithClientResource(new 
> MiniClusterResourceConfiguration.Builder().build())
>
>   before(flinkCluster.before())
>   after(flinkCluster.after())
>
>   "EnrichWithUserMetaMapper" should "enrich with brand" in {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> val caffeineMock = 
> mock[CaffeineHelper[Long]](withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS))
>
> val ageMapper = new EnrichWithUserMeta() {
>   override def open(parameters: Configuration): Unit = {
> userMetadataCache = caffeineMock
>   }
> }
>
> caffeineMock.get("content1") returns Some(4L)
>
> val stream = env.fromElements(
>   User("user1")
> )
>
> val enrichedProducts = AsyncDataStream
>   .unorderedWait(stream, ageMapper, 5, TimeUnit.SECONDS, 1)
>  

Re: Trying to understand watermark in join with FlinkSQL and late events

2024-10-30 Thread Alexey Novakov via user
Hi Guillermo.

ORD007 is included due to the LEFT JOIN logic. A LEFT JOIN returns
all records from the left table, plus the matching records from the right
table.

The watermark configuration and "FOR SYSTEM_TIME AS OF" do not override the
normal LEFT JOIN behavior here: the late row simply finds no matching version
on the right side and is emitted with NULLs for the right-hand columns.

Best regards,
Alexey

On Wed, Oct 30, 2024 at 8:18 AM Guillermo  wrote:

> I'm trying to understand how watermarks work in FlinkSQL. I’ve created the
> following tables:
>
>   CREATE TABLE currency_rates (
>   currency STRING,
>   conversion_rate STRING,
>   update_time TIMESTAMP(3),
>   WATERMARK FOR update_time AS update_time,
>   PRIMARY KEY(currency) NOT ENFORCED
>   ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'currency_rates',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'properties.group.id' = 'currency_rates1',
>   'key.format' = 'raw',
>   'value.format' = 'avro',
>   'properties.auto.offset.reset' = 'earliest',
>   'value.fields-include' = 'ALL',
>   'scan.watermark.idle-timeout' = '1000'
>   );
>
>
>   CREATE TABLE orders (
>   order_idSTRING,
>   price   DECIMAL(32,2),
>   currencySTRING,
>   order_time  TIMESTAMP(3),
>   WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
>   ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'orders',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'properties.group.id' = 'orders1',
>   'value.format' = 'avro',
>   'properties.auto.offset.reset' = 'latest',
>   'value.fields-include' = 'ALL'
>   );
>
>
> To test this, I run the following query:
>
>   SELECT
>   orders.order_id,
>   orders.price,
>   orders.currency,
>   currency_rates.conversion_rate,
>   orders.order_time,
>   currency_rates.update_time
>   FROM orders
>   LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
>   ON orders.currency = currency_rates.currency;
>
> These are the inserts I’m making:
>
> INSERT INTO currency_rates VALUES ('USD', 'value 1',
> CURRENT_TIMESTAMP);
> INSERT INTO orders VALUES ('ORD001', 100.00, 'USD', CURRENT_TIMESTAMP);
> INSERT INTO orders VALUES ('ORD002', 100.00, 'USD', CURRENT_TIMESTAMP);
>
> The following result is displayed:
> order_id | price  | currency | conversion_rate | order_time              | update_time
> ORD001   | 100.00 | USD      | value 1         | 2024-10-29 21:42:41.630 | 2024-10-29 21:41:59.279
> Then, I run:
>
> INSERT INTO currency_rates VALUES ('USD', 'value 2',
> CURRENT_TIMESTAMP);
> INSERT INTO orders VALUES ('ORD007', 2000.00, 'USD', CURRENT_TIMESTAMP
> - interval '5' minutes);
> INSERT INTO orders VALUES ('ORD003', 100.00, 'USD', CURRENT_TIMESTAMP);
>
> And:
>
> order_id | price   | currency | conversion_rate | order_time              | update_time
> ORD001   | 100.00  | USD      | value 1         | 2024-10-29 21:42:41.630 | 2024-10-29 21:41:59.279
> ORD002   | 100.00  | USD      | value 1         | 2024-10-29 21:42:46.936 | 2024-10-29 21:41:59.279
> ORD007   | 2000.00 | USD      |                 | 2024-10-29 21:39:32.560 |                          <-
> I don't understand why record ORD007 appears, as it was inserted with a
> 5-minute delay, so I thought it should not be included because it came late.
>
>
>


Re: Critical CVE-2024-47561 on Apache Avro

2024-10-29 Thread Chirag Dewan via user
 Any view on this? 

On Monday 28 October, 2024 at 04:16:17 pm IST, Chirag Dewan via user 
 wrote:  
 
Hi,
There is a critical CVE on Apache Avro - NVD - CVE-2024-47561.

Is there a released Flink version which has upgraded Avro to 1.11.4 or 1.12?
If not, is it safe to upgrade just Avro, keeping flink-avro on 1.16.3 (my
current Flink version)?

Appreciate any inputs.
Thanks,
Chirag


Re: Mysql CDC: support for parallelism

2024-10-29 Thread Michael Marino
Hi Hang,

Thanks for the response. Unfortunately, this doesn't work for me; it still
blocks and does not propagate the watermark downstream. At the moment, the
only solution for me is to set table.exec.source.idle-timeout.
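
For reference, a rough Java Table API sketch of that workaround (in the SQL
client the equivalent is SET 'table.exec.source.idle-timeout' = '15 s'; the
15 s value is only an example):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleTimeoutSetup {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Mark source subtasks idle after 15 s without data so the watermark
        // can still advance past the subtask that never reads binlog events.
        tEnv.getConfig().set("table.exec.source.idle-timeout", "15 s");
    }
}
```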

Thanks,
Mike



On Mon, Oct 28, 2024 at 9:29 AM Hang Ruan  wrote:

> Hi, Michael.
>
> MySQL CDC source has the parallelism 1 when reading binlog events to keep
> their order. And other subtasks will stop reading data.
> For your question, you could set the option
> 'scan.incremental.close-idle-reader.enabled'='true'[1] in your cdc table to
> let the source close the idle subtasks.
> ps: There is a limit when opening this option. Please see more in its
> description.
>
> Best,
> Hang
>
> [1]
> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/docs/connectors/flink-sources/mysql-cdc/
>
> On Thu, Oct 24, 2024 at 19:13, Michael Marino wrote:
>
>> Let me quickly follow up on this:
>>
>> - I missed noting that I *was* setting the server-id value to a range.
>> - I just realized that if I do a hard restart and start without a
>> snapshot, then this works, i.e. the multiple sub-tasks receive events and
>> the watermarking/processing progresses. This is, however, not really ideal,
>> is there any way to scale CDC without this hard restart?
>>
>> Thanks,
>> Mike
>>
>>
>> On Thu, Oct 24, 2024 at 12:25 PM Michael Marino 
>> wrote:
>>
>>> Hey all,
>>>
>>> We are working to scale one of our Flink Jobs (using Table API mostly,
>>> some DataStream) where we are using a MySQL CDC table as a source for
>>> enrichment.
>>>
>>> What I've noticed is that, when I increase the parallelism of the job
>>> (e.g. to 2), the CDC table source has 2 tasks, but only one of these reads
>>> any events. The other one remains completely idle. This stalls downstream
>>> processing because we are not getting any watermarks, the only way I've
>>> found to get this to continue is to set table.exec.source.idle-timeout to a
>>> non-zero value.
>>>
>>> My questions are:
>>>   - is there some setting I can tune to get the CDC to distribute events
>>> across the different sub-tasks?
>>>   - If the above isn't possible, is there a way in the Table/SQL API to
>>> reduce the parallelism (e.g. to 1)? CDC doesn't seem to support
>>> scan.parallelism.
>>>
>>> If neither of the above works, I think I may be forced to use the
>>> DataStream API, set the parallelism explicitly and then convert to a table.
>>>
>>> Thanks!
>>>
>>> Cheers,
>>> Mike
>>>
>>
>>
>>
>

-- 

Michael Marino

Principal Data Science & Analytics

Phone:  +49 89 7167786 - 14

linkedin.com/company/tadogmbh  |
facebook.com/tado  | twitter.com/tado
 | youtube.com/tado


www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany

 Managing Directors: Dr. Philip Beckmann | Christian Deilmann | Johannes
Schwarz | Dr. Frank Siebdrat | Lukas Zyla

Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE
280012558


Re: Task manager has no more allocated slots for job

2024-10-29 Thread John Smith
What does this mean exactly?

On Fri, Oct 25, 2024 at 10:12 AM John Smith  wrote:

> Hi, I'm getting the following exception. I have 3 task managers running and
> they all have enough slots available. See screenshot.
>
> 2024-10-25 10:00:31
> org.apache.flink.util.FlinkException: TaskExecutor
> akka.tcp://fl...@xxx.xxx.xxx:42441/user/rpc/taskmanager_0 has no more
> allocated slots for job 925415d3c719bc881031bc47ce754e8f.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnectionIfNoAllocatedResources(TaskExecutor.java:1936)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.freeSlotInternal(TaskExecutor.java:1917)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.timeoutSlot(TaskExecutor.java:1950)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$3200(TaskExecutor.java:183)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$SlotActionsImpl.lambda$timeoutSlot$1(TaskExecutor.java:2352)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
>
>
> [image: Screen Shot 2024-10-25 at 10.07.20 AM.png]
>


Re: Tentative Flink 2.0 release date?

2024-10-28 Thread Anil Dasari
Thanks for the information.
I created https://issues.apache.org/jira/browse/FLINK-36605 yesterday.

The Flink repository includes Java 11 profiles. Are there specific Flink
JAR files compiled for Java 11 that I can use? I’ve seen that there are
Flink Docker images for Java 11 available for Flink 1.20. Is there a plan
to upgrade to Flink 1.20?.

I attempted to upgrade Flink CDC to version 1.20 (and Java 11), and it
failed. Also, some of the imports still depend on Guava 31 (from the older
Flink shaded JAR i.e jre31).

Thanks

On Mon, Oct 28, 2024 at 4:37 AM Yanquan Lv  wrote:

> Hi, Anil.
> In Flink 2.0, the deprecated APIs were removed, which requires connector
> adaptation. It is expected that most external connectors will complete the
> migration work in Flink 2.3.
> The Flink CDC community hopes to bump this version and JDK 11 some time after
> the release of Flink 2.0, and upgrading to Debezium 2.x
> also requires a lot of adaptation work, so the expected upgrade time should
> be around Flink 2.3 or one or two versions after that.
>
>
> On Oct 23, 2024, at 13:40, Anil Dasari wrote:
>
> Hi,
>
> We are planning to explore Flink CDC for our CDC pipelines and have
> quickly noticed that Flink CDC is still using DBZ 1.9.2.Final.
>
> DBZ 2.0.0.Final is a major release that requires JDK 11, while the latest
> version, DBZ 3.0.0.Final, requires JDK 17. Currently, Flink CDC 3.2.0 is
> using Flink 1.19 and JDK 1.8, but Flink only supports Java 11 from version
> 2.0 - https://github.com/apache/flink/blob/master/pom.xml#L128
> 
> .
>
> Could you please share the tentative release timeline for Flink 2.0?
> Thanks in advance.
> Thanks
>
>
>


Re: Flink custom sink

2024-10-28 Thread Anil Dasari
Hi Yanquan,
Could you provide some insights and help with this? I see similar requests
in the Flink Slack channel but no active conversations. It seems that the Flink
Slack channels aren't very active, and the documentation lacks examples for
the different approaches.

I'm trying to implement micro-batching with near real-time processing and
have explored the following options:

   1. *Windowing*: This seemed promising, but the flushing mechanism
   requires record-level information checks as window information isn't
   available throughout the pipeline.
   2. *Window + Trigger*: This buffers events until the trigger interval is
   reached, which makes it non-real-time; events are only processed when the
   trigger event is triggered.
   3. *Processing Time*: The processing time is local to each file writer,
   meaning it's not consistent across different task managers.
   4. *Watermark*: There’s no global watermark; it's specific to each
   source task, and the initial watermark (before the first watermark event) is
   not an epoch timestamp.

I want to write the data grouped by time (micro-batch time) and then by
category. What’s the best approach to achieve micro-batching in Flink?

Thanks

On Fri, Oct 25, 2024 at 2:35 AM Anil Dasari  wrote:

> Hello all,
>
> Are there Flink patterns that support microbatching and ensure all data
> for a microbatch is written to a specific prefix in the destination with
> exactly-once delivery?
> I’ve explored both window and processing time (implemented in FileSink)
> options to set the boundaries for microbatches.
>
> However, processing time can't establish data boundaries by timestamp
> across multiple sink writer tasks when each writer starts at different
> times due to Flink's auto-scaling. If I understand correctly, the process
> time approach operates based on fixed intervals within a single sink writer
> (e.g., FileSink) rather than as a fixed interval task across multiple sink
> writers.
>
> The window option requires checking both the current and previous
> element's window start times to determine the boundaries, complicating the
> data flush process.
> Is there a way to listen for the window start/end event in all the
> pipeline operators post window operator to effectively implement this as an
> OnProcessTime function in FileSink?
>
> If yes, Could you please point me to the docs or examples if any? thanks
> in advance.
>
> Thanks
>
> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv  wrote:
>
>> Hi, Anil.
>>
>> Iceberg Sink is merged recently in
>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>> 
>> .
>>
>> From your description, I guess that what you need is
>> a TwoPhaseCommittingSink[1]; the steps you listed can be implemented as
>> follows:
>>
>> > 1. Group data by category and write it to S3 under its respective
>> prefix.
>> This can be done in
>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>  method.
>>
>> > 2. Update category metrics in a manifest file stored in the S3 manifest
>> prefix.
>> > 3. Send category metrics to an external system to initiate consumption.
>> This metrics information could be passed via the
>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>> method.
>> And `Update category metrics`/`Send category metrics` can be done in
>> org.apache.flink.api.connector.sink2.Committer#commit method.
>>
>> The rollback action could be done in the SinkWriter or the Committer; to delete files
>> from S3, you need to pass the file information through
>> PrecommittingSinkWriter#prepareCommit too. Then you can throw an exception to
>> let the Flink job fail over and retry.
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>> 
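
A rough Java skeleton of the shape described in the quoted explanation above,
following the method names in the linked TwoPhaseCommittingSink javadoc (the
CategoryBatchSink/CategoryBatchWriter names, the plain-String committable, and
the commented-out updateManifest/notifyExternalSystem helpers are hypothetical
placeholders, not a tested implementation; double-check the signatures against
your Flink version):

```
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// The committable here is simply the S3 prefix written for one category batch.
public class CategoryBatchSink implements TwoPhaseCommittingSink<String, String> {

    @Override
    public PrecommittingSinkWriter<String, String> createWriter(InitContext context) {
        return new CategoryBatchWriter();
    }

    @Override
    public Committer<String> createCommitter() {
        return new Committer<String>() {
            @Override
            public void commit(Collection<CommitRequest<String>> requests) {
                for (CommitRequest<String> request : requests) {
                    // Steps 2 and 3 (manifest update + external notification) run
                    // only after the checkpoint that produced these committables:
                    // updateManifest(request.getCommittable());       // hypothetical helper
                    // notifyExternalSystem(request.getCommittable()); // hypothetical helper
                }
            }

            @Override
            public void close() {}
        };
    }

    @Override
    public SimpleVersionedSerializer<String> getCommittableSerializer() {
        return new SimpleVersionedSerializer<String>() {
            @Override public int getVersion() { return 1; }
            @Override public byte[] serialize(String prefix) {
                return prefix.getBytes(StandardCharsets.UTF_8);
            }
            @Override public String deserialize(int version, byte[] bytes) {
                return new String(bytes, StandardCharsets.UTF_8);
            }
        };
    }

    private static class CategoryBatchWriter
            implements PrecommittingSinkWriter<String, String> {

        private final List<String> pendingPrefixes = new ArrayList<>();

        @Override
        public void write(String element, Context context) {
            // Step 1: stage records grouped by category under a per-checkpoint
            // S3 prefix and remember that prefix (S3 I/O omitted in this sketch).
        }

        @Override
        public Collection<String> prepareCommit() {
            // Hand everything written since the last checkpoint to the committer.
            List<String> committables = new ArrayList<>(pendingPrefixes);
            pendingPrefixes.clear();
            return committables;
        }

        @Override
        public void flush(boolean endOfInput) {}

        @Override
        public void close() {}
    }
}
```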
>>
>>
>>
>> On Oct 14, 2024, at 12:49, Anil Dasari wrote:
>>
>> Hello,
>>
>> I am looking to implement a Flink sink for the following use case, where
>> the steps below are executed for each microbatch (using Spark terminology)
>> or trigger:
>>
>>1. Group data by category and write it to S3 under its respective
>>prefix.
>>2. Update category metrics in a manifest file stored in the S3
>>manifest prefix.
>>3. Send category metrics to an external system to initiate
>>consumption.
>>
>> In case of failure, any previously completed steps should be rolled back,
>> i.e., delete files from S3 and reprocess the entire microbatch.
>>
>> It seems this pattern can be implemented using the Unified Sink API, as
>> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A
>> 
>> .
>>
>> I'm currently reviewing FileSink and IcebergSink (still searching for
>> the source code) to under

Re: Flink CDC -> Kafka -> Paimon?

2024-10-28 Thread Andrew Otto
> Maybe you can simplify the pipeline by synchronizing data from MariaDB to
Paimon directly.
> Paimon sink is already available in FlinkCDC, so we can directly write
data from MariaDB to Paimon

Yes! I have accomplished this, and will explore it more.  However, having
Kafka in the middle has some advantages:

- A distributed log buffer: MariaDB read and downstream (e.g. Paimon table)
write jobs are decoupled.
- CDC data in Kafka means it can be re-used for things. We can sink it to
Paimon or Iceberg tables, and we could use kafka + debezium json source to
implement other streaming jobs.

I added this to the JIRA. Thank you!





On Mon, Oct 28, 2024 at 6:52 AM Yanquan Lv  wrote:

> Hi, Andrew.
> Yeah, currently, the output from Kafka pipeline didn't contain schema
> info, So in Paimon action, it will be considered as a String type.
> Your suggestion is very meaningful. I plan to support this feature in the
> next version of FlinkCDC (FlinkCDC 3.3), which may be enabled through a
> parameter[1].
>
> And Paimon sink is already available in FlinkCDC, so we can directly write
> data from MariaDB to Paimon to reduce the number of components and links
> that need to be maintained, We will also follow up on any issues
> encountered in Paimon Pipeline sink.
>
> [1] https://issues.apache.org/jira/browse/FLINK-36611
>
>
> 2024年10月28日 18:22,Yanquan Lv  写道:
>
> Hi, Andrew.
> Yeah, currently, the output from Kafka pipeline didn't contain schema
> info, So in Paimon action, it will be considered as a String type.
> Your suggestion is very meaningful. I plan to support this feature in the
> next version of FlinkCDC (FlinkCDC 3.3), which may be enabled through a
> parameter[1].
>
> And Paimon sink is already available in FlinkCDC, so we can directly write
> data from MariaDB to Paimon to reduce the number of components and links
> that need to be maintained, We will also follow up on any issues
> encountered in Paimon Pipeline sink.
>
> [1] https://issues.apache.org/jira/browse/FLINK-36611
>
>
> 2024年10月26日 11:32,Andrew Otto  写道:
>
> Hi!
>
> I really like Flink CDC's pipeline connectors
> <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/docs/connectors/pipeline-connectors/overview/>!
> So simple!
> I also like Paimon's CDC ingestion action CLI
> <https://paimon.apache.org/docs/master/flink/cdc-ingestion/overview/>.
>
> I like these because I don't need to specify the schemas; they are
> inferred from the source.  I also like the schema evolution support!
>
> Paimon's recent Iceberg Compatibility
> <https://paimon.apache.org/docs/master/migration/iceberg-compatibility/>
> mode looks cool!
>
> I'd like to accomplish the following:
>
>- MariaDB CDC -> Kafka
>- Kafka -> Paimon
>- Query with Spark+Iceberg
>
> I can do MariaDB CDC -> Kafka with the flink-cdc pipeline connector
> <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/docs/connectors/pipeline-connectors/kafka/>
> .
> However, I'm stuck on Kafka -> Paimon.
>
> Paimon's Kafka CDC action docs
> <https://paimon.apache.org/docs/0.9/flink/cdc-ingestion/kafka-cdc/> say:
> > Usually, debezium-json contains ‘schema’ field, from which Paimon will
> retrieve data types. Make sure your debezium json has this field, or Paimon
> will use ‘STRING’ type.
>
> However, the messages generated by flink-cdc's pipeline connector do not
> have a schema field.  They look like:
>
> {
>   "before": null,
>   "after": {
> "rev_id": 37,
>  ...
>   },
>   "op": "c"
> }
>
> The only reference I can find to a schema field is in the debezium-json
> format documentation
> <https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/formats/debezium/#debezium-json-ddl>
> .
> > users may setup the Debezium Kafka Connect with the Kafka configuration
> 'value.converter.schemas.enable' enabled to include schema in the message
>
> This leads me to believe that the schema field that Paimon's doc is referring
> to is added not by debezium or flink-cdc, but by Kafka Connect when it is
> used with Debezium proper to write CDC messages to Kafka.
>
>- Does this mean that the CDC messages generated by the flink-cdc
>kafka pipeline connector are not compatible with Paimon's
>kafka_sync_database action?
>- Or, is there a way to cause flink-cdc pipeline connectors to include
>schema in the message?
>
>
> I could be misunderstanding something. Is Kafka Connect+Debezium used by
> Flink to support debezium-json formatted messages? I tried passing 
> properties.value.converter.schemas.enable:
> true to the flink-cdc pipeline kafka sink but that did not work (as
> expected).
>
> Thank you!
>
> -Andrew Otto
>  Wikimedia Foundation
>
> P.S. Context for what we are trying to do is here: T373144 [SPIKE] Learn
> and document how to use Flink-CDC from MediaWiki MariaDB locally
> <https://phabricator.wikimedia.org/T373144>
>
>
>
>
>
>


Re: Tentative Flink 2.0 release date?

2024-10-28 Thread Yanquan Lv
Hi, Anil.
In Flink 2.0, the deprecated APIs were removed, which requires connector 
adaptation. It is expected that most external connectors will complete the 
migration work around Flink 2.3.
The Flink CDC community hopes to bump to this version and JDK 11 only after Flink 2.0 
has been released for a period of time, and upgrading to Debezium 2.x also requires 
a lot of adaptation work, so the expected upgrade should come around Flink 2.3 
or one or two versions after that.


> 2024年10月23日 13:40,Anil Dasari  写道:
> 
> Hi,
> We are planning to explore Flink CDC for our CDC pipelines and have quickly 
> noticed that Flink CDC is still using DBZ 1.9.2.Final.
> 
> DBZ 2.0.0.Final is a major release that requires JDK 11, while the latest 
> version, DBZ 3.0.0.Final, requires JDK 17. Currently, Flink CDC 3.2.0 is 
> using Flink 1.19 and JDK 1.8, but Flink only supports Java 11 from version 
> 2.0 - https://github.com/apache/flink/blob/master/pom.xml#L128. 
> 
> Could you please share the tentative release timeline for Flink 2.0? Thanks 
> in advance.
> 
> Thanks



Re: Flink CDC -> Kafka -> Paimon?

2024-10-28 Thread Yanquan Lv
Hi, Andrew.
Yes, currently the output from the Kafka pipeline doesn't contain schema info, so 
in the Paimon action it will be treated as STRING type.
Your suggestion is very meaningful. I plan to support this feature in the next 
version of Flink CDC (Flink CDC 3.3), where it may be enabled through a parameter[1].

A Paimon sink is already available in Flink CDC, so you can write data directly 
from MariaDB to Paimon to reduce the number of components and links that need 
to be maintained. We will also follow up on any issues encountered in the Paimon 
pipeline sink.

[1] https://issues.apache.org/jira/browse/FLINK-36611


> 2024年10月28日 18:22,Yanquan Lv  写道:
> 
> Hi, Andrew.
> Yeah, currently, the output from Kafka pipeline didn't contain schema info, 
> So in Paimon action, it will be considered as a String type.
> Your suggestion is very meaningful. I plan to support this feature in the 
> next version of FlinkCDC (FlinkCDC 3.3), which may be enabled through a 
> parameter[1].
> 
> And Paimon sink is already available in FlinkCDC, so we can directly write 
> data from MariaDB to Paimon to reduce the number of components and links that 
> need to be maintained, We will also follow up on any issues encountered in 
> Paimon Pipeline sink.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-36611
> 
> 
>> 2024年10月26日 11:32,Andrew Otto  写道:
>> 
>> Hi!
>> 
>> I really like Flink CDC's pipeline connectors 
>> !
>>   So simple!  
>> I also like Paimon's CDC ingestion action CLI 
>> . 
>> 
>> I like these because I don't need to specify the schemas; they are inferred 
>> from the source.  I also like the schema evolution support!
>> 
>> Paimon's recent Iceberg Compatibility 
>>  
>> mode looks cool!
>> 
>> I'd like to accomplish the following:
>> MariaDB CDC -> Kafka 
>> Kafka -> Paimon
>> Query with Spark+Iceberg
>> I can do MariaDB CDC -> Kafka with the flink-cdc pipeline connector 
>> .
>> However, I'm stuck on Kafka -> Paimon.
>> 
>> Paimon's Kafka CDC action docs 
>>  say:
>> > Usually, debezium-json contains ‘schema’ field, from which Paimon will 
>> > retrieve data types. Make sure your debezium json has this field, or 
>> > Paimon will use ‘STRING’ type.
>> 
>> However, the messages generated by flink-cdc's pipeline connector do not 
>> have a schema field.  They look like:
>> 
>> {
>>   "before": null,
>>   "after": {
>> "rev_id": 37,
>>  ...
>>   },
>>   "op": "c"
>> }
>> 
>> The only reference I can find to a schema field is in the debezium-json 
>> format documentation 
>> .
>> > users may setup the Debezium Kafka Connect with the Kafka configuration 
>> > 'value.converter.schemas.enable' enabled to include schema in the message
>> 
>> This leads me to believe that the schema field that Paimon's doc is 
>> referring to is added not by debezium or flink-cdc, but by Kafka Connect 
>> when it is used with Debezium proper to write CDC messages to Kafka.
>> Does this mean that the CDC messages generated by the flink-cdc kafka 
>> pipeline connector are not compatible with Paimon's kafka_sync_database 
>> action? 
>> Or, is there a way to cause flink-cdc pipeline connectors to include schema 
>> in the message?
>> 
>> I could be misunderstanding something. Is Kafka Connect+Debezium used by 
>> Flink to support debezium-json formatted messages? I tried passing 
>> properties.value.converter.schemas.enable: true to the flink-cdc pipeline 
>> kafka sink but that did not work (as expected).
>> 
>> Thank you!
>> 
>> -Andrew Otto
>>  Wikimedia Foundation
>> 
>> P.S. Context for what we are trying to do is here: T373144 [SPIKE] Learn and 
>> document how to use Flink-CDC from MediaWiki MariaDB locally 
>> 
>> 
>> 
>> 
> 



Re: Mysql CDC: support for parallelism

2024-10-28 Thread Hang Ruan
Hi, Michael.

The MySQL CDC source uses parallelism 1 when reading binlog events in order to
preserve their order, and the other subtasks stop reading data.
For your question, you could set the option
'scan.incremental.close-idle-reader.enabled'='true'[1] on your CDC table to
let the source close the idle subtasks.
Note: there is a limitation when enabling this option; please see its
description for details.
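
For illustration, a minimal Table API sketch with that option set (all connection
values and the server-id range below are placeholders, not taken from your setup):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MySqlCdcIdleReaderExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Connection values below are placeholders for your own environment.
        tEnv.executeSql(
            "CREATE TABLE orders (" +
            "  id BIGINT," +
            "  amount DECIMAL(10, 2)," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'mysql-host'," +
            "  'port' = '3306'," +
            "  'username' = 'flink'," +
            "  'password' = '***'," +
            "  'database-name' = 'mydb'," +
            "  'table-name' = 'orders'," +
            // One server id per parallel reader, e.g. a range of 2 for parallelism 2.
            "  'server-id' = '5400-5401'," +
            // Close readers that become idle once the binlog phase starts;
            // note the limitation mentioned in the option's description.
            "  'scan.incremental.close-idle-reader.enabled' = 'true'" +
            ")");
    }
}
```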

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/docs/connectors/flink-sources/mysql-cdc/

Michael Marino  于2024年10月24日周四 19:13写道:

> Let me quickly follow up on this:
>
> - I missed noting that I *was* setting the server-id value to a range.
> - I just realized that if I do a hard restart and start without a
> snapshot, then this works, i.e. the multiple sub-tasks receive events and
> the watermarking/processing progresses. This is, however, not really ideal,
> is there any way to scale CDC without this hard restart?
>
> Thanks,
> Mike
>
>
> On Thu, Oct 24, 2024 at 12:25 PM Michael Marino 
> wrote:
>
>> Hey all,
>>
>> We are working to scale one of our Flink Jobs (using Table API mostly,
>> some DataStream) where we are using a MySQL CDC table as a source for
>> enrichment.
>>
>> What I've noticed is that, when I increase the parallelism of the job
>> (e.g. to 2), the CDC table source has 2 tasks, but only one of these reads
>> any events. The other one remains completely idle. This stalls downstream
>> processing because we are not getting any watermarks, the only way I've
>> found to get this to continue is to set table.exec.source.idle-timeout to a
>> non-zero value.
>>
>> My questions are:
>>   - is there some setting I can tune to get the CDC to distribute events
>> across the different sub-tasks?
>>   - If the above isn't possible, is there a way in the Table/SQL API to
>> reduce the parallelism (e.g. to 1)? CDC doesn't seem to support
>> scan.parallelism.
>>
>> If neither of the above works, I think I may be forced to use the
>> DataStream API, set the parallelism explicitly and then convert to a table.
>>
>> Thanks!
>>
>> Cheers,
>> Mike
>>
>> --
>>
>> Michael Marino
>>
>> Principal Data Science & Analytics
>>
>> Phone:  +49 89 7167786 - 14
>>
>> linkedin.com/company/tadogmbh 
>>  | facebook.com/tado  | twitter.com/tado
>>  | youtube.com/tado
>> 
>>
>> www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany
>>
>>  Managing Directors: Dr. Philip Beckmann | Christian Deilmann | Johannes
>> Schwarz | Dr. Frank Siebdrat | Lukas Zyla
>>
>> Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE
>> 280012558
>>
>
>
> --
>
> Michael Marino
>
> Principal Data Science & Analytics
>
> Phone:  +49 89 7167786 - 14
>
> linkedin.com/company/tadogmbh 
> | facebook.com/tado  | twitter.com/tado
>  | youtube.com/tado
> 
>
> www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany
>
>  Managing Directors: Dr. Philip Beckmann | Christian Deilmann | Johannes
> Schwarz | Dr. Frank Siebdrat | Lukas Zyla
>
> Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE
> 280012558
>


Re: Kafka sink producing record at event timestamp

2024-10-25 Thread Sebastian Zapata
There is one option for configuring this in Kafka at the topic level:

By default Kafka will use the timestamp provided by the producer.

However, you can also make Kafka update the timestamp when it writes the
record to the log by setting message.timestamp.type to LogAppendTime on
your topic.

See the topic configurations section in the documentation.

https://stackoverflow.com/questions/55472723/get-the-time-the-message-reached-kafka
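
If you prefer to set this programmatically rather than through the CLI, a small
sketch using the Kafka AdminClient (the broker address and topic name are
placeholders):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetLogAppendTime {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "my-sink-topic"); // placeholder
            AlterConfigOp setLogAppendTime = new AlterConfigOp(
                new ConfigEntry("message.timestamp.type", "LogAppendTime"),
                AlterConfigOp.OpType.SET);

            // After this change the broker stamps each record with its own append time,
            // regardless of the timestamp the producer (e.g. the Flink Kafka sink) attaches.
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(topic, Collections.singletonList(setLogAppendTime));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
```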




On Fri, Oct 25, 2024 at 6:14 AM Sachin Mittal  wrote:

> Hi,
> I am having a pipeline where source and sink are two Kafka topics.
> The pipeline uses event time semantics, where event time is extracted from
> the record.
>
> What I notice is that when producing records at the sink side, it produces
> them such that the record's time in the kafka topic is the same as its
> event time.
>
> Ideally I was expecting that the record's time should be the timestamp
> when the record was inserted in the Kafka topic and not the same as its
> event time.
>
> How can I control this behaviour?
>
> Thanks
> Sachin
>
>


Re: Flink custom sink

2024-10-25 Thread Anil Dasari
Hello all,

Are there Flink patterns that support microbatching and ensure all data for
a microbatch is written to a specific prefix in the destination with
exactly-once delivery?
I’ve explored both window and processing time (implemented in FileSink)
options to set the boundaries for microbatches.

However, processing time can't establish data boundaries by timestamp
across multiple sink writer tasks when each writer starts at different
times due to Flink's auto-scaling. If I understand correctly, the process
time approach operates based on fixed intervals within a single sink writer
(e.g., FileSink) rather than as a fixed interval task across multiple sink
writers.

The window option requires checking both the current and previous element's
window start times to determine the boundaries, complicating the data flush
process.
Is there a way to listen for the window start/end event in all the pipeline
operators post window operator to effectively implement this as an
OnProcessTime function in FileSink?

If yes, could you please point me to the docs or examples, if any? Thanks in
advance.
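
For reference, a minimal sketch of the window-start tagging approach described
above, so every operator and sink writer downstream derives the same batch
boundary from the element itself rather than by comparing consecutive elements
(the Tuple2<category, payload> input shape and the one-minute window are just
assumptions for illustration):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MicrobatchTagging {

    // Input: (category, payload). Output: (windowStart, category, payload), where
    // windowStart acts as the microbatch id / S3 prefix for everything downstream.
    public static DataStream<Tuple3<Long, String, String>> tagWithBatch(
            DataStream<Tuple2<String, String>> events) {

        return events
            .keyBy(e -> e.f0)
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            .process(new ProcessWindowFunction<
                    Tuple2<String, String>, Tuple3<Long, String, String>, String, TimeWindow>() {
                @Override
                public void process(String category,
                                    Context ctx,
                                    Iterable<Tuple2<String, String>> elements,
                                    Collector<Tuple3<Long, String, String>> out) {
                    long batchStart = ctx.window().getStart();
                    for (Tuple2<String, String> e : elements) {
                        out.collect(Tuple3.of(batchStart, category, e.f1));
                    }
                }
            });
    }
}
```

Each sink writer can then derive the S3 prefix directly from the windowStart
field, so all writers agree on the batch boundaries regardless of when they
started.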

Thanks

On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv  wrote:

> Hi, Anil.
>
> Iceberg Sink is merged recently in
> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
> 
> .
>
> From your description, I guess that what you need is
> a TwoPhaseCommittingSink[1], the steps you listed can be executed with the
> following steps:
>
> > 1. Group data by category and write it to S3 under its respective prefix.
> This can be done in
> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>  method.
>
> > 2. Update category metrics in a manifest file stored in the S3 manifest
> prefix.
> > 3. Send category metrics to an external system to initiate consumption.
> These metrics information could be passed by
>  
> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
> method.
> And `Update category metrics`/`Send category metrics` can be done in
> org.apache.flink.api.connector.sink2.Committer#commit method.
>
> Rollback action could be done in SinkWriter or Committer, to delete files
> from S3, you need to pass the files information though
> PrecommittingSinkWriter#prepareCommit too. Then you can throw exception to
> let Flink job failover and retry.
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
> 
>
>
>
> 2024年10月14日 12:49,Anil Dasari  写道:
>
> Hello,
>
> I am looking to implement a Flink sink for the following use case, where
> the steps below are executed for each microbatch (using Spark terminology)
> or trigger:
>
>1. Group data by category and write it to S3 under its respective
>prefix.
>2. Update category metrics in a manifest file stored in the S3
>manifest prefix.
>3. Send category metrics to an external system to initiate consumption.
>
> In case of failure, any previously completed steps should be rolled back,
> i.e., delete files from S3 and reprocess the entire microbatch.
>
> It seems this pattern can be implemented using the Unified Sink API, as
> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A
> 
> .
>
> I'm currently reviewing FileSink and IcebergSink (still searching for the
> source code) to understand their implementations and create a new one.
>
> Are there any step-by-step docs or examples available for writing a new
> unified sink?
>
> Thanks
>
>
>
>
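
For completeness, a rough skeleton of the TwoPhaseCommittingSink flow quoted
above (interface and method names as of the 1.18-era sink2 API, so double-check
the javadoc for your Flink version; CategoryBatch and the commented-out
S3/manifest helpers are assumptions, not real classes, so treat this as a sketch
rather than a working sink):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;

public class CategorySink implements TwoPhaseCommittingSink<String, CategorySink.CategoryBatch> {

    /** Committable: the S3 files written for one category plus its metrics. */
    public static class CategoryBatch {
        public String category;
        public List<String> files = new ArrayList<>();
        public long recordCount;
    }

    @Override
    public PrecommittingSinkWriter<String, CategoryBatch> createWriter(InitContext context) {
        return new CategoryWriter();
    }

    @Override
    public Committer<CategoryBatch> createCommitter() {
        return new CategoryCommitter();
    }

    static class CategoryWriter implements PrecommittingSinkWriter<String, CategoryBatch> {
        private final List<String> buffer = new ArrayList<>();

        @Override
        public void write(String element, Context context) {
            buffer.add(element);                        // step 1: stage data per category
        }

        @Override
        public Collection<CategoryBatch> prepareCommit() {
            // Write the staged data to S3 here and describe the result as committables;
            // writeBufferToS3 is an assumed helper, not part of Flink.
            Collection<CategoryBatch> batches = writeBufferToS3(buffer);
            buffer.clear();
            return batches;
        }

        @Override
        public void flush(boolean endOfInput) {}

        @Override
        public void close() {}

        private Collection<CategoryBatch> writeBufferToS3(List<String> records) {
            return new ArrayList<>();                   // placeholder
        }
    }

    static class CategoryCommitter implements Committer<CategoryBatch> {
        @Override
        public void commit(Collection<CommitRequest<CategoryBatch>> requests) {
            for (CommitRequest<CategoryBatch> request : requests) {
                CategoryBatch batch = request.getCommittable();
                // steps 2 + 3: update the manifest and notify the external system;
                // on failure, signal a retry instead of dropping the batch.
                // updateManifest(batch); notifyExternalSystem(batch);   // assumed helpers
            }
        }

        @Override
        public void close() {}
    }
}
```

The committables returned by prepareCommit() are checkpointed before commit()
runs, which is roughly what gives the retry/rollback behaviour described above.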


Re: [ANNOUNCE] Apache Flink 2.0 Preview released

2024-10-25 Thread weijie guo
Hi Enric,
I cloned the code from the apache/flink repo and imported it into IntelliJ IDEA,
but nothing unexpected came up.

在 2024年10月23日星期三,Enric Ott <243816...@qq.com> 写道:

> How to import the source code(from github) to Intelligent Idea,seems that
> a project descriptor is missing.
>
>
> -- 原始邮件 --
> *发件人:* "Xintong Song" ;
> *发送时间:* 2024年10月23日(星期三) 下午5:26
> *收件人:* "dev";"user" >;"user-zh";"announce";
> *主题:* [ANNOUNCE] Apache Flink 2.0 Preview released
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 2.0 Preview.
>
> Apache Flink® is an open-source unified stream and batch data processing
> framework for distributed, high-performing, always-available, and accurate
> data applications.
>
> This release is a preview of the upcoming Flink 2.0 release. The purpose
> is to facilitate early adaptation to the breaking changes for our users and
> partner projects (e.g., connectors), and to offer a sneak peek into the
> exciting new features while gathering feedback.
>
> Note: Flink 2.0 Preview is not a stable release and should not be used in
> production environments.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please checkout the release blog post for an overview of this release:
> https://flink.apache.org/2024/10/23/preview-release-of-apache-flink-2.0/
>
> The full release notes are available in jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?
> projectId=12315522&version=12355070
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Best,
>
> Becket, Jark, Martijn and Xintong
>
>

-- 

Best regards,

Weijie


Re: Mysql CDC: support for parallelism

2024-10-24 Thread Michael Marino
Let me quickly follow up on this:

- I missed noting that I *was* setting the server-id value to a range.
- I just realized that if I do a hard restart and start without a snapshot,
then this works, i.e. the multiple sub-tasks receive events and the
watermarking/processing progresses. This is, however, not really ideal, is
there any way to scale CDC without this hard restart?

Thanks,
Mike


On Thu, Oct 24, 2024 at 12:25 PM Michael Marino 
wrote:

> Hey all,
>
> We are working to scale one of our Flink Jobs (using Table API mostly,
> some DataStream) where we are using a MySQL CDC table as a source for
> enrichment.
>
> What I've noticed is that, when I increase the parallelism of the job
> (e.g. to 2), the CDC table source has 2 tasks, but only one of these reads
> any events. The other one remains completely idle. This stalls downstream
> processing because we are not getting any watermarks, the only way I've
> found to get this to continue is to set table.exec.source.idle-timeout to a
> non-zero value.
>
> My questions are:
>   - is there some setting I can tune to get the CDC to distribute events
> across the different sub-tasks?
>   - If the above isn't possible, is there a way in the Table/SQL API to
> reduce the parallelism (e.g. to 1)? CDC doesn't seem to support
> scan.parallelism.
>
> If neither of the above works, I think I may be forced to use the
> DataStream API, set the parallelism explicitly and then convert to a table.
>
> Thanks!
>
> Cheers,
> Mike
>
> --
>
> Michael Marino
>
> Principal Data Science & Analytics
>
> Phone:  +49 89 7167786 - 14
>
> linkedin.com/company/tadogmbh 
> | facebook.com/tado  | twitter.com/tado
>  | youtube.com/tado
> 
>
> www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany
>
>  Managing Directors: Dr. Philip Beckmann | Christian Deilmann | Johannes
> Schwarz | Dr. Frank Siebdrat | Lukas Zyla
>
> Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE
> 280012558
>


-- 

Michael Marino

Principal Data Science & Analytics

Phone:  +49 89 7167786 - 14

linkedin.com/company/tadogmbh  |
facebook.com/tado  | twitter.com/tado
 | youtube.com/tado


www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany

 Managing Directors: Dr. Philip Beckmann | Christian Deilmann | Johannes
Schwarz | Dr. Frank Siebdrat | Lukas Zyla

Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE
280012558


Re: Backporting array_agg from 1.20 to 1.18

2024-10-24 Thread Jiabao Sun
Hi Daniele, 

It's a good idea to implement a UDF[1] by referring to the ArrayAggFunction[2] 
in Flink.

I don't understand what "I don't get how RowData comes into play" means. 
Could you clarify it?

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#aggregate-functions
[2] 
https://github.com/apache/flink/blob/28ae3ae65d13b98dc28b919ef75c14780c512618/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/ArrayAggFunction.java
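
For example, a minimal STRING-flavoured sketch of such a UDF (class and function
names are made up; the real ArrayAggFunction linked above also handles retraction
and arbitrary element types, which this sketch ignores):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.functions.AggregateFunction;

// Collects STRING values into an ARRAY<STRING>; nulls are skipped.
public class SimpleArrayAgg extends AggregateFunction<String[], SimpleArrayAgg.Acc> {

    /** Accumulator backed by a ListView so large groups can be kept in state. */
    public static class Acc {
        public ListView<String> values = new ListView<>();
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // Called once per input row, as required by the AggregateFunction contract.
    public void accumulate(Acc acc, String value) throws Exception {
        if (value != null) {
            acc.values.add(value);
        }
    }

    @Override
    public String[] getValue(Acc acc) {
        try {
            List<String> result = new ArrayList<>();
            for (String v : acc.values.get()) {
                result.add(v);
            }
            return result.toArray(new String[0]);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

You can register it with createTemporarySystemFunction (or the CREATE TEMPORARY
FUNCTION ... USING JAR route) and call it as MY_ARRAY_AGG(col). For ROW(...)
arguments you would likely also need a DataTypeHint on the accumulate parameter
so the planner knows the row's field types and hands the function an external
org.apache.flink.types.Row.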

On 2024/10/24 08:34:32 Daniele Parmeggiani wrote:
> Hi Hanyu Zheng 👋
> 
> I've actually included the function into a UDF jar, hoping I could get
> away with not forking Flink itself.
> Do you think that'll work?
> 
> In my SQL script I import the function like this:
> ```sql
> create temporary function ArrayAggFunction
>     as 'eu.spaziodati.cp.flink.funcs.ArrayAggFunction'
>     using jar '
> ```
> 
> Then I use it like this:
> ```sql
> select `...`, ArrayAggFunction(row(...))
> ```
> 
> So the fact that a Row is being passed into the function does not
> surprise me, but I don't get how RowData comes into play.
> 
> 
> Daniele
> 
> 
> 
> On October 24, 2024, user  wrote:
> > Hi Daniele Parmeggiani,
> >
> > I have some ideas regarding the issue. From the exception:
> >
> > java.lang.ClassCastException: class org.apache.flink.types.Row cannot
> > be cast to class org.apache.flink.table.data.RowData
> > This error clearly indicates that the code is trying to cast a Row
> > object to a RowData object, which suggests that the input data being
> > processed is of type Row. However, the backported ARRAY_AGG function
> > expects RowData. The type mismatch between Row and RowData might be
> > causing the error.
> >
> > We need to confirm whether the data type passed to ARRAY_AGG is the
> > correct one. Additionally, the input and output strategy should be
> > defined in BuiltInFunctionDefinitions — we should first define the
> > input and output types and then let the function use them.
> >
> > It’s possible that checking the type in the wrong place and passing an
> > incorrect data type is leading to this issue.
> >
> >
> > --
> > Hanyu (Peter) Zheng he/him/his | Software Engineer | +1 (213) 431-7193
> >
> 


Re: Backporting array_agg from 1.20 to 1.18

2024-10-24 Thread Daniele Parmeggiani
Hi Jiabao 👋

Sorry, I phrased it incorrectly.
I'm new to Flink, so I just don't know what RowData is essentially.


Daniele

On October 24, 2024, Jiabao Sun  wrote:
> Hi Daniele,
>
> It's a good idea to implement a UDF[1] by referring to the
> ArrayAggFunction[2] in Flink.
>
> I don't understand what "I don't get how RowData comes into play"
> means.
> Could you clarify it?
>
> Best,
> Jiabao
>
> [1]  master/docs/dev/table/functions/udfs/#aggregate-functions>
> [2]
>  table/flink-table-
> runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/ArrayAggFunction.java>
>
> On 2024/10/24 08:34:32 Daniele Parmeggiani wrote:
> > Hi Hanyu Zheng 👋
> >
> > I've actually included the function into a UDF jar, hoping I could
> get
> > away with not forking Flink itself.
> > Do you think that'll work?
> >
> > In my SQL script I import the function like this:
> > ```sql
> > create temporary function ArrayAggFunction
> >     as 'eu.spaziodati.cp.flink.funcs.ArrayAggFunction'
> >     using jar '<>
> > ```
> >
> > Then I use it like this:
> > ```sql
> > select `...`, ArrayAggFunction(row(...))
> > ```
> >
> > So the fact that a Row is being passed into the function does not
> > surprise me, but I don't get how RowData comes into play.
> >
> >
> > Daniele
> >
> >
> >
> > On October 24, 2024, user  wrote:
> > > Hi Daniele Parmeggiani,
> > >
> > > I have some ideas regarding the issue. From the exception:
> > >
> > > java.lang.ClassCastException: class org.apache.flink.types.Row
> cannot
> > > be cast to class org.apache.flink.table.data.RowData
> > > This error clearly indicates that the code is trying to cast a Row
> > > object to a RowData object, which suggests that the input data
> being
> > > processed is of type Row. However, the backported ARRAY_AGG
> function
> > > expects RowData. The type mismatch between Row and RowData might
> be
> > > causing the error.
> > >
> > > We need to confirm whether the data type passed to ARRAY_AGG is
> the
> > > correct one. Additionally, the input and output strategy should be
> > > defined in BuiltInFunctionDefinitions — we should first define the
> > > input and output types and then let the function use them.
> > >
> > > It’s possible that checking the type in the wrong place and
> passing an
> > > incorrect data type is leading to this issue.
> > >
> > >
> > > --
> > > Hanyu (Peter) Zheng he/him/his | Software Engineer | +1 (213) 431-7193
> > >
> >


Re: Backporting array_agg from 1.20 to 1.18

2024-10-24 Thread Daniele Parmeggiani
Hi Hanyu Zheng 👋

I've actually included the function into a UDF jar, hoping I could get
away with not forking Flink itself.
Do you think that'll work?

In my SQL script I import the function like this:
```sql
create temporary function ArrayAggFunction
    as 'eu.spaziodati.cp.flink.funcs.ArrayAggFunction'
    using jar '
```

Then I use it like this:
```sql
select `...`, ArrayAggFunction(row(...))
```

So the fact that a Row is being passed into the function does not
surprise me, but I don't get how RowData comes into play.


Daniele



On October 24, 2024, user  wrote:
> Hi Daniele Parmeggiani,
>
> I have some ideas regarding the issue. From the exception:
>
> java.lang.ClassCastException: class org.apache.flink.types.Row cannot
> be cast to class org.apache.flink.table.data.RowData
> This error clearly indicates that the code is trying to cast a Row
> object to a RowData object, which suggests that the input data being
> processed is of type Row. However, the backported ARRAY_AGG function
> expects RowData. The type mismatch between Row and RowData might be
> causing the error.
>
> We need to confirm whether the data type passed to ARRAY_AGG is the
> correct one. Additionally, the input and output strategy should be
> defined in BuiltInFunctionDefinitions — we should first define the
> input and output types and then let the function use them.
>
> It’s possible that checking the type in the wrong place and passing an
> incorrect data type is leading to this issue.
>
>
> --
> Hanyu (Peter) Zheng he/him/his | Software Engineer | +1 (213) 431-7193
>
>


Re: Backporting array_agg from 1.20 to 1.18

2024-10-23 Thread Hanyu (Peter) Zheng via user
Hi Daniele Parmeggiani,

I have some ideas regarding the issue. From the exception:

java.lang.ClassCastException: class org.apache.flink.types.Row cannot be
cast to class org.apache.flink.table.data.RowData
This error clearly indicates that the code is trying to cast a Row object
to a RowData object, which suggests that the input data being processed is
of type Row. However, the backported ARRAY_AGG function expects RowData.
The type mismatch between Row and RowData might be causing the error.

We need to confirm whether the data type passed to ARRAY_AGG is the correct
one. Additionally, the input and output strategy should be defined in
BuiltInFunctionDefinitions — we should first define the input and output
types and then let the function use them.

It’s possible that checking the type in the wrong place and passing an
incorrect data type is leading to this issue.

On Wed, Oct 23, 2024 at 5:32 PM Hanyu (Peter) Zheng 
wrote:

>
> I have some ideas.
> --
> Hanyu (Peter) Zheng he/him/his
> Software Engineer, Confluent
> +1 (213) 431-7193
>


-- 

Hanyu (Peter) Zheng he/him/his
Software Engineer, Confluent
+1 (213) 431-7193


RE: https://lists.apache.org/thread/vk4nb2d6n3n35bgjdrj1b0ndbpkxr0qw

2024-10-23 Thread Hanyu (Peter) Zheng via user
I have some idea
https://lists.apache.org/thread/vk4nb2d6n3n35bgjdrj1b0ndbpkxr0qw

-- 

Hanyu (Peter) Zheng he/him/his
Software Engineer, Confluent
+1 (213) 431-7193


RE: Backporting array_agg from 1.20 to 1.18

2024-10-23 Thread Hanyu (Peter) Zheng via user
I have some ideas.
-- 

Hanyu (Peter) Zheng he/him/his
Software Engineer, Confluent
+1 (213) 431-7193


Re: Re:Backpressure causing operators to stop ingestion completely

2024-10-23 Thread Raihan Sunny via user
Hi Jake,

Thanks for the suggestion. I'm actually using PyFlink, and it seems that the
flame graph can only account for Java methods. Are there any other methods to
debug this? I was also curious about network buffer tuning. Given that there is
a surge of input data at times, while at other times there is a more or less
constant flow of data, are there any specific network buffer settings that I
might try to tweak and observe the changes?




From: Jake.zhang 
Sent: Tuesday, October 22, 2024 2:40 PM
To: Raihan Sunny ; user 
Subject: Re:Backpressure causing operators to stop ingestion completely


Hi,  you can use flame_graphs to debug

https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/ops/debugging/flame_graphs/

-- Original --
From: "Raihan Sunny" ;
Date: Tue, Oct 22, 2024 01:00 PM
To: "user";
Subject: Backpressure causing operators to stop ingestion completely

Hello,

I have an aggregator job that experiences backpressure after running for a 
while and completely stops processing. It doesn't take any further input from 
the source. Here's a bit of context:
- There are 3 producer jobs, all of which write data to a common Kafka topic
- The aggregator job reads in from that topic, aggregates the data once it 
finds 3 matching records for the same ID and emits a single record into another 
Kafka topic
- The logic is quite simple: the aggregator stores the data in its state for 
each ID until all 3 matches arrive and then the state is cleared for that ID
- 2 of the producer jobs emit records at an almost constant pace, while the 3rd 
producer emits data in chunks of variable length. The emission of data is also 
not periodic for this 3rd producer; meaning it can hold up to 15 seconds worth 
of data in one instance and 2 minutes of data in another for example

After running for a while, the Kafka source and the key-by operator following 
it are at 100% backpressure and the actual aggregator operator is shown in the 
UI as 100% busy even though it's doing absolutely nothing. All the jobs have a 
parallelism of 1.

If anything, I think it's the 3rd producer job which emits data chunks of 
variable length is causing the problem. However, I can't think of a valid 
explanation so far from what I've read about the Flink backpressure handling 
and how it's supposed to work. Can anyone please provide some pointers as to 
where I might look to figure out the issue?


Thanks,
Sunny


Re: Opensearch Connector for Flink 1.18+

2024-10-22 Thread Yanquan Lv
Hi, Kirti.
We already have a 2.0.0 version of the Opensearch connector in the Maven repo[1] for
Flink 1.18/1.19.
Note, however, that this version is built on JDK 11[2].

[1] 
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-opensearch2
[2] https://lists.apache.org/thread/3w1rnjp5y612xy5k9yv44hy37zm9ph15


> 2024年10月23日 13:45,Kirti Dhar Upadhyay K via user  写道:
> 
> Hi,
>  
> Currently we are using Flink 1.16.3 with Opensearch connector version 
> 1.0.1-1.16 as per link below:
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/opensearch/
>  
> Now, we want to upgrade Flink version to 1.18+, but found that there no 
> Opensearch connector  for flink 1.18 onwards.
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/opensearch/
>  
> Is there any plan to give any connector for Flink 1.18+ versions in future?
>  
> Regards,
> Kirti Dhar



Re: Flink custom sink

2024-10-20 Thread Anil Dasari
Hi Yanquan,
What’s the most effective approach to implement micro batching (i.e.,
grouping a set of events into a time window) for writing to a single folder
at the destination while enhancing fault tolerance? Should this be done
using windowing and triggers, or through checkpointing process time?

Pseudo window + trigger implementation:
https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/customsink/App.java#L97
https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/customsink/CustomStatefulSinkWriter.java#L109

Checkpointing process time:
https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/customsink/CustomStatefulSinkWriter.java#L173

I see file sink using process time to commit the pending in-progress files.

Please note that my implementation is still incomplete and trying to add
one step at a time. Any feedback is appreciated.

Thanks.

On Mon, Oct 14, 2024 at 8:30 PM Yanquan Lv  wrote:

> Sorry, I couldn't find any clear and detailed user guidance other than
> FLIP in the official documentation too.
>
>
> 2024年10月15日 01:39,Anil Dasari  写道:
>
> Hi Yanquan,
> I've finished reading the Sink FLIPs and am now reviewing some of the sink
> implementations, like TestSinkV2, to better understand the flow. I'll write
> a new one to experiment with.
> Are there Flink sink docs/flow diagrams, like the detailed source
> implementation docs at
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
> 
> ?
>
> Thanks.
>
> On Mon, Oct 14, 2024 at 10:18 AM Yanquan Lv  wrote:
>
>> Hi, Anil.
>> For your scenario, I think looking at FLIP-143 first and then FLIP-191
>> should provide a better understanding. Then you can look at other FLIPs or
>> specific implementations.
>>
>> Anil Dasari  于2024年10月15日周二 00:55写道:
>>
>>> Got it. thanks.
>>> Sink improvements have many FLIP confluence pages i.e FLIP-143, 171, 177
>>> and 191. So, Is there a sequence of steps flow charts for better
>>> understanding of the sink process with sink, writer and committer ?
>>>
>>> Thanks
>>>
>>> On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv  wrote:
>>>
 Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0, and it will
 be replaced by SupportsCommitter[1] interface, which was introduced in
 Flink 1.19.
 But you can still use TwoPhaseCommittingSink under Flink 2.0, it
 depends on your target Flink version, The interfaces of these two APIs are
 almost identical.

 [1]
 https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
 

 Anil Dasari  于2024年10月14日周一 23:26写道:

> Hi Yanquan,
> Thanks for sharing the information.
> It appears that TwoPhaseCommittingSink is not available in the flink
> repo main branch. it is replaced with Sink, Committer and SinkWritter ?
>
> Thanks
>
> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv 
> wrote:
>
>> Hi, Anil.
>>
>> Iceberg Sink is merged recently in
>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>> 
>> .
>>
>> From your description, I guess that what you need is
>> a TwoPhaseCommittingSink[1], the steps you listed can be executed with 
>> the
>> following steps:
>>
>> > 1. Group data by category and write it to S3 under its respective
>> prefix.
>> This can be done in
>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>  method.
>>
>> > 2. Update category metrics in a manifest file stored in the S3
>> manifest prefix.
>> > 3. Send category metrics to an external system to initiate
>> consumption.
>> These metrics information could be passed by
>>  
>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>> method.
>> And `Update category metrics`/`Send category metrics` can be done in
>> org.apache.flink.api.connector.sink2.Committer#commit method.
>>
>> Rollback action could be done in SinkWriter or Committer, to delete
>> files from S3, you need to pass the files information though
>> PrecommittingSinkWriter#prepareCommit too. Then you can throw exception 
>> to
>> let Flink job failover and retry.
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>> 
>>
>>
>>

Re: OperatorStateFromBackend can't complete initialisation because of high number of savepoint files reads

2024-10-18 Thread Gabor Somogyi
Hi William and Mate,

I've just tested compressed and uncompressed restore with the same amount
of data, and the results match yours:
- The compressed restore operation performs well because Snappy skips data
only within a single block, which is relatively small
- The uncompressed restore operation performs well once my PR [1] is applied

Once the PR is merged, we can say that both compressed and uncompressed
state are safe to use.

Thanks to everybody for the effort to sort this out!

[1] https://github.com/apache/flink/pull/25509

G


On Thu, Oct 17, 2024 at 11:06 PM Gabor Somogyi 
wrote:

> Hi Mate,
>
> Thanks for the deep dive! I've had a slight look at the code and it makes
> sense why you and William is not seeing slowness with compressed state.
> Tomorrow I'll do some tests and come back with the results...
>
> @William Wallace  I think the restore should
> work without the memory-threshold setting with compression.
> When compression is off then my PR is going to be the cure🙂
>
> G
>
>
> On Thu, Oct 17, 2024 at 8:06 PM Mate Czagany  wrote:
>
>> Hi William,
>>
>> I think your findings are correct, I could easily reproduce the issue
>> with snapshot-compression set to false, but I was unable to with
>> snapshot-compression set to true.
>>
>> When using compressed state, the available() call will return the number
>> of bytes in the Snappy internal buffer that has not been decompressed yet
>> [1] [2]. It is safe to skip bytes here, as this will not cause the next
>> call to seek() to seek backwards.
>>
>> When not using compressed state, the available() call will return the
>> number of bytes buffered by the underlying BufferedInputStream which
>> buffers data from the filesystem, e.g. S3. In my tests the buffer size was
>> 4096, and if Flink read e.g. 50 bytes of data, the result of available()
>> was then 4046. Skipping 4046 bytes (or any number of bytes) meant that for
>> the next seek() call the buffer had to seek backwards, and for S3 that
>> meant closing and re-opening the stream, resulting in a new GET request for
>> each element in the list state.
>>
>> I think Gabor's PR [3] is the right solution, I can't think of any
>> situation where we would have to skip any bytes in the stream when not
>> using compressed state. I also think that compressed state is not affected
>> by this.
>>
>> [1]
>> https://github.com/xerial/snappy-java/blob/9f8c3cf74223ed0a8a834134be9c917b9f10ceb5/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L313
>> [2]
>> https://github.com/xerial/snappy-java/blob/9f8c3cf74223ed0a8a834134be9c917b9f10ceb5/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L566
>> [3] https://github.com/apache/flink/pull/25509
>>
>> Best,
>> Mate
>>
>> William Wallace  ezt írta (időpont: 2024.
>> okt. 17., Cs, 19:23):
>>
>>> Hi G,
>>>
>>> We did a test today using
>>> ```
>>> execution.checkpointing.snapshot-compression: true
>>> state.storage.fs.memory-threshold: 500kb
>>> ```
>>> across 6 jobs with different parallelism and volume load.
>>> I will use one as an example - 70 slots - I had 70 files of 670kb
>>> corresponding to the subtask state containing the KafkaSource operator. In
>>> total compressed savepoint size was around 360MB, 201 files, biggest was
>>> ~11MB. Job restored ok from checkpoint and savepoint not seeing the
>>> millions of reads we were observing before. (forced stop/run few times)
>>>
>>> We decided to let this soak for some time since our checkpoints can
>>> reach more than 10GB (uncompressed).
>>>
>>> Let me know if you have any updates. I will let you know if I observe
>>> anything else.
>>> Please let us know if you have any new findings.
>>>
>>> Thank you.
>>>
>>> On Tue, Oct 15, 2024 at 4:38 PM Gabor Somogyi 
>>> wrote:
>>>
>>>> Could you please let us know if you see anything wrong when using
>>>>> `execution.checkpointing.snapshot-compression: true` since for us this
>>>>> seems to have solved the multiple S3 reads issue.
>>>>>
>>>> When something is working it's never wrong. The question is why is has
>>>> been resolved.
>>>> Are you still having state.storage.fs.memory-threshold set to 500Kb?
>>>> State compression may reduce the state under this threshold which would
>>>> make that work.
>>>>
>>>> For uncompressed state could you please let us know how the change f

Re: OperatorStateFromBackend can't complete initialisation because of high number of savepoint files reads

2024-10-17 Thread Gabor Somogyi
Hi Mate,

Thanks for the deep dive! I've had a quick look at the code, and it makes
sense why you and William are not seeing slowness with compressed state.
Tomorrow I'll do some tests and come back with the results...

@William Wallace  I think the restore should
work without the memory-threshold setting when compression is on.
When compression is off, my PR is going to be the cure 🙂

G


On Thu, Oct 17, 2024 at 8:06 PM Mate Czagany  wrote:

> Hi William,
>
> I think your findings are correct, I could easily reproduce the issue with
> snapshot-compression set to false, but I was unable to with
> snapshot-compression set to true.
>
> When using compressed state, the available() call will return the number
> of bytes in the Snappy internal buffer that has not been decompressed yet
> [1] [2]. It is safe to skip bytes here, as this will not cause the next
> call to seek() to seek backwards.
>
> When not using compressed state, the available() call will return the
> number of bytes buffered by the underlying BufferedInputStream which
> buffers data from the filesystem, e.g. S3. In my tests the buffer size was
> 4096, and if Flink read e.g. 50 bytes of data, the result of available()
> was then 4046. Skipping 4046 bytes (or any number of bytes) meant that for
> the next seek() call the buffer had to seek backwards, and for S3 that
> meant closing and re-opening the stream, resulting in a new GET request for
> each element in the list state.
>
> I think Gabor's PR [3] is the right solution, I can't think of any
> situation where we would have to skip any bytes in the stream when not
> using compressed state. I also think that compressed state is not affected
> by this.
>
> [1]
> https://github.com/xerial/snappy-java/blob/9f8c3cf74223ed0a8a834134be9c917b9f10ceb5/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L313
> [2]
> https://github.com/xerial/snappy-java/blob/9f8c3cf74223ed0a8a834134be9c917b9f10ceb5/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L566
> [3] https://github.com/apache/flink/pull/25509
>
> Best,
> Mate
>
> William Wallace  ezt írta (időpont: 2024.
> okt. 17., Cs, 19:23):
>
>> Hi G,
>>
>> We did a test today using
>> ```
>> execution.checkpointing.snapshot-compression: true
>> state.storage.fs.memory-threshold: 500kb
>> ```
>> across 6 jobs with different parallelism and volume load.
>> I will use one as an example - 70 slots - I had 70 files of 670kb
>> corresponding to the subtask state containing the KafkaSource operator. In
>> total compressed savepoint size was around 360MB, 201 files, biggest was
>> ~11MB. Job restored ok from checkpoint and savepoint not seeing the
>> millions of reads we were observing before. (forced stop/run few times)
>>
>> We decided to let this soak for some time since our checkpoints can reach
>> more than 10GB (uncompressed).
>>
>> Let me know if you have any updates. I will let you know if I observe
>> anything else.
>> Please let us know if you have any new findings.
>>
>> Thank you.
>>
>> On Tue, Oct 15, 2024 at 4:38 PM Gabor Somogyi 
>> wrote:
>>
>>> Could you please let us know if you see anything wrong when using
>>>> `execution.checkpointing.snapshot-compression: true` since for us this
>>>> seems to have solved the multiple S3 reads issue.
>>>>
>>> When something is working it's never wrong. The question is why is has
>>> been resolved.
>>> Are you still having state.storage.fs.memory-threshold set to 500Kb?
>>> State compression may reduce the state under this threshold which would
>>> make that work.
>>>
>>> For uncompressed state could you please let us know how the change from
>>>> your PR eliminates the multiple calls to S3. Is not very clear to us.
>>>>
>>> Copy from the PR:
>>> Flink state restore from S3 is super slow because skip function is
>>> consuming ~15 seconds for ~6Mb of data.
>>> ...
>>> In this PR the skip going to be called only in case of compression
>>> because otherwise a stream is seekable.
>>>
>>> G
>>>
>>> On Tue, Oct 15, 2024 at 4:30 PM William Wallace <
>>> theanonymous31...@gmail.com> wrote:
>>>
>>>> Thank you for the recommendation and the help.
>>>>
>>>> Could you please let us know if you see anything wrong when using
>>>> `execution.checkpointing.snapshot-compression: true` since for us this
>>>> seems to have solved the multiple S3 reads issue.
>>>>
>>> In debug we see:
>>

Re: OperatorStateFromBackend can't complete initialisation because of high number of savepoint files reads

2024-10-17 Thread Mate Czagany
Hi William,

I think your findings are correct, I could easily reproduce the issue with
snapshot-compression set to false, but I was unable to with
snapshot-compression set to true.

When using compressed state, the available() call will return the number of
bytes in the Snappy internal buffer that has not been decompressed yet [1]
[2]. It is safe to skip bytes here, as this will not cause the next call to
seek() to seek backwards.

When not using compressed state, the available() call will return the
number of bytes buffered by the underlying BufferedInputStream which
buffers data from the filesystem, e.g. S3. In my tests the buffer size was
4096, and if Flink read e.g. 50 bytes of data, the result of available()
was then 4046. Skipping 4046 bytes (or any number of bytes) meant that for
the next seek() call the buffer had to seek backwards, and for S3 that
meant closing and re-opening the stream, resulting in a new GET request for
each element in the list state.

I think Gabor's PR [3] is the right solution, I can't think of any
situation where we would have to skip any bytes in the stream when not
using compressed state. I also think that compressed state is not affected
by this.

[1]
https://github.com/xerial/snappy-java/blob/9f8c3cf74223ed0a8a834134be9c917b9f10ceb5/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L313
[2]
https://github.com/xerial/snappy-java/blob/9f8c3cf74223ed0a8a834134be9c917b9f10ceb5/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L566
[3] https://github.com/apache/flink/pull/25509
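
As a simplified illustration of that interaction (this is not the actual Flink
restore code, and readOneValue is just a stand-in for deserializing one state
element), the per-offset loop effectively does the following, and on S3 every
backwards seek turns into a new GET:

```java
import java.io.IOException;
import org.apache.flink.core.fs.FSDataInputStream;

public class RestoreSkipIllustration {

    // offsets[] point at the serialized list-state elements, in increasing order.
    static void restore(FSDataInputStream in, long[] offsets) throws IOException {
        for (long offset : offsets) {
            in.seek(offset);              // the read-ahead buffer is already past this
                                          // offset, so S3 must close and reopen the stream
            readOneValue(in);
            long buffered = in.available();
            in.skip(buffered);            // skips the ~4 KB the buffer read ahead,
                                          // guaranteeing the next seek goes backwards
        }
    }

    // Placeholder for the real deserialization of one state element.
    static void readOneValue(FSDataInputStream in) throws IOException {
        in.read();
    }
}
```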

Best,
Mate

William Wallace  ezt írta (időpont: 2024. okt.
17., Cs, 19:23):

> Hi G,
>
> We did a test today using
> ```
> execution.checkpointing.snapshot-compression: true
> state.storage.fs.memory-threshold: 500kb
> ```
> across 6 jobs with different parallelism and volume load.
> I will use one as an example - 70 slots - I had 70 files of 670kb
> corresponding to the subtask state containing the KafkaSource operator. In
> total compressed savepoint size was around 360MB, 201 files, biggest was
> ~11MB. Job restored ok from checkpoint and savepoint not seeing the
> millions of reads we were observing before. (forced stop/run few times)
>
> We decided to let this soak for some time since our checkpoints can reach
> more than 10GB (uncompressed).
>
> Let me know if you have any updates. I will let you know if I observe
> anything else.
> Please let us know if you have any new findings.
>
> Thank you.
>
> On Tue, Oct 15, 2024 at 4:38 PM Gabor Somogyi 
> wrote:
>
>> Could you please let us know if you see anything wrong when using
>>> `execution.checkpointing.snapshot-compression: true` since for us this
>>> seems to have solved the multiple S3 reads issue.
>>>
>> When something is working it's never wrong. The question is why is has
>> been resolved.
>> Are you still having state.storage.fs.memory-threshold set to 500Kb?
>> State compression may reduce the state under this threshold which would
>> make that work.
>>
>> For uncompressed state could you please let us know how the change from
>>> your PR eliminates the multiple calls to S3. Is not very clear to us.
>>>
>> Copy from the PR:
>> Flink state restore from S3 is super slow because skip function is
>> consuming ~15 seconds for ~6Mb of data.
>> ...
>> In this PR the skip going to be called only in case of compression
>> because otherwise a stream is seekable.
>>
>> G
>>
>> On Tue, Oct 15, 2024 at 4:30 PM William Wallace <
>> theanonymous31...@gmail.com> wrote:
>>
>>> Thank you for the recommendation and the help.
>>>
>>> Could you please let us know if you see anything wrong when using
>>> `execution.checkpointing.snapshot-compression: true` since for us this
>>> seems to have solved the multiple S3 reads issue.
>>>
>> In debug we see:
>>> `in.delegate =
>>> ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`
>>>
>>> and
>>> `in.compressionDelegate = SnappyFramedInputStream`
>>> and  in the logs a file is retrieved only once per subtask
>>> ```
>>> DEBUG com.amazonaws.request[] -
>>> Sending Request: GET 
>>> https://.../savepoints/flink-compression/.../savepoint-...
>>> Range: bytes=0-9223372036854775806.
>>> ```
>>>
>>> For uncompressed state could you please let us know how the change from
>>> your PR eliminates the multiple calls to S3. Is not very clear to us.
>>>
>> Thank you.
>>>
>>> On Tue, Oct 15, 2024 at 1:42 PM Gabor Somogyi 
>>> wrote:
>&g

Re: OperatorStateFromBackend can't complete initialisation because of high number of savepoint files reads

2024-10-17 Thread William Wallace
Hi G,

We did a test today using
```
execution.checkpointing.snapshot-compression: true
state.storage.fs.memory-threshold: 500kb
```
across 6 jobs with different parallelism and volume load.
I will use one as an example - 70 slots - I had 70 files of 670kb
corresponding to the subtask state containing the KafkaSource operator. In
total compressed savepoint size was around 360MB, 201 files, biggest was
~11MB. Job restored ok from checkpoint and savepoint not seeing the
millions of reads we were observing before. (forced stop/run few times)

We decided to let this soak for some time since our checkpoints can reach
more than 10GB (uncompressed).

Let me know if you have any updates. I will let you know if I observe
anything else.
Please let us know if you have any new findings.

Thank you.

On Tue, Oct 15, 2024 at 4:38 PM Gabor Somogyi 
wrote:

> Could you please let us know if you see anything wrong when using
>> `execution.checkpointing.snapshot-compression: true` since for us this
>> seems to have solved the multiple S3 reads issue.
>>
> When something is working it's never wrong. The question is why is has
> been resolved.
> Are you still having state.storage.fs.memory-threshold set to 500Kb? State
> compression may reduce the state under this threshold which would make that
> work.
>
> For uncompressed state could you please let us know how the change from
>> your PR eliminates the multiple calls to S3. Is not very clear to us.
>>
> Copy from the PR:
> Flink state restore from S3 is super slow because skip function is
> consuming ~15 seconds for ~6Mb of data.
> ...
> In this PR the skip going to be called only in case of compression because
> otherwise a stream is seekable.
>
> G
>
> On Tue, Oct 15, 2024 at 4:30 PM William Wallace <
> theanonymous31...@gmail.com> wrote:
>
>> Thank you for the recommendation and the help.
>>
>> Could you please let us know if you see anything wrong when using
>> `execution.checkpointing.snapshot-compression: true` since for us this
>> seems to have solved the multiple S3 reads issue.
>>
> In debug we see:
>> `in.delegate =
>> ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`
>>
>> and
>> `in.compressionDelegate = SnappyFramedInputStream`
>> and  in the logs a file is retrieved only once per subtask
>> ```
>> DEBUG com.amazonaws.request[] -
>> Sending Request: GET 
>> https://.../savepoints/flink-compression/.../savepoint-...
>> Range: bytes=0-9223372036854775806.
>> ```
>>
>> For uncompressed state could you please let us know how the change from
>> your PR eliminates the multiple calls to S3. Is not very clear to us.
>>
> Thank you.
>>
>> On Tue, Oct 15, 2024 at 1:42 PM Gabor Somogyi 
>> wrote:
>>
>>> My recommendation is to cherry-pick this PR [1] at top of your Flink
>>> distro when possible.
>>> Additionally turn off state compression. These should do the trick...
>>>
>>> [1] https://github.com/apache/flink/pull/25509
>>>
>>> G
>>>
>>>
>>> On Tue, Oct 15, 2024 at 1:03 PM William Wallace <
>>> theanonymous31...@gmail.com> wrote:
>>>
 Thank you Gabor for your reply.

 I'm sharing below more findings for both uncompressed and compressed
 state with the hope it helps. I'm looking further to your thoughts.

 1. uncompressed state - observe the
 `stateHandle=RelativeFileStateHandle`
 ```
 org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation []
 - Finished restoring from state handle:
 KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
 endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State:
 s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-...,
 12345678-... [... bytes]}.
  ```

 `FSDataInputStream in.delegate` in
 `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
 is an instance of
 `ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream`.
 For every `offset: offsets = metaInfo.getOffsets()` we end up doing an
 actual partial file read which in our case ends in order of millions
 because of high job parallelism (subtasks) and job can't recover.

 2. compressed state - observe the stateHandle=ByteStreamStateHandle
 ```
 org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation []
 - Finished restoring from state handle:
 KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
 endKeyGroup=31}},
 stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...',
 dataBytes=...}}.
 ```
 `FSDataInputStream in.delegate` in
 `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
 is an instance if `ByteStreamStateHandle(ByteStateHandleInp

RE: [External] Terrible bug related to serialization of Pojos in keystate.

2024-10-16 Thread Schwalbe Matthias
Hi Ammon Diether,

This is actually not a bug; for logical (and documented) reasons, keys cannot 
be schema-migrated:

  *   When storing state / hash-distributing events, the target key group (one 
out of max parallelism) is calculated from the key hash.
  *   If you change the key, the hash changes and hence the key group
  *   Therefore schema migration is suppressed for keys (a short sketch of the key-group calculation follows below)
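
For illustration, a minimal sketch of that key-group calculation, using Flink's own KeyGroupRangeAssignment utility (the keys and the max parallelism value are made up):

```
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupDemo {
    public static void main(String[] args) {
        int maxParallelism = 128; // made-up value, use your job's max parallelism
        // The "same" logical key as a String and as an Integer: hashCode() differs,
        // so the computed key group (and hence the owning key range) will usually differ.
        System.out.println(KeyGroupRangeAssignment.assignToKeyGroup("42", maxParallelism));
        System.out.println(KeyGroupRangeAssignment.assignToKeyGroup(42, maxParallelism));
    }
}
```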

What you can do to solve the situation is to transcode your savepoint by means 
of the State Processor API [1]:
Read by means of the old key type, write by means of the new key type.
Also consider that if you change the key type, the semantics of your 
aggregations etc. probably change as well, so you might run into trouble with 
data correctness with respect to your business logic.
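
For illustration only, a rough sketch of such a transcoding job is shown below, written against the DataStream-based State Processor API of newer Flink releases (roughly 1.17+; older releases use the DataSet-based Savepoint/ExistingSavepoint classes instead). Every concrete name in it (the "stateful-operator" uid, the "count" ValueState, the paths, and the String-to-Integer key mapping) is a made-up placeholder rather than a definitive recipe:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.SavepointWriter;
import org.apache.flink.state.api.StateBootstrapTransformation;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SavepointKeyMigration {

    /** Carrier POJO for (old key, new key, state value) between the read and write side. */
    public static class Entry {
        public String oldKey;
        public Integer newKey;
        public Long count;
    }

    /** Reads the keyed state using the OLD key type. */
    public static class OldKeyReader extends KeyedStateReaderFunction<String, Entry> {
        private ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Entry> out) throws Exception {
            Entry e = new Entry();
            e.oldKey = key;
            e.newKey = Integer.parseInt(key); // hypothetical old-key -> new-key mapping
            e.count = count.value();
            out.collect(e);
        }
    }

    /** Writes the same state back, keyed by the NEW key type. */
    public static class NewKeyBootstrapper extends KeyedStateBootstrapFunction<Integer, Entry> {
        private ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(Entry e, Context ctx) throws Exception {
            count.update(e.count);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String oldPath = "s3://bucket/savepoints/old"; // placeholder paths
        String newPath = "s3://bucket/savepoints/new";

        // 1. Read the operator's keyed state with the old key type.
        SavepointReader reader = SavepointReader.read(env, oldPath, new HashMapStateBackend());
        DataStream<Entry> entries = reader.readKeyedState("stateful-operator", new OldKeyReader());

        // 2. Re-key by the new key type and bootstrap a replacement operator state.
        StateBootstrapTransformation<Entry> transformation =
                OperatorTransformation.bootstrapWith(entries)
                        .keyBy(new KeySelector<Entry, Integer>() {
                            @Override
                            public Integer getKey(Entry e) {
                                return e.newKey;
                            }
                        })
                        .transform(new NewKeyBootstrapper());

        // 3. Write a new savepoint with that operator's state replaced.
        SavepointWriter.fromExistingSavepoint(env, oldPath, new HashMapStateBackend())
                .removeOperator("stateful-operator")
                .withOperator("stateful-operator", transformation)
                .write(newPath);

        env.execute("savepoint key migration");
    }
}
```

You run this as a normal Flink job; the savepoint it writes can then be used to start the upgraded job with the new key type, which restores a consistent key-group assignment.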


I hope that helps.

Sincerely

Thias


[1] https://nightlies.apache.org/flink/flink-docs-release-1.20/

From: Ammon Diether 
Sent: Wednesday, October 16, 2024 9:00 PM
To: user 
Subject: [External] Terrible bug related to serialization of Pojos in keystate.


We have encountered a rather rare, but very nasty bug with Flink related to 
serialization of Pojos in keystate.

-- Timeline --
1) Write a specific item to keystate of class C at Time1, no read of that key 
will happen until step 5.
2) Time elapses
3) class C is schema evolved to include an additional field
4) Time elapses
5) When reading the specific item written above, we get an EOFException being 
thrown to AbstractRocksDBState.migrateSerializedValue
6) Reading the item puts Flink into a restart loop of death.  Manual 
intervention is required.

-- details at the time of writing the value to keystate --
class C {  // at Time1
 private String fieldAA = "AA";
 private String fieldBB = "BB";
}

The serialized buffer looks like so:
02           flag
00 03 41 41  is_null, len+1, 'A', 'A'
00 03 42 42  is_null, len+1, 'B', 'B'
Serialized Field list is: [fieldAA, fieldBB]

-- schema evolution --
class C {  // at Time3
 private String fieldAA = "AA";
 private Integer fieldAB = -1;
 private String fieldBB = "BB";
}

-- details at the time of reading the value from keystate --
The serialized buffer looks like so:
02              flag
00 03 41 41     is_null, len+1, 'A', 'A'
00 ff ff ff ff  is_null, -1
00 03 42 42     is_null, len+1, 'B', 'B'
Serialized Field list is: [fieldAA, fieldBB]

When reading the buffer, Flink will read fieldAA just fine; it will then 
attempt to read ff ff ff ff as a string for fieldBB. Something has altered the 
buffer such that it has fieldAB, but the Serialized Field list does not have 
fieldAB.

-- Runtime Details  and notes --
flink 1.14.3
stateful functions
300 GB savepoint size.
The total time elapsed between write and read seems to need to be a few months 
for this corruption to happen.

Questions:
A) Any insight into the general mechanism related to Pojos and serialization?
B) What can cause a keystate to be migrated? Clearly a read does; what about 
just checkpointing over time, or reading keystate with a key that is 
"close" to the other key?
C) If a specific key in keystate is deserialized from rockdb, does flink 
deserialize other (adjacent?) keys in the "block" of data?
D) Are there tools for manually editing Flink savepoints?


Re: OperatorStateFromBackend can't complete initialisation because of high number of savepoint files reads

2024-10-15 Thread Gabor Somogyi
>
> Could you please let us know if you see anything wrong when using
> `execution.checkpointing.snapshot-compression: true` since for us this
> seems to have solved the multiple S3 reads issue.
>
When something is working it's never wrong. The question is why it has been
resolved.
Do you still have state.storage.fs.memory-threshold set to 500kb? State
compression may reduce the state below this threshold, which would make that
work.

For uncompressed state could you please let us know how the change from
> your PR eliminates the multiple calls to S3. Is not very clear to us.
>
Copy from the PR:
Flink state restore from S3 is super slow because skip function is
consuming ~15 seconds for ~6Mb of data.
...
In this PR, skip is only going to be called in the case of compression because
otherwise the stream is seekable.

G

On Tue, Oct 15, 2024 at 4:30 PM William Wallace 
wrote:

> Thank you for the recommendation and the help.
>
> Could you please let us know if you see anything wrong when using
> `execution.checkpointing.snapshot-compression: true` since for us this
> seems to have solved the multiple S3 reads issue.
>
In debug we see:
> `in.delegate =
> ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`
>
> and
> `in.compressionDelegate = SnappyFramedInputStream`
> and  in the logs a file is retrieved only once per subtask
> ```
> DEBUG com.amazonaws.request[] -
> Sending Request: GET 
> https://.../savepoints/flink-compression/.../savepoint-...
> Range: bytes=0-9223372036854775806.
> ```
>
> For uncompressed state could you please let us know how the change from
> your PR eliminates the multiple calls to S3. Is not very clear to us.
>
Thank you.
>
> On Tue, Oct 15, 2024 at 1:42 PM Gabor Somogyi 
> wrote:
>
>> My recommendation is to cherry-pick this PR [1] at top of your Flink
>> distro when possible.
>> Additionally turn off state compression. These should do the trick...
>>
>> [1] https://github.com/apache/flink/pull/25509
>>
>> G
>>
>>
>> On Tue, Oct 15, 2024 at 1:03 PM William Wallace <
>> theanonymous31...@gmail.com> wrote:
>>
>>> Thank you Gabor for your reply.
>>>
>>> I'm sharing below more findings for both uncompressed and compressed
>>> state with the hope it helps. I'm looking further to your thoughts.
>>>
>>> 1. uncompressed state - observe the `stateHandle=RelativeFileStateHandle`
>>> ```
>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
>>> Finished restoring from state handle:
>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>>> endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State:
>>> s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-...,
>>> 12345678-... [... bytes]}.
>>>  ```
>>>
>>> `FSDataInputStream in.delegate` in
>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>>> is an instance of
>>> `ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream`.
>>> For every `offset: offsets = metaInfo.getOffsets()` we end up doing an
>>> actual partial file read which in our case ends in order of millions
>>> because of high job parallelism (subtasks) and job can't recover.
>>>
>>> 2. compressed state - observe the stateHandle=ByteStreamStateHandle
>>> ```
>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
>>> Finished restoring from state handle:
>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>>> endKeyGroup=31}},
>>> stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...',
>>> dataBytes=...}}.
>>> ```
>>> `FSDataInputStream in.delegate` in
>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>>> is an instance if `ByteStreamStateHandle(ByteStateHandleInputStream)
>>> This means that for every `offset: offsets = metaInfo.getOffsets()` we
>>> end up doing a read from a `byte[]` which are faster.
>>>
>>> At this point I don't understand how not doing the `skip` operation in
>>> case of uncompressed state can work, since skip is required for the partial
>>> reads, and I apologise if I'm wrong, I don't have the same level of
>>> understanding as you have.
>>>
>>> What we considered doing was to find a way to actually cache the file as
>>> a byte[] and do the reads from memory ... but it seems the state
>>> compression is doing the same. We are in the process of testing state
>>> compression under production volumes ... can't say how that will actually
>>> work for us.
>>>
>>> Thank you again for looking into this. I'm looking forward for your
>>> thoughts. Please let me know if I missed or misunderstood something. Please
>>> let us know your recommendation.
>>>
>>> On Tue, Oct 15, 2024 at 8:35 AM Gabor Somogyi 
>>> wrote:
>>>
 Hi William,

Re: OperatorStateFromBackend can't complete initialisation because of high number of savepoint files reads

2024-10-15 Thread William Wallace
Thank you for the recommendation and the help.

Could you please let us know if you see anything wrong when using
`execution.checkpointing.snapshot-compression: true` since for us this
seems to have solved the multiple S3 reads issue.
In debug we see:
`in.delegate =
ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`

and
`in.compressionDelegate = SnappyFramedInputStream`
and  in the logs a file is retrieved only once per subtask
```
DEBUG com.amazonaws.request[] -
Sending Request: GET https://.../savepoints/flink-compression/.../savepoint-...
Range: bytes=0-9223372036854775806.
```

For uncompressed state, could you please let us know how the change from
your PR eliminates the multiple calls to S3? It is not very clear to us.
Thank you.

On Tue, Oct 15, 2024 at 1:42 PM Gabor Somogyi 
wrote:

> My recommendation is to cherry-pick this PR [1] at top of your Flink
> distro when possible.
> Additionally turn off state compression. These should do the trick...
>
> [1] https://github.com/apache/flink/pull/25509
>
> G
>
>
> On Tue, Oct 15, 2024 at 1:03 PM William Wallace <
> theanonymous31...@gmail.com> wrote:
>
>> Thank you Gabor for your reply.
>>
>> I'm sharing below more findings for both uncompressed and compressed
>> state with the hope it helps. I'm looking further to your thoughts.
>>
>> 1. uncompressed state - observe the `stateHandle=RelativeFileStateHandle`
>> ```
>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
>> Finished restoring from state handle:
>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>> endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State:
>> s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-...,
>> 12345678-... [... bytes]}.
>>  ```
>>
>> `FSDataInputStream in.delegate` in
>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>> is an instance of
>> `ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream`.
>> For every `offset: offsets = metaInfo.getOffsets()` we end up doing an
>> actual partial file read which in our case ends in order of millions
>> because of high job parallelism (subtasks) and job can't recover.
>>
>> 2. compressed state - observe the stateHandle=ByteStreamStateHandle
>> ```
>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
>> Finished restoring from state handle:
>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>> endKeyGroup=31}},
>> stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...',
>> dataBytes=...}}.
>> ```
>> `FSDataInputStream in.delegate` in
>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>> is an instance if `ByteStreamStateHandle(ByteStateHandleInputStream)
>> This means that for every `offset: offsets = metaInfo.getOffsets()` we
>> end up doing a read from a `byte[]` which are faster.
>>
>> At this point I don't understand how not doing the `skip` operation in
>> case of uncompressed state can work, since skip is required for the partial
>> reads, and I apologise if I'm wrong, I don't have the same level of
>> understanding as you have.
>>
>> What we considered doing was to find a way to actually cache the file as
>> a byte[] and do the reads from memory ... but it seems the state
>> compression is doing the same. We are in the process of testing state
>> compression under production volumes ... can't say how that will actually
>> work for us.
>>
>> Thank you again for looking into this. I'm looking forward for your
>> thoughts. Please let me know if I missed or misunderstood something. Please
>> let us know your recommendation.
>>
>> On Tue, Oct 15, 2024 at 8:35 AM Gabor Somogyi 
>> wrote:
>>
>>> Hi William,
>>>
>>> It's a bit old question but I think now we know why this is happening.
>>> Please see [1] for further details.
>>> It's an important requirement to use uncompressed state because even
>>> with the fix compressed state is still problematic.
>>>
>>> We've already tested the PR with load but if you can report back it
>>> would be helpful.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-36530
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Fri, Aug 16, 2024 at 11:25 AM William Wallace <
>>> theanonymous31...@gmail.com> wrote:
>>>
 Context

 We have recently upgraded from Flink 1.13.6 to Flink 1.19. We consume
 data from ~ 40k Kafka topic partitions in some environments. We are using
 aligned checkpoints. We set state.storage.fs.memory-threshold: 500kb.

 Problem

 At the point when the state for operator using
 topic-partition-offset-states doesn’t fit in the
 state.storage.fs.memory-threshold, we end up with a proportionally high
 number of

Re: OperatorStateFromBackend can't complete initialisation because of high number of savepoint files reads

2024-10-15 Thread Gabor Somogyi
My recommendation is to cherry-pick this PR [1] on top of your Flink distro
when possible.
Additionally, turn off state compression. That should do the trick...

[1] https://github.com/apache/flink/pull/25509

G


On Tue, Oct 15, 2024 at 1:03 PM William Wallace 
wrote:

> Thank you Gabor for your reply.
>
> I'm sharing below more findings for both uncompressed and compressed state
> with the hope it helps. I'm looking further to your thoughts.
>
> 1. uncompressed state - observe the `stateHandle=RelativeFileStateHandle`
> ```
> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
> Finished restoring from state handle:
> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
> endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State:
> s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-...,
> 12345678-... [... bytes]}.
>  ```
>
> `FSDataInputStream in.delegate` in
> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
> is an instance of
> `ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream`.
> For every `offset: offsets = metaInfo.getOffsets()` we end up doing an
> actual partial file read which in our case ends in order of millions
> because of high job parallelism (subtasks) and job can't recover.
>
> 2. compressed state - observe the stateHandle=ByteStreamStateHandle
> ```
> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
> Finished restoring from state handle:
> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
> endKeyGroup=31}},
> stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...',
> dataBytes=...}}.
> ```
> `FSDataInputStream in.delegate` in
> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
> is an instance if `ByteStreamStateHandle(ByteStateHandleInputStream)
> This means that for every `offset: offsets = metaInfo.getOffsets()` we end
> up doing a read from a `byte[]` which are faster.
>
> At this point I don't understand how not doing the `skip` operation in
> case of uncompressed state can work, since skip is required for the partial
> reads, and I apologise if I'm wrong, I don't have the same level of
> understanding as you have.
>
> What we considered doing was to find a way to actually cache the file as a
> byte[] and do the reads from memory ... but it seems the state compression
> is doing the same. We are in the process of testing state compression under
> production volumes ... can't say how that will actually work for us.
>
> Thank you again for looking into this. I'm looking forward for your
> thoughts. Please let me know if I missed or misunderstood something. Please
> let us know your recommendation.
>
> On Tue, Oct 15, 2024 at 8:35 AM Gabor Somogyi 
> wrote:
>
>> Hi William,
>>
>> It's a bit old question but I think now we know why this is happening.
>> Please see [1] for further details.
>> It's an important requirement to use uncompressed state because even with
>> the fix compressed state is still problematic.
>>
>> We've already tested the PR with load but if you can report back it would
>> be helpful.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-36530
>>
>> BR,
>> G
>>
>>
>> On Fri, Aug 16, 2024 at 11:25 AM William Wallace <
>> theanonymous31...@gmail.com> wrote:
>>
>>> Context
>>>
>>> We have recently upgraded from Flink 1.13.6 to Flink 1.19. We consume
>>> data from ~ 40k Kafka topic partitions in some environments. We are using
>>> aligned checkpoints. We set state.storage.fs.memory-threshold: 500kb.
>>>
>>> Problem
>>>
>>> At the point when the state for operator using
>>> topic-partition-offset-states doesn’t fit in the
>>> state.storage.fs.memory-threshold, we end up with a proportionally high
>>> number of reads for the checkpoint and savepoint files for each of the
>>> topic partition offsets.
>>>
>>> For example when we have:
>>>
>>> {code}
>>>
>>> [14-Aug-2024 11:39:12.392 UTC] DEBUG
>>> org.apache.flink.runtime.state.TaskStateManagerImpl  [] - Operator
>>> 8992e27ae82755cac12dd37f518df782 has remote state
>>> SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={
>>> SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414, 459,
>>> 504, 549 …(offsets is a list 40k elements)
>>>
>>> {code}
>>>
>>> For each of the metadata offsets we will have S3 reads for
>>> checkpoint/savepoint. The Flink job fails to resume from checkpoint.
>>> With debug logs, we see hundred of thousands of AWS GET calls for the same
>>> checkpoint file, with different offsets. These AWS calls take such a long
>>> time, that our application fails to start and job crashes and starts same
>>> reads again and crashes again.
>>>
>>>
>>> We will have:
>>>
>

Re: OperatorStateFromBackend can't complete initialisation because of high number of savepoint files reads

2024-10-15 Thread William Wallace
Thank you Gabor for your reply.

I'm sharing below more findings for both uncompressed and compressed state
with the hope it helps. I'm looking forward to your thoughts.

1. uncompressed state - observe the `stateHandle=RelativeFileStateHandle`
```
org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
Finished restoring from state handle:
KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State:
s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-...,
12345678-... [... bytes]}.
 ```

`FSDataInputStream in.delegate` in
`org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
is an instance of
`ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream`.
For every `offset: offsets = metaInfo.getOffsets()` we end up doing an
actual partial file read, which in our case ends up in the order of millions
because of high job parallelism (subtasks), and the job can't recover.

2. compressed state - observe the stateHandle=ByteStreamStateHandle
```
org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
Finished restoring from state handle:
KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
endKeyGroup=31}},
stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...',
dataBytes=...}}.
```
`FSDataInputStream in.delegate` in
`org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
is an instance of `ByteStreamStateHandle(ByteStateHandleInputStream)`.
This means that for every `offset: offsets = metaInfo.getOffsets()` we end
up doing a read from a `byte[]`, which is faster.

At this point I don't understand how not doing the `skip` operation in the case
of uncompressed state can work, since skip is required for the partial
reads. I apologise if I'm wrong; I don't have the same level of
understanding as you have.

What we considered doing was to find a way to actually cache the file as a
byte[] and do the reads from memory ... but it seems state compression
achieves the same. We are in the process of testing state compression under
production volumes ... we can't say yet how that will actually work for us.

Thank you again for looking into this. I'm looking forward to your
thoughts. Please let me know if I missed or misunderstood something. Please
let us know your recommendation.

On Tue, Oct 15, 2024 at 8:35 AM Gabor Somogyi 
wrote:

> Hi William,
>
> It's a bit old question but I think now we know why this is happening.
> Please see [1] for further details.
> It's an important requirement to use uncompressed state because even with
> the fix compressed state is still problematic.
>
> We've already tested the PR with load but if you can report back it would
> be helpful.
>
> [1] https://issues.apache.org/jira/browse/FLINK-36530
>
> BR,
> G
>
>
> On Fri, Aug 16, 2024 at 11:25 AM William Wallace <
> theanonymous31...@gmail.com> wrote:
>
>> Context
>>
>> We have recently upgraded from Flink 1.13.6 to Flink 1.19. We consume
>> data from ~ 40k Kafka topic partitions in some environments. We are using
>> aligned checkpoints. We set state.storage.fs.memory-threshold: 500kb.
>>
>> Problem
>>
>> At the point when the state for operator using
>> topic-partition-offset-states doesn’t fit in the
>> state.storage.fs.memory-threshold, we end up with a proportionally high
>> number of reads for the checkpoint and savepoint files for each of the
>> topic partition offsets.
>>
>> For example when we have:
>>
>> {code}
>>
>> [14-Aug-2024 11:39:12.392 UTC] DEBUG
>> org.apache.flink.runtime.state.TaskStateManagerImpl  [] - Operator
>> 8992e27ae82755cac12dd37f518df782 has remote state
>> SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={
>> SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414, 459,
>> 504, 549 …(offsets is a list 40k elements)
>>
>> {code}
>>
>> For each of the metadata offsets we will have S3 reads for
>> checkpoint/savepoint. The Flink job fails to resume from checkpoint.
>> With debug logs, we see hundred of thousands of AWS GET calls for the same
>> checkpoint file, with different offsets. These AWS calls take such a long
>> time, that our application fails to start and job crashes and starts same
>> reads again and crashes again.
>>
>>
>> We will have:
>>
>> {code}
>>
>> [14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request
>> [] - Sending Request: GET
>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>> Range: bytes=234-9223372036854775806, User-Agent:  ,
>> cfg/retry-mode/legacy, presto, )
>>
>> [14-Aug-20

Re: Flink job can't complete initialisation because of millions of savepoint file reads

2024-10-15 Thread Gabor Somogyi
Hi Alex,

Please see my comment here [1].

[1] https://lists.apache.org/thread/h5mv6ld4l2g4hsjszfdos9f365nh7ctf

BR,
G


On Mon, Sep 2, 2024 at 11:02 AM Alex K.  wrote:

> We have an issue where a savepoint file containing Kafka topic partitions
> offsets is requested millions of times from AWS S3. This results in the
> job crashing and then followed by a restart and crashing again. We have
> tracked the high number of reads (~3 millions) to Kafka topic partitions
> (~40k) multiplied by job parallelism (70 slots). We are using Flink
> 1.19.0, KafkaSource and savepoints/checkpoints are stored in AWS S3.
>
> We increased the state.storage.fs.memory-threshold to 700kb, which results
> in the Kafka topic partition offsets being written in the _metadata
> savepoint file and implicitly eliminates the problem from above. Our topics
> and partitions are increasing weekly so we will reach the
> state.storage.fs.memory-threshold max value limit of 1mb soon.
>
> Is this behaviour expected and in such case could it be optimised by
> reducing the high number of reads, by caching the file or by some other
> configuration we are not aware of?
>
> Thank you
>
>


Re: OperatorStateFromBackend can't complete initialisation because of high number of savepoint files reads

2024-10-15 Thread Gabor Somogyi
Hi William,

It's a bit of an old question, but I think now we know why this is happening.
Please see [1] for further details.
It's an important requirement to use uncompressed state because, even with
the fix, compressed state is still problematic.

We've already tested the PR with load but if you can report back it would
be helpful.

[1] https://issues.apache.org/jira/browse/FLINK-36530

BR,
G


On Fri, Aug 16, 2024 at 11:25 AM William Wallace <
theanonymous31...@gmail.com> wrote:

> Context
>
> We have recently upgraded from Flink 1.13.6 to Flink 1.19. We consume data
> from ~ 40k Kafka topic partitions in some environments. We are using
> aligned checkpoints. We set state.storage.fs.memory-threshold: 500kb.
>
> Problem
>
> At the point when the state for operator using
> topic-partition-offset-states doesn’t fit in the
> state.storage.fs.memory-threshold, we end up with a proportionally high
> number of reads for the checkpoint and savepoint files for each of the
> topic partition offsets.
>
> For example when we have:
>
> {code}
>
> [14-Aug-2024 11:39:12.392 UTC] DEBUG
> org.apache.flink.runtime.state.TaskStateManagerImpl  [] - Operator
> 8992e27ae82755cac12dd37f518df782 has remote state
> SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={
> SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414, 459,
> 504, 549 …(offsets is a list 40k elements)
>
> {code}
>
> For each of the metadata offsets we will have S3 reads for
> checkpoint/savepoint. The Flink job fails to resume from checkpoint. With
> debug logs, we see hundred of thousands of AWS GET calls for the same
> checkpoint file, with different offsets. These AWS calls take such a long
> time, that our application fails to start and job crashes and starts same
> reads again and crashes again.
>
>
> We will have:
>
> {code}
>
> [14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request
> [] - Sending Request: GET
> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
> Range: bytes=234-9223372036854775806, User-Agent:  ,
> cfg/retry-mode/legacy, presto, )
>
> [14-Aug-2024 11:39:12.476 UTC] DEBUG com.amazonaws.request
> [] - Sending Request: GET
> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
> Range: bytes=234-9223372036854775806, User-Agent: ,
> cfg/retry-mode/legacy, presto, )
>
> [14-Aug-2024 09:32:49.286 UTC] DEBUG com.amazonaws.request
> [] - Sending Request: GET
> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
> Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy,
> presto, )
>
> [14-Aug-2024 11:39:12.530 UTC] DEBUG com.amazonaws.request
> [] - Sending Request: GET
> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
> Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy,
> presto, )
>
> {code}
>
> Code which does the multiple reads was isolated to:
>
> {code}
>
>
> org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues
>
>  private <S> void deserializeOperatorStateValues(
>          PartitionableListState<S> stateListForName,
>          FSDataInputStream in,
>          OperatorStateHandle.StateMetaInfo metaInfo)
>          throws IOException {
>      if (null != metaInfo) {
>          long[] offsets = metaInfo.getOffsets();
>          if (null != offsets) {
>              DataInputView div = new DataInputViewStreamWrapper(in);
>              TypeSerializer<S> serializer =
>                      stateListForName.getStateMetaInfo().getPartitionStateSerializer();
>              for (long offset : offsets) {
>                  in.seek(offset);
>                  stateListForName.add(serializer.deserialize(div));
>              }
>          }
>      }
>  }
> {code}
>
> Questions:
>
>    1. Please review the behaviour from above and advise if this is expected?
>
>    2. The reads for the topic-partition-offset-states are similar to:
>
> {code}
>
> Sending Request: GET
> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
> Range: bytes=234-9223372036854775806
>
> Sending Request: GET
> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
> Headers: (amz-sdk-invocation-id:, C

Re: Flink custom sink

2024-10-14 Thread Yanquan Lv
Sorry, I couldn't find any clear and detailed user guidance other than the FLIPs in 
the official documentation either.


> 2024年10月15日 01:39,Anil Dasari  写道:
> 
> Hi Yanquan,
> I've finished reading the Sink FLIPs and am now reviewing some of the sink 
> implementations, like TestSinkV2, to better understand the flow. I'll write a 
> new one to experiment with.
> Are there flink sink docs/flow daigrams like detailed source implementation 
> docs like 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
>  ?
> 
> Thanks.
> 
> On Mon, Oct 14, 2024 at 10:18 AM Yanquan Lv  > wrote:
>> Hi, Anil.
>> For your scenario, I think looking at FLIP-143 first and then FLIP-191 
>> should provide a better understanding. Then you can look at other FLIPs or 
>> specific implementations.
>> 
>> Anil Dasari mailto:adas...@guidewire.com>> 
>> 于2024年10月15日周二 00:55写道:
>>> Got it. thanks. 
>>> Sink improvements have many FLIP confluence pages i.e FLIP-143, 171, 177 
>>> and 191. So, Is there a sequence of steps flow charts for better 
>>> understanding of the sink process with sink, writer and committer ? 
>>> 
>>> Thanks
>>> 
>>> On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv >> > wrote:
 Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0, and it will be 
 replaced by SupportsCommitter[1] interface, which was introduced in Flink 
 1.19.
 But you can still use TwoPhaseCommittingSink under Flink 2.0, it depends 
 on your target Flink version, The interfaces of these two APIs are almost 
 identical.
 
 [1] 
 https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
 
 Anil Dasari mailto:adas...@guidewire.com>> 
 于2024年10月14日周一 23:26写道:
> Hi Yanquan,
> Thanks for sharing the information.
> It appears that TwoPhaseCommittingSink is not available in the flink repo 
> main branch. it is replaced with Sink, Committer and SinkWritter ?
> 
> Thanks
> 
> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv  > wrote:
>> Hi, Anil.
>> 
>> Iceberg Sink is merged recently in 
>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880.
>> 
>> From your description, I guess that what you need is a 
>> TwoPhaseCommittingSink[1], the steps you listed can be executed with the 
>> following steps:
>> 
>> > 1. Group data by category and write it to S3 under its respective 
>> > prefix.
>> This can be done in 
>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>  method.
>> 
>> > 2. Update category metrics in a manifest file stored in the S3 
>> > manifest prefix.
>> > 3. Send category metrics to an external system to initiate consumption.
>> These metrics information could be passed by  
>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>>  method.
>> And `Update category metrics`/`Send category metrics` can be done in 
>> org.apache.flink.api.connector.sink2.Committer#commit method.
>> 
>> Rollback action could be done in SinkWriter or Committer, to delete 
>> files from S3, you need to pass the files information though 
>> PrecommittingSinkWriter#prepareCommit too. Then you can throw exception 
>> to let Flink job failover and retry.
>> 
>> 
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>> 
>> 
>> 
>>> 2024年10月14日 12:49,Anil Dasari >> > 写道:
>>> 
>>> Hello,
>>> I am looking to implement a Flink sink for the following use case, 
>>> where the steps below are executed for each microbatch (using Spark 
>>> terminology) or trigger:
>>> 
>>> Group data by category and write it to S3 under its respective prefix.
>>> Update category metrics in a manifest file stored in the S3 manifest 
>>> prefix.
>>> Send category metrics to an external system to initiate consumption.
>>> In case of failure, any previously completed steps should be rolled 
>>> back, i.e., delete files from S3 and reprocess the entire microbatch.
>>> 
>>> It seems this pattern can be implemented using the Unified Sink API, as 
>>> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A.
>>> 
>>> I'm currently reviewing FileSink and IcebergSink (still searching for 
>>> the source code) to understand their implementations and create a new 
>>> one. 
>>> 
>>> Are there any step-by-step docs or examples available for writing a new 
>>> unified sink?
>>> 
>>> Thanks
>>> 
>>> 
>>> 
>> 



Re: Flink custom sink

2024-10-14 Thread Anil Dasari
Hi Yanquan,
I've finished reading the Sink FLIPs and am now reviewing some of the sink
implementations, like TestSinkV2, to better understand the flow. I'll write
a new one to experiment with.
Are there Flink sink docs/flow diagrams like the detailed source implementation
docs at
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
?

Thanks.

On Mon, Oct 14, 2024 at 10:18 AM Yanquan Lv  wrote:

> Hi, Anil.
> For your scenario, I think looking at FLIP-143 first and then FLIP-191
> should provide a better understanding. Then you can look at other FLIPs or
> specific implementations.
>
> Anil Dasari  于2024年10月15日周二 00:55写道:
>
>> Got it. thanks.
>> Sink improvements have many FLIP confluence pages i.e FLIP-143, 171, 177
>> and 191. So, Is there a sequence of steps flow charts for better
>> understanding of the sink process with sink, writer and committer ?
>>
>> Thanks
>>
>> On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv  wrote:
>>
>>> Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0, and it will
>>> be replaced by SupportsCommitter[1] interface, which was introduced in
>>> Flink 1.19.
>>> But you can still use TwoPhaseCommittingSink under Flink 2.0, it depends
>>> on your target Flink version, The interfaces of these two APIs are almost
>>> identical.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
>>> 
>>>
>>> Anil Dasari  于2024年10月14日周一 23:26写道:
>>>
 Hi Yanquan,
 Thanks for sharing the information.
 It appears that TwoPhaseCommittingSink is not available in the flink
 repo main branch. it is replaced with Sink, Committer and SinkWritter ?

 Thanks

 On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv 
 wrote:

> Hi, Anil.
>
> Iceberg Sink is merged recently in
> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
> 
> .
>
> From your description, I guess that what you need is
> a TwoPhaseCommittingSink[1], the steps you listed can be executed with the
> following steps:
>
> > 1. Group data by category and write it to S3 under its respective
> prefix.
> This can be done in
> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>  method.
>
> > 2. Update category metrics in a manifest file stored in the S3
> manifest prefix.
> > 3. Send category metrics to an external system to initiate
> consumption.
> These metrics information could be passed by
>  
> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
> method.
> And `Update category metrics`/`Send category metrics` can be done in
> org.apache.flink.api.connector.sink2.Committer#commit method.
>
> Rollback action could be done in SinkWriter or Committer, to delete
> files from S3, you need to pass the files information though
> PrecommittingSinkWriter#prepareCommit too. Then you can throw exception to
> let Flink job failover and retry.
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
> 
>
>
>
> 2024年10月14日 12:49,Anil Dasari  写道:
>
> Hello,
>
> I am looking to implement a Flink sink for the following use case,
> where the steps below are executed for each microbatch (using Spark
> terminology) or trigger:
>
>1. Group data by category and write it to S3 under its respective
>prefix.
>2. Update category metrics in a manifest file stored in the S3
>manifest prefix.
>3. Send category metrics to an external system to initiate
>consumption.
>
> In case of failure, any previously completed steps should be rolled
> back, i.e., delete files from S3 and reprocess the entire microbatch.
>
> It seems this pattern can be implemented using the Unified Sink API,
> as discussed in this video:
> https://www.youtube.com/watch?v=0GVI25OEs4A
> 
> .
>
> I'm currently reviewing FileSink and IcebergSink (still searching for
> the source code) to understand their implementations and create a new one.
>
> Are there any step-by-step docs or examples available for writing a
> new unified sink?
>
> Thanks
>
>
>
>


Re: Flink custom sink

2024-10-14 Thread Yanquan Lv
Hi, Anil.
For your scenario, I think looking at FLIP-143 first and then FLIP-191
should provide a better understanding. Then you can look at other FLIPs or
specific implementations.

Anil Dasari  wrote on Tue, Oct 15, 2024 at 00:55:

> Got it. thanks.
> Sink improvements have many FLIP confluence pages i.e FLIP-143, 171, 177
> and 191. So, Is there a sequence of steps flow charts for better
> understanding of the sink process with sink, writer and committer ?
>
> Thanks
>
> On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv  wrote:
>
>> Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0, and it will be
>> replaced by SupportsCommitter[1] interface, which was introduced in Flink
>> 1.19.
>> But you can still use TwoPhaseCommittingSink under Flink 2.0, it depends
>> on your target Flink version, The interfaces of these two APIs are almost
>> identical.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
>>
>> Anil Dasari  于2024年10月14日周一 23:26写道:
>>
>>> Hi Yanquan,
>>> Thanks for sharing the information.
>>> It appears that TwoPhaseCommittingSink is not available in the flink
>>> repo main branch. it is replaced with Sink, Committer and SinkWritter ?
>>>
>>> Thanks
>>>
>>> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv  wrote:
>>>
 Hi, Anil.

 Iceberg Sink is merged recently in
 https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
 .

 From your description, I guess that what you need is
 a TwoPhaseCommittingSink[1], the steps you listed can be executed with the
 following steps:

 > 1. Group data by category and write it to S3 under its respective
 prefix.
 This can be done in
 org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
  method.

 > 2. Update category metrics in a manifest file stored in the S3
 manifest prefix.
 > 3. Send category metrics to an external system to initiate
 consumption.
 These metrics information could be passed by
  
 org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
 method.
 And `Update category metrics`/`Send category metrics` can be done in
 org.apache.flink.api.connector.sink2.Committer#commit method.

 Rollback action could be done in SinkWriter or Committer, to delete
 files from S3, you need to pass the files information though
 PrecommittingSinkWriter#prepareCommit too. Then you can throw exception to
 let Flink job failover and retry.


 [1]
 https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html



 2024年10月14日 12:49,Anil Dasari  写道:

 Hello,

 I am looking to implement a Flink sink for the following use case,
 where the steps below are executed for each microbatch (using Spark
 terminology) or trigger:

1. Group data by category and write it to S3 under its respective
prefix.
2. Update category metrics in a manifest file stored in the S3
manifest prefix.
3. Send category metrics to an external system to initiate
consumption.

 In case of failure, any previously completed steps should be rolled
 back, i.e., delete files from S3 and reprocess the entire microbatch.

 It seems this pattern can be implemented using the Unified Sink API, as
 discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A.

 I'm currently reviewing FileSink and IcebergSink (still searching for
 the source code) to understand their implementations and create a new one.

 Are there any step-by-step docs or examples available for writing a new
 unified sink?

 Thanks






Re: Flink custom sink

2024-10-14 Thread Anil Dasari
Got it, thanks.
Sink improvements span many FLIP Confluence pages, i.e. FLIP-143, 171, 177
and 191. So, is there a sequence-of-steps flow chart for a better
understanding of the sink process with sink, writer and committer?

Thanks

On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv  wrote:

> Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0, and it will be
> replaced by SupportsCommitter[1] interface, which was introduced in Flink
> 1.19.
> But you can still use TwoPhaseCommittingSink under Flink 2.0, it depends
> on your target Flink version, The interfaces of these two APIs are almost
> identical.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
> 
>
> Anil Dasari  于2024年10月14日周一 23:26写道:
>
>> Hi Yanquan,
>> Thanks for sharing the information.
>> It appears that TwoPhaseCommittingSink is not available in the flink repo
>> main branch. it is replaced with Sink, Committer and SinkWritter ?
>>
>> Thanks
>>
>> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv  wrote:
>>
>>> Hi, Anil.
>>>
>>> Iceberg Sink is merged recently in
>>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>>> 
>>> .
>>>
>>> From your description, I guess that what you need is
>>> a TwoPhaseCommittingSink[1], the steps you listed can be executed with the
>>> following steps:
>>>
>>> > 1. Group data by category and write it to S3 under its respective
>>> prefix.
>>> This can be done in
>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>>  method.
>>>
>>> > 2. Update category metrics in a manifest file stored in the S3
>>> manifest prefix.
>>> > 3. Send category metrics to an external system to initiate consumption.
>>> These metrics information could be passed by
>>>  
>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>>> method.
>>> And `Update category metrics`/`Send category metrics` can be done in
>>> org.apache.flink.api.connector.sink2.Committer#commit method.
>>>
>>> Rollback action could be done in SinkWriter or Committer, to delete
>>> files from S3, you need to pass the files information though
>>> PrecommittingSinkWriter#prepareCommit too. Then you can throw exception to
>>> let Flink job failover and retry.
>>>
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>>> 
>>>
>>>
>>>
>>> 2024年10月14日 12:49,Anil Dasari  写道:
>>>
>>> Hello,
>>>
>>> I am looking to implement a Flink sink for the following use case, where
>>> the steps below are executed for each microbatch (using Spark terminology)
>>> or trigger:
>>>
>>>1. Group data by category and write it to S3 under its respective
>>>prefix.
>>>2. Update category metrics in a manifest file stored in the S3
>>>manifest prefix.
>>>3. Send category metrics to an external system to initiate
>>>consumption.
>>>
>>> In case of failure, any previously completed steps should be rolled
>>> back, i.e., delete files from S3 and reprocess the entire microbatch.
>>>
>>> It seems this pattern can be implemented using the Unified Sink API, as
>>> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A
>>> 
>>> .
>>>
>>> I'm currently reviewing FileSink and IcebergSink (still searching for
>>> the source code) to understand their implementations and create a new one.
>>>
>>> Are there any step-by-step docs or examples available for writing a new
>>> unified sink?
>>>
>>> Thanks
>>>
>>>
>>>
>>>


Re: Flink custom sink

2024-10-14 Thread Yanquan Lv
Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0, and it will be
replaced by the SupportsCommitter[1] interface, which was introduced in Flink
1.19.
But you can still use TwoPhaseCommittingSink in versions below Flink 2.0; it
depends on your target Flink version. The interfaces of these two APIs are
almost identical.

[1]
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html

Anil Dasari  wrote on Mon, Oct 14, 2024 at 23:26:

> Hi Yanquan,
> Thanks for sharing the information.
> It appears that TwoPhaseCommittingSink is not available in the flink repo
> main branch. it is replaced with Sink, Committer and SinkWritter ?
>
> Thanks
>
> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv  wrote:
>
>> Hi, Anil.
>>
>> Iceberg Sink is merged recently in
>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>> .
>>
>> From your description, I guess that what you need is
>> a TwoPhaseCommittingSink[1], the steps you listed can be executed with the
>> following steps:
>>
>> > 1. Group data by category and write it to S3 under its respective
>> prefix.
>> This can be done in
>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>  method.
>>
>> > 2. Update category metrics in a manifest file stored in the S3 manifest
>> prefix.
>> > 3. Send category metrics to an external system to initiate consumption.
>> These metrics information could be passed by
>>  
>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>> method.
>> And `Update category metrics`/`Send category metrics` can be done in
>> org.apache.flink.api.connector.sink2.Committer#commit method.
>>
>> Rollback action could be done in SinkWriter or Committer, to delete files
>> from S3, you need to pass the files information though
>> PrecommittingSinkWriter#prepareCommit too. Then you can throw exception to
>> let Flink job failover and retry.
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>>
>>
>>
>> 2024年10月14日 12:49,Anil Dasari  写道:
>>
>> Hello,
>>
>> I am looking to implement a Flink sink for the following use case, where
>> the steps below are executed for each microbatch (using Spark terminology)
>> or trigger:
>>
>>1. Group data by category and write it to S3 under its respective
>>prefix.
>>2. Update category metrics in a manifest file stored in the S3
>>manifest prefix.
>>3. Send category metrics to an external system to initiate
>>consumption.
>>
>> In case of failure, any previously completed steps should be rolled back,
>> i.e., delete files from S3 and reprocess the entire microbatch.
>>
>> It seems this pattern can be implemented using the Unified Sink API, as
>> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A.
>>
>> I'm currently reviewing FileSink and IcebergSink (still searching for
>> the source code) to understand their implementations and create a new one.
>>
>> Are there any step-by-step docs or examples available for writing a new
>> unified sink?
>>
>> Thanks
>>
>>
>>
>>


Re: Flink custom sink

2024-10-14 Thread Anil Dasari
Hi Yanquan,
Thanks for sharing the information.
It appears that TwoPhaseCommittingSink is not available in the Flink repo
main branch. Is it replaced with Sink, Committer and SinkWriter?

Thanks

On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv  wrote:

> Hi, Anil.
>
> Iceberg Sink is merged recently in
> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
> 
> .
>
> From your description, I guess that what you need is
> a TwoPhaseCommittingSink[1], the steps you listed can be executed with the
> following steps:
>
> > 1. Group data by category and write it to S3 under its respective prefix.
> This can be done in
> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>  method.
>
> > 2. Update category metrics in a manifest file stored in the S3 manifest
> prefix.
> > 3. Send category metrics to an external system to initiate consumption.
> These metrics information could be passed by
>  
> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
> method.
> And `Update category metrics`/`Send category metrics` can be done in
> org.apache.flink.api.connector.sink2.Committer#commit method.
>
> Rollback action could be done in SinkWriter or Committer, to delete files
> from S3, you need to pass the files information though
> PrecommittingSinkWriter#prepareCommit too. Then you can throw exception to
> let Flink job failover and retry.
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
> 
>
>
>
> 2024年10月14日 12:49,Anil Dasari  写道:
>
> Hello,
>
> I am looking to implement a Flink sink for the following use case, where
> the steps below are executed for each microbatch (using Spark terminology)
> or trigger:
>
>1. Group data by category and write it to S3 under its respective
>prefix.
>2. Update category metrics in a manifest file stored in the S3
>manifest prefix.
>3. Send category metrics to an external system to initiate consumption.
>
> In case of failure, any previously completed steps should be rolled back,
> i.e., delete files from S3 and reprocess the entire microbatch.
>
> It seems this pattern can be implemented using the Unified Sink API, as
> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A
> 
> .
>
> I'm currently reviewing FileSink and IcebergSink (still searching for the
> source code) to understand their implementations and create a new one.
>
> Are there any step-by-step docs or examples available for writing a new
> unified sink?
>
> Thanks
>
>
>
>


Re: Flink custom sink

2024-10-14 Thread Yanquan Lv
Hi, Anil.

Iceberg Sink is merged recently in 
https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880.

From your description, I guess that what you need is a 
TwoPhaseCommittingSink[1]; the steps you listed can be implemented with the 
following steps:

> 1. Group data by category and write it to S3 under its respective prefix.
This can be done in 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
 method.

> 2. Update category metrics in a manifest file stored in the S3 manifest 
> prefix.
> 3. Send category metrics to an external system to initiate consumption.
This metrics information can be passed via the 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
 method.
And `Update category metrics`/`Send category metrics` can be done in the 
org.apache.flink.api.connector.sink2.Committer#commit method.

The rollback action can be done in the SinkWriter or the Committer. To delete files from
S3, you need to pass the file information through
PrecommittingSinkWriter#prepareCommit as well. Then you can throw an exception to let the
Flink job fail over and retry.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
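
To make the phases concrete, below is a minimal, hypothetical sketch of the writer and
committer halves, written against the org.apache.flink.api.connector.sink2 interfaces
documented for the 1.18-1.20 releases in [1]. CategoryCommittable and the
extractCategory/writeToCategoryPrefix/updateManifest/notifyExternalSystem helpers are
placeholders for your own logic, and the enclosing Sink implementation plus the
committable serializer are omitted:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;

// Hypothetical committable: everything the committer needs to finalize one category.
class CategoryCommittable implements java.io.Serializable {
    final String category;
    final List<String> stagedFiles; // S3 keys written for this category since the last checkpoint

    CategoryCommittable(String category, List<String> stagedFiles) {
        this.category = category;
        this.stagedFiles = stagedFiles;
    }
}

// Phase 1: stage records per category; hand the staged work over at the checkpoint barrier.
class CategoryWriter implements PrecommittingSinkWriter<String, CategoryCommittable> {

    private final Map<String, List<String>> staged = new HashMap<>();

    @Override
    public void write(String element, Context context) throws IOException {
        String category = extractCategory(element);              // hypothetical helper
        String s3Key = writeToCategoryPrefix(category, element);  // hypothetical helper (step 1)
        staged.computeIfAbsent(category, c -> new ArrayList<>()).add(s3Key);
    }

    @Override
    public void flush(boolean endOfInput) {}

    @Override
    public Collection<CategoryCommittable> prepareCommit() {
        // Called at the checkpoint barrier: everything returned here reaches the committer.
        List<CategoryCommittable> out = new ArrayList<>();
        staged.forEach((category, files) ->
                out.add(new CategoryCommittable(category, new ArrayList<>(files))));
        staged.clear();
        return out;
    }

    @Override
    public void close() {}

    private String extractCategory(String element) { return "todo"; }

    private String writeToCategoryPrefix(String category, String element) { return "todo"; }
}

// Phase 2: runs only after the checkpoint that produced the committables has completed.
class CategoryCommitter implements Committer<CategoryCommittable> {

    @Override
    public void commit(Collection<CommitRequest<CategoryCommittable>> requests) {
        for (CommitRequest<CategoryCommittable> request : requests) {
            CategoryCommittable c = request.getCommittable();
            try {
                updateManifest(c);        // hypothetical helper (step 2)
                notifyExternalSystem(c);  // hypothetical helper (step 3)
            } catch (Exception e) {
                request.retryLater();     // or clean up c.stagedFiles and fail the job
            }
        }
    }

    @Override
    public void close() {}

    private void updateManifest(CategoryCommittable c) {}

    private void notifyExternalSystem(CategoryCommittable c) {}
}
```

Because the committer only runs after the checkpoint that produced its committables has
completed, steps 2 and 3 become visible externally only once the staged S3 files are
covered by a successful checkpoint.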



> 2024年10月14日 12:49,Anil Dasari  写道:
> 
> Hello,
> I am looking to implement a Flink sink for the following use case, where the 
> steps below are executed for each microbatch (using Spark terminology) or 
> trigger:
> 
> Group data by category and write it to S3 under its respective prefix.
> Update category metrics in a manifest file stored in the S3 manifest prefix.
> Send category metrics to an external system to initiate consumption.
> In case of failure, any previously completed steps should be rolled back, 
> i.e., delete files from S3 and reprocess the entire microbatch.
> 
> It seems this pattern can be implemented using the Unified Sink API, as 
> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A.
> 
> I'm currently reviewing FileSink and IcebergSink (still searching for the 
> source code) to understand their implementations and create a new one. 
> 
> Are there any step-by-step docs or examples available for writing a new 
> unified sink?
> 
> Thanks
> 
> 
> 



Re: We found a deadlock problem when a single piece of data is too large on Flink1.13.2, do not continue to process the data, which more understand the data transmission piece, welcome to comment.

2024-10-11 Thread rui chen
The process stack is as follows:

"Map (1/10)#4" #346 prio=5 os_prio=0 tid=0x023e9800 nid=0x5a7f
waiting on condition [0x7fc291fd8000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00079c54dce8> (a
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:347)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:319)
at
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:338)
at
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:314)
at
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:258)
at
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:147)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
at
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
at
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
at
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$218/175531.runDefaultAction(Unknown
Source)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$337/2022158031.run(Unknown
Source)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)

rui chen  于2024年10月9日周三 11:38写道:

> We found a deadlock problem on Flink 1.13.2 when a single piece of data is too
> large: the job does not continue to process data. Comments from anyone who
> understands the data transmission layer better are welcome.
>
>


Re: Stopping the flink 1.18 program with savepoint seems to fail with timeout

2024-10-11 Thread Mate Czagany
Hi,

In the background it is a REST call to Flink. If it takes too long to
create the savepoint, you might hit a timeout. You can increase this using
the configuration client.timeout [1]. You can also use the --detached
option for the stop action, which will return once it receives a trigger ID
from Flink. [2]

Best,
Mate

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#client-timeout
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#creating-a-savepoint

Sachin Mittal  ezt írta (időpont: 2024. okt. 11., P,
6:19):

> Hello,
> I am running a flink job which I stop it with a savepoint:
>
> ./bin/flink stop --savepointPath /tmp/flink-savepoints
> 0b3b584a298afa372491eff5e3d2160b
> Suspending job "0b3b584a298afa372491eff5e3d2160b" with a CANONICAL
> savepoint.
>
>
> However this is what I get in the cli
> 
> The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "0b3b584a298afa372491eff5e3d2160b".
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041)
> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.util.concurrent.TimeoutException
> at
> java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1950)
> at
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2085)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591)
> ... 7 more
>
>
> What I also see is that actually a savepoint does get generated at the
> specified path and my flink job is also stopped after a while.
>
> Is there any setting which is making the cli program to timeout and is
> there a way we can verify that the entire savepoint got generated on the
> specified path ?
>
> Thanks
> Sachin
>
>


Re: Request to joining Apache Flink community on Slack

2024-10-10 Thread Leonard Xu
Here, Anil

https://join.slack.com/t/apache-flink/shared_invite/zt-2s64yiul3-qhMWEJY2BmOMWyZdXDUgRQ

And I’ll update the expired invitation link on the Flink website soon.

Best,
Leonard

> 2024年10月10日 下午11:03,Anil Dasari  写道:
> 
> Hello Leonard, Could you please send an invite to join the slack community ? 
> thanks in advance.
> 
> Regards,
> Anil
> 
> On Wed, Oct 9, 2024 at 8:29 PM Leonard Xu  > wrote:
> Welcome Ken, I’ve sent the invitation to your email.
> 
> 
> Best,
> Leonard
> 
> 
>> 2024年10月10日 上午3:52,Ken CHUAN YU > > 写道:
>> 
>> Hi there
>> 
>> I like to join the Apache Flink community on Slack, my mail address for 
>> slack is: ken.h...@vestiairecollective.com 
>>   
> 



Re: Request to joining Apache Flink community on Slack

2024-10-10 Thread Anil Dasari
Hello Leonard, Could you please send an invite to join the slack community
? thanks in advance.

Regards,
Anil

On Wed, Oct 9, 2024 at 8:29 PM Leonard Xu  wrote:

> Welcome Ken, I’ve sent the invitation to your email.
>
>
> Best,
> Leonard
>
>
> 2024年10月10日 上午3:52,Ken CHUAN YU  写道:
>
> Hi there
>
> I like to join the Apache Flink community on Slack, my mail address for
> slack is: ken.h...@vestiairecollective.com
>
>
>


Re: Flink SQL CDC connector do nothing to the setting "debizum.*" when create the source table

2024-10-09 Thread Leonard Xu


> 2024年10月10日 上午4:34,Yaroslav Tkachenko  写道:
> 
> Hi Ken,
> 
> Snapshotting is implemented differently in Flink CDC, it doesn't re-use 
> Debezium's implementation. So you can override some Debezium properties using 
> "debezium.", but not "debezium.snapshot.".

Yeah, Yaroslav is right: Flink CDC has its own snapshot mode, i.e.
scan.startup.mode [1], and we also plan to disable debezium.snapshot.* to
avoid user confusion.

Best,
Leonard
[1]https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mysql-cdc/

> 
> On Wed, Oct 9, 2024 at 12:46 PM Ken CHUAN YU  <mailto:ken.hung...@gmail.com>> wrote:
> Hi there
> I have issue to use flink sql connector to capture change data from 
> MariaDB(MySQL) when configure “debezium.* settings here are more details:
> I have following table in the source database (MariaDB):
> ‘’’CREATE TABLE `client_test` (
> `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
> `name` varchar(500) NOT NULL DEFAULT '',
> `age` int(11) NOT NULL,
> PRIMARY KEY (`id`)
> );
> ‘’'
> 
> Becasue some reason I need only partial data in this table for the snapshot 
> so I define the Flink stream table as follow:
> 
> ‘’’CREATE TABLE client_cdc (
> id DOUBLE,
> name VARCHAR(500),
> age DOUBLE,
> PRIMARY KEY(id) NOT ENFORCED
> )
> WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'mariadb',
> 'port' = '3306',
> 'username' = 'ooo',
> 'password' = 'ooo',
> 'database-name' = 'xxx',
> 'scan.startup.mode' = 'initial',
> 'table-name' = 'client_test',
> 'debezium.snapshot.query.mode' = 'custom',
> 'debezium.snapshot.select.statement.overrides' = 'xxx.client_test',
> 'debezium.snapshot.select.statement.overrides.xxx.client_test' = 'SELECT * 
> FROM xxx.client_test WHERE id > 3'
> );
> ‘’’
> Above, I tried do filter out the rows which id is less than 3 when sanpshot. 
> But after execute select * from client_cdc; in flink client I can still see 
> all the sanpshot.
> I also try to run this:
> 
> ‘’’CREATE TABLE client_cdc (
> id DOUBLE,
> name VARCHAR(500),
> age DOUBLE,
> PRIMARY KEY(id) NOT ENFORCED
> )
> WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'mariadb',
> 'port' = '3306',
> 'username' = 'ooo',
> 'password' = 'ooo',
> 'database-name' = 'xxx',
> 'scan.startup.mode' = 'initial',
> 'table-name' = 'client_test',
> 'debezium.snapshot.query.mode' = 'custom',
> 'debezium.snapshot.select.statement.overrides' = 'xxx.client_test',
> 'debezium.snapshot.select.statement.overrides.xxx.client_test' = 'this should 
> failed SELECT * FROM xxx.client_test WHERE id > 3'
> );
> ‘’'
> This time I give an invaild query to 
> 'debezium.snapshot.select.statement.overrides.xxx.client_test' but I can 
> still execute select * from client_cdc; and it still take a full snapshot. In 
> other word it seems to me the Flink CDC connector is ignoring the settings 
> are Prefix debezium.* Am I missing anything here?
> According to the document I be able to config the debezium but doesn’t seems 
> the case.
> 
> The expectation is to see only rows selected during snapshot in 
> configuration:  
> "debezium.snapshot.select.statement.overrides.[database].[table] “ ex: SELECT 
> * FROM xxx.client_test WHERE id > 3 I should only see id is greater than 3 
> after the snapshot in the stream table even I have id less than 3 in the 
> table in mysql database
> 
> Am I missing anything here?
> 
> The Flink version I’m using :1.18
> Flink cdc connector I’m using : flink-sql-connector-mysql-cdc-3.1.1
> JDBC version: mysql-connector-j-9.0.0
> Here is the setting about debezium.snapshot.select.statement.overrides : 
> https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-select-statement-overrides
>  
> <https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-select-statement-overrides>
> 
> Thank you for your help in advanced 
> Br,
> Ken Hung
> 
> 
> 
> 
> 
> 
> 



Re: Request to joining Apache Flink community on Slack

2024-10-09 Thread Leonard Xu
Welcome Ken, I’ve sent the invitation to your email.


Best,
Leonard


> 2024年10月10日 上午3:52,Ken CHUAN YU  写道:
> 
> Hi there
> 
> I like to join the Apache Flink community on Slack, my mail address for slack 
> is: ken.h...@vestiairecollective.com 
>   



Re: Flink SQL CDC connector do nothing to the setting "debizum.*" when create the source table

2024-10-09 Thread Ken CHUAN YU
Dear Yaroslav

Thanks for the explanation :-)

So if I just want a partial snapshot based on a certain statement, is it possible
to do that with Flink CDC?
The reason is that the table is huge (100+ GB), so a full snapshot would overwhelm
the Flink application. Or are there any other practices that I can follow
here?

Br,
Ken Hung


On Wed, Oct 9, 2024 at 10:34 PM Yaroslav Tkachenko 
wrote:

> Hi Ken,
>
> Snapshotting is implemented differently in Flink CDC, it doesn't re-use
> Debezium's implementation. So you can override some Debezium properties
> using "debezium.", but not "debezium.snapshot.".
>
> On Wed, Oct 9, 2024 at 12:46 PM Ken CHUAN YU 
> wrote:
>
>> Hi there
>> I have issue to use flink sql connector to capture change data from
>> MariaDB(MySQL) when configure “debezium.* settings here are more details:
>> I have following table in the source database (MariaDB):
>> ‘’’CREATE TABLE `client_test` (
>> `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
>> `name` varchar(500) NOT NULL DEFAULT '',
>> `age` int(11) NOT NULL,
>> PRIMARY KEY (`id`)
>> );
>> ‘’'
>>
>> Becasue some reason I need only partial data in this table for the
>> snapshot so I define the Flink stream table as follow:
>>
>> ‘’’CREATE TABLE client_cdc (
>> id DOUBLE,
>> name VARCHAR(500),
>> age DOUBLE,
>> PRIMARY KEY(id) NOT ENFORCED
>> )
>> WITH (
>> 'connector' = 'mysql-cdc',
>> 'hostname' = 'mariadb',
>> 'port' = '3306',
>> 'username' = 'ooo',
>> 'password' = 'ooo',
>> 'database-name' = 'xxx',
>> 'scan.startup.mode' = 'initial',
>> 'table-name' = 'client_test',
>> 'debezium.snapshot.query.mode' = 'custom',
>> 'debezium.snapshot.select.statement.overrides' = 'xxx.client_test',
>> 'debezium.snapshot.select.statement.overrides.xxx.client_test' = 'SELECT
>> * FROM xxx.client_test WHERE id > 3'
>> );
>> ‘’’
>> Above, I tried do filter out the rows which id is less than 3 when
>> sanpshot. But after execute select * from client_cdc; in flink client I can
>> still see all the sanpshot.
>> I also try to run this:
>>
>> ‘’’CREATE TABLE client_cdc (
>> id DOUBLE,
>> name VARCHAR(500),
>> age DOUBLE,
>> PRIMARY KEY(id) NOT ENFORCED
>> )
>> WITH (
>> 'connector' = 'mysql-cdc',
>> 'hostname' = 'mariadb',
>> 'port' = '3306',
>> 'username' = 'ooo',
>> 'password' = 'ooo',
>> 'database-name' = 'xxx',
>> 'scan.startup.mode' = 'initial',
>> 'table-name' = 'client_test',
>> 'debezium.snapshot.query.mode' = 'custom',
>> 'debezium.snapshot.select.statement.overrides' = 'xxx.client_test',
>> 'debezium.snapshot.select.statement.overrides.xxx.client_test' = 'this
>> should failed SELECT * FROM xxx.client_test WHERE id > 3'
>> );
>> ‘’'
>> This time I give an invaild query to
>> 'debezium.snapshot.select.statement.overrides.xxx.client_test' but I can
>> still execute select * from client_cdc; and it still take a full snapshot.
>> In other word it seems to me the Flink CDC connector is ignoring the
>> settings are Prefix debezium.* Am I missing anything here?
>> According to the document I be able to config the debezium but doesn’t
>> seems the case.
>>
>> The expectation is to see only rows selected during snapshot in
>> configuration:
>> "debezium.snapshot.select.statement.overrides.[database].[table] “ ex:
>> SELECT * FROM xxx.client_test WHERE id > 3 I should only see id is greater
>> than 3 after the snapshot in the stream table even I have id less than 3 in
>> the table in mysql database
>>
>> Am I missing anything here?
>>
>> The Flink version I’m using :1.18
>> Flink cdc connector I’m using : flink-sql-connector-mysql-cdc-3.1.1
>> JDBC version: mysql-connector-j-9.0.0
>> Here is the setting about debezium.snapshot.select.statement.overrides :
>> https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-select-statement-overrides
>>
>> Thank you for your help in advanced
>> Br,
>> Ken Hung
>>
>>
>>
>>
>>
>>
>>
>>


Re: Flink SQL CDC connector do nothing to the setting "debizum.*" when create the source table

2024-10-09 Thread Yaroslav Tkachenko
Hi Ken,

Snapshotting is implemented differently in Flink CDC, it doesn't re-use
Debezium's implementation. So you can override some Debezium properties
using "debezium.", but not "debezium.snapshot.".

On Wed, Oct 9, 2024 at 12:46 PM Ken CHUAN YU  wrote:

> Hi there
> I have issue to use flink sql connector to capture change data from
> MariaDB(MySQL) when configure “debezium.* settings here are more details:
> I have following table in the source database (MariaDB):
> ‘’’CREATE TABLE `client_test` (
> `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
> `name` varchar(500) NOT NULL DEFAULT '',
> `age` int(11) NOT NULL,
> PRIMARY KEY (`id`)
> );
> ‘’'
>
> Becasue some reason I need only partial data in this table for the
> snapshot so I define the Flink stream table as follow:
>
> ‘’’CREATE TABLE client_cdc (
> id DOUBLE,
> name VARCHAR(500),
> age DOUBLE,
> PRIMARY KEY(id) NOT ENFORCED
> )
> WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'mariadb',
> 'port' = '3306',
> 'username' = 'ooo',
> 'password' = 'ooo',
> 'database-name' = 'xxx',
> 'scan.startup.mode' = 'initial',
> 'table-name' = 'client_test',
> 'debezium.snapshot.query.mode' = 'custom',
> 'debezium.snapshot.select.statement.overrides' = 'xxx.client_test',
> 'debezium.snapshot.select.statement.overrides.xxx.client_test' = 'SELECT *
> FROM xxx.client_test WHERE id > 3'
> );
> ‘’’
> Above, I tried do filter out the rows which id is less than 3 when
> sanpshot. But after execute select * from client_cdc; in flink client I can
> still see all the sanpshot.
> I also try to run this:
>
> ‘’’CREATE TABLE client_cdc (
> id DOUBLE,
> name VARCHAR(500),
> age DOUBLE,
> PRIMARY KEY(id) NOT ENFORCED
> )
> WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'mariadb',
> 'port' = '3306',
> 'username' = 'ooo',
> 'password' = 'ooo',
> 'database-name' = 'xxx',
> 'scan.startup.mode' = 'initial',
> 'table-name' = 'client_test',
> 'debezium.snapshot.query.mode' = 'custom',
> 'debezium.snapshot.select.statement.overrides' = 'xxx.client_test',
> 'debezium.snapshot.select.statement.overrides.xxx.client_test' = 'this
> should failed SELECT * FROM xxx.client_test WHERE id > 3'
> );
> ‘’'
> This time I give an invaild query to
> 'debezium.snapshot.select.statement.overrides.xxx.client_test' but I can
> still execute select * from client_cdc; and it still take a full snapshot.
> In other word it seems to me the Flink CDC connector is ignoring the
> settings are Prefix debezium.* Am I missing anything here?
> According to the document I be able to config the debezium but doesn’t
> seems the case.
>
> The expectation is to see only rows selected during snapshot in
> configuration:
> "debezium.snapshot.select.statement.overrides.[database].[table] “ ex:
> SELECT * FROM xxx.client_test WHERE id > 3 I should only see id is greater
> than 3 after the snapshot in the stream table even I have id less than 3 in
> the table in mysql database
>
> Am I missing anything here?
>
> The Flink version I’m using :1.18
> Flink cdc connector I’m using : flink-sql-connector-mysql-cdc-3.1.1
> JDBC version: mysql-connector-j-9.0.0
> Here is the setting about debezium.snapshot.select.statement.overrides :
> https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-select-statement-overrides
>
> Thank you for your help in advanced
> Br,
> Ken Hung
>
>
>
>
>
>
>
>


Re: Kafka SQL Connector loses data if deserialization error occures

2024-10-09 Thread Ilya Karpov
Finally I found that in order to execute many insert statements in one job
I need to use STATEMENT SET. This solved the problem.
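
For reference, a minimal sketch of the same idea via the Table API (in the SQL client
this is written as EXECUTE STATEMENT SET BEGIN ... END;). The DDL and the exact queries
from the script quoted below are elided:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class TwoInsertsOneJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // DDL for the Kafka source table and the ClickHouse catalog/sink tables
        // goes here via tEnv.executeSql(...), as in the SQL script quoted below.

        // Both INSERTs go into one statement set, so they are planned and submitted
        // as a single job that shares one Kafka source operator and one
        // offset/checkpoint lifecycle, instead of two independent jobs.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO mobile_hits_medium SELECT ... FROM events WHERE ...");
        set.addInsertSql("INSERT INTO web_hits SELECT ... FROM events WHERE ...");
        set.execute();
    }
}
```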

On Wed, Oct 9, 2024 at 12:17, Ilya Karpov  wrote:

> During this morning debug I've found that if I comment one of two insert
> expressions and submit sql, then only one job will be created in the flink
> cluster. If a corrupted message causes failure of this job then flink
> behaves correctly: checkpoint does not happen, offset is not committed!
> Modified sql is:
> CREATE TABLE IF NOT EXISTS events (
> message STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'events',
> 'properties.bootstrap.servers' = 'localhost:19092',
> 'properties.group.id' = 'flink-kafka-to-ch-connector',
> 'properties.auto.offset.reset' = 'latest', -- If my-group is used for the
> first time, the consumption starts from the latest offset
> 'format' = 'raw'
> );
>
> CREATE CATALOG clickhouse WITH (
> 'type' = 'clickhouse',
> 'url' = 'clickhouse://localhost:8123',
> 'username' = 'default',
> 'password' = 'secret',
> 'database-name' = 'default',
> 'use-local' = 'false',
> 'sink.max-retries' = '-1'
> );
>
> USE CATALOG clickhouse;
>
> -- write data into the clickhouse `mobile` table
> -- INSERT INTO mobile_hits_medium
> -- SELECT
> -- JSON_VALUE(message, '$.id' RETURNING STRING)
> -- , JSON_VALUE(message, '$.type' RETURNING STRING)
> -- , JSON_VALUE(message, '$.platform' RETURNING STRING)
> -- , JSON_VALUE(message, '$.cnt' RETURNING INTEGER)
> -- FROM default_catalog.default_database.events
> -- where JSON_VALUE(message, '$.platform') = 'mobile';
>
> -- write data into the clickhouse `web` table
> INSERT INTO web_hits
> SELECT
> JSON_VALUE(message, '$.id' RETURNING STRING) as id
> , JSON_VALUE(message, '$.type' RETURNING STRING) as type
> , JSON_VALUE(message, '$.platform' RETURNING STRING) as platform
> , JSON_VALUE(message, '$.payload.browser_name' RETURNING STRING) as
> payload_browser_name
> , JSON_VALUE(message, '$.payload.browser_version' RETURNING INTEGER) as
> payload_browser_version
> , JSON_VALUE(message, '$.payload.fp_score' RETURNING DOUBLE) as
> payload_fp_score
> , TO_TIMESTAMP(JSON_VALUE(message, '$.created_at' RETURNING STRING), 
> '-MM-dd
> HH:mm:ss.SSSX') as created_at
> FROM default_catalog.default_database.events
> where JSON_VALUE(message, '$.platform') = 'web';
>
>
> My current guess is that because of* two inserts *(that uses the* same
> kafka table as a source data*) running simultaneously and* one of them is
> not failing *then this job successfully commits offset. This causes a
> buggy situation.
>
> ср, 9 окт. 2024 г. в 10:06, Ilya Karpov :
>
>> Hi,
>> I have a local flink-1.20.0 setup, where I test clickhouse connector
>> . The problem
>> occurred in one of the test cases: when I push a corrupted message to kafka
>> (json field `browser_version` expected to be integer but actually is a
>> string) then the task fails with exception and restarts over and over
>> (which is ok and expected). But after (some time passes and) checkpoint
>> occurs then offset value is incremented and committed to kafka (this is *not
>> expected*!), then task restarts reads updated offset value and ready to
>> handle new data - actually it just skipped corrupted message! What I expect
>> is: checkpoint is stuck/skipped because one of tasks is restarting, offset
>> value is NOT committed to kafka, tasks is restarting infinitely until
>> manual increment of the offset or change in flinksql script.
>> Checkpointing is default (uses hash, one in 3min), number of slots = 10 -
>> nothing else is changed in default flink conf.
>>
>> Please help to figure out where the problem is: in my expectations, in
>> configuration or its a bug.
>>
>> Full logs and flink conf attached.
>>
>> Details:
>> Exception:
>> 2024-10-09 08:37:58,657 WARN org.apache.flink.runtime.taskmanager.Task
>> [] - Source: events[4] -> Calc[5] -> Sink: web_hits[6] (1/1)#12 
>> (7f77085ad8b8d671723bf6bf5a8f6493_cbc357ccb763df2852fee8c4fc7d55f2_0_12)
>> switched from RUNNING to FAILED with failure cause: java.io.IOException:
>> Failed to deserialize consumer record due to
>> at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter
>> .emitRecord(KafkaRecordEmitter.java:56)
>> at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter
>> .emitRecord(KafkaRecordEmitter.java:33)
>> at org.apache.flink.connector.base.source.reader.SourceReaderBase
>> .pollNext(SourceReaderBase.java:143)
>> at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(
>> SourceOperator.java:385)
>> at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(
>> StreamTaskSourceInput.java:68)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>> .processInput(StreamOneInputProcessor.java:65)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>> StreamTask.java:542)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcess

Re: Kafka SQL Connector loses data if deserialization error occures

2024-10-09 Thread Ilya Karpov
During this morning debug I've found that if I comment one of two insert
expressions and submit sql, then only one job will be created in the flink
cluster. If a corrupted message causes failure of this job then flink
behaves correctly: checkpoint does not happen, offset is not committed!
Modified sql is:
CREATE TABLE IF NOT EXISTS events (
message STRING
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:19092',
'properties.group.id' = 'flink-kafka-to-ch-connector',
'properties.auto.offset.reset' = 'latest', -- If my-group is used for the
first time, the consumption starts from the latest offset
'format' = 'raw'
);

CREATE CATALOG clickhouse WITH (
'type' = 'clickhouse',
'url' = 'clickhouse://localhost:8123',
'username' = 'default',
'password' = 'secret',
'database-name' = 'default',
'use-local' = 'false',
'sink.max-retries' = '-1'
);

USE CATALOG clickhouse;

-- write data into the clickhouse `mobile` table
-- INSERT INTO mobile_hits_medium
-- SELECT
-- JSON_VALUE(message, '$.id' RETURNING STRING)
-- , JSON_VALUE(message, '$.type' RETURNING STRING)
-- , JSON_VALUE(message, '$.platform' RETURNING STRING)
-- , JSON_VALUE(message, '$.cnt' RETURNING INTEGER)
-- FROM default_catalog.default_database.events
-- where JSON_VALUE(message, '$.platform') = 'mobile';

-- write data into the clickhouse `web` table
INSERT INTO web_hits
SELECT
JSON_VALUE(message, '$.id' RETURNING STRING) as id
, JSON_VALUE(message, '$.type' RETURNING STRING) as type
, JSON_VALUE(message, '$.platform' RETURNING STRING) as platform
, JSON_VALUE(message, '$.payload.browser_name' RETURNING STRING) as
payload_browser_name
, JSON_VALUE(message, '$.payload.browser_version' RETURNING INTEGER) as
payload_browser_version
, JSON_VALUE(message, '$.payload.fp_score' RETURNING DOUBLE) as
payload_fp_score
, TO_TIMESTAMP(JSON_VALUE(message, '$.created_at' RETURNING STRING),
'-MM-dd
HH:mm:ss.SSSX') as created_at
FROM default_catalog.default_database.events
where JSON_VALUE(message, '$.platform') = 'web';


My current guess is that because *two inserts* (that use the *same Kafka
table as a source*) are running simultaneously and *one of them is not
failing*, the job successfully commits the offset. This causes a buggy
situation.

On Wed, Oct 9, 2024 at 10:06, Ilya Karpov  wrote:

> Hi,
> I have a local flink-1.20.0 setup, where I test clickhouse connector
> . The problem
> occurred in one of the test cases: when I push a corrupted message to kafka
> (json field `browser_version` expected to be integer but actually is a
> string) then the task fails with exception and restarts over and over
> (which is ok and expected). But after (some time passes and) checkpoint
> occurs then offset value is incremented and committed to kafka (this is *not
> expected*!), then task restarts reads updated offset value and ready to
> handle new data - actually it just skipped corrupted message! What I expect
> is: checkpoint is stuck/skipped because one of tasks is restarting, offset
> value is NOT committed to kafka, tasks is restarting infinitely until
> manual increment of the offset or change in flinksql script.
> Checkpointing is default (uses hash, one in 3min), number of slots = 10 -
> nothing else is changed in default flink conf.
>
> Please help to figure out where the problem is: in my expectations, in
> configuration or its a bug.
>
> Full logs and flink conf attached.
>
> Details:
> Exception:
> 2024-10-09 08:37:58,657 WARN org.apache.flink.runtime.taskmanager.Task []
> - Source: events[4] -> Calc[5] -> Sink: web_hits[6] (1/1)#12 
> (7f77085ad8b8d671723bf6bf5a8f6493_cbc357ccb763df2852fee8c4fc7d55f2_0_12)
> switched from RUNNING to FAILED with failure cause: java.io.IOException:
> Failed to deserialize consumer record due to
> at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter
> .emitRecord(KafkaRecordEmitter.java:56)
> at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter
> .emitRecord(KafkaRecordEmitter.java:33)
> at org.apache.flink.connector.base.source.reader.SourceReaderBase
> .pollNext(SourceReaderBase.java:143)
> at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(
> SourceOperator.java:385)
> at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(
> StreamTaskSourceInput.java:68)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:542)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:831)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> .java:780)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(
> Task.java:935)
> at org.apach

Re: Flink custom source

2024-10-06 Thread Anil Dasari
Hi Ahmed,
Thanks again. You are right: adding a split request call in the reader fixed
the issue.

Thanks




On Sun, Oct 6, 2024 at 11:30 AM Ahmed Hamdy  wrote:

> Hi Anil,
> Yes you are right, split assignment is either proactive by split discovery
> and then assignment as in the scheduled discovery tasks in Kafka, JDBC
> sources or reactive where the reader itself submits a split request and the
> enumerator assigns on requests like the example you added, or even a
> mixture of both as in the file system source, In order to assign splits via
> SplitEnumerator#handleSplitRequest implementation reader instances must
> submit requests as in here[1]. I can't see your custom implementation [2]
> submitting any split requests. Let me know if I get anything wrong here!
>
>
> 1-
> https://github.com/apache/flink/blob/dd45e0522588ea594e4a92fd98d8115363a5700a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java#L92C13-L92C40
> 
>
> 2-
> https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSplitReader.java
> 
> Best Regards
> Ahmed Hamdy
>
>
> On Sun, 6 Oct 2024 at 16:48, Anil Dasari  wrote:
>
>> Hi Ahmed,
>> Thanks for the response.
>> This is the part that I find unclear in the documentation and FLIP-27.
>> The actual split assignment happens in the
>> SplitEnumerator#handleSplitRequest method and not in
>> SplitEnumerator#start. Both KafkaSource and JdbcSource use
>> context.callAsync to identify new splits in SplitEnumerator#start. From
>> what I understand, split assignment does not need to occur in the start
>> method. SplitEmulator assigns the assigned splits to the defined reader in
>> the source.
>>
>> SplitEnumerator#handleSplitRequest calls decided by parallelism ? i
>> think, yes.
>>
>> Kafka and Jdbc Source split emulators a bit complex due to the nature of
>> the auto discovery required for the given configuration. Here is the simple
>> split emulator from flink repo
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java#L58
>> 
>> where split assignment happen in handleRequest.
>>
>> Let me know if you have any questions.
>>
>> Thanks.
>>
>> On Sun, Oct 6, 2024 at 6:44 AM Ahmed Hamdy  wrote:
>>
>>> Hi Anil
>>> I am glad the new (yet to be only) source API is getting attention,
>>> according to the execution model the assignments of splits is the
>>> responsibility of the enumerator and it seems that the enumerator is not
>>> assigning the readers any splits.
>>> Check kafka source for reference[1]
>>> 1-
>>> https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L286
>>> 
>>> Best Regards
>>> Ahmed Hamdy
>>>
>>>
>>> On Sun, 6 Oct 2024 at 06:41, Anil Dasari  wrote:
>>>
 Hello,
 I have implemented a custom source that reads tables in parallel, with
 each split corresponding to a table and custom source implementation can be
 found here -
 https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSource.java
 

 However, it seems the source splits are not being scheduled and data is
 not being read from the tables. Can someone help me identify the issue in
 the implementation?

 Thanks




Re: Flink custom source

2024-10-06 Thread Ahmed Hamdy
Hi Anil,
Yes, you are right. Split assignment is either proactive, with split discovery
followed by assignment (as in the scheduled discovery tasks of the Kafka and
JDBC sources), reactive, where the reader itself submits a split request and
the enumerator assigns splits on request (like the example you added), or even
a mixture of both, as in the file system source. In order to assign splits via
a SplitEnumerator#handleSplitRequest implementation, reader instances must
submit requests, as done here [1]. I can't see your custom implementation [2]
submitting any split requests. Let me know if I get anything wrong here!


1-
https://github.com/apache/flink/blob/dd45e0522588ea594e4a92fd98d8115363a5700a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java#L92C13-L92C40

2-
https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSplitReader.java
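
To make the request-driven variant concrete, below is a minimal sketch of such an
enumerator; the TableSplit type is hypothetical, and the surrounding Source, reader and
serializer boilerplate is omitted. The reader side must call
SourceReaderContext#sendSplitRequest() in start() and again whenever it finishes its
current split (IteratorSourceReaderBase already does this at the line linked in [1]):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

// Hypothetical split type: one split per table name.
class TableSplit implements SourceSplit {
    private final String table;

    TableSplit(String table) { this.table = table; }

    @Override
    public String splitId() { return table; }
}

// Purely request-driven enumerator: a split is handed out only when a reader
// asks for one via SourceReaderContext#sendSplitRequest().
class TableSplitEnumerator implements SplitEnumerator<TableSplit, List<TableSplit>> {

    private final SplitEnumeratorContext<TableSplit> context;
    private final Queue<TableSplit> remaining;

    TableSplitEnumerator(SplitEnumeratorContext<TableSplit> context, List<TableSplit> splits) {
        this.context = context;
        this.remaining = new ArrayDeque<>(splits);
    }

    @Override
    public void start() {
        // All splits are known up front; nothing to discover asynchronously here.
    }

    @Override
    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        TableSplit next = remaining.poll();
        if (next != null) {
            context.assignSplit(next, subtaskId);  // one split to the requesting reader
        } else {
            context.signalNoMoreSplits(subtaskId); // lets a bounded reader finish
        }
    }

    @Override
    public void addSplitsBack(List<TableSplit> splits, int subtaskId) {
        remaining.addAll(splits); // splits returned after a reader failure
    }

    @Override
    public void addReader(int subtaskId) {
        // Request-driven model: wait for the reader's split request instead of pushing here.
    }

    @Override
    public List<TableSplit> snapshotState(long checkpointId) {
        return new ArrayList<>(remaining);
    }

    @Override
    public void close() {}
}
```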
Best Regards
Ahmed Hamdy


On Sun, 6 Oct 2024 at 16:48, Anil Dasari  wrote:

> Hi Ahmed,
> Thanks for the response.
> This is the part that I find unclear in the documentation and FLIP-27. The
> actual split assignment happens in the SplitEnumerator#handleSplitRequest
> method and not in SplitEnumerator#start. Both KafkaSource and JdbcSource
> use context.callAsync to identify new splits in SplitEnumerator#start.
> From what I understand, split assignment does not need to occur in the
> start method. SplitEmulator assigns the assigned splits to the defined
> reader in the source.
>
> SplitEnumerator#handleSplitRequest calls decided by parallelism ? i
> think, yes.
>
> Kafka and Jdbc Source split emulators a bit complex due to the nature of
> the auto discovery required for the given configuration. Here is the simple
> split emulator from flink repo
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java#L58
> where split assignment happen in handleRequest.
>
> Let me know if you have any questions.
>
> Thanks.
>
> On Sun, Oct 6, 2024 at 6:44 AM Ahmed Hamdy  wrote:
>
>> Hi Anil
>> I am glad the new (yet to be only) source API is getting attention,
>> according to the execution model the assignments of splits is the
>> responsibility of the enumerator and it seems that the enumerator is not
>> assigning the readers any splits.
>> Check kafka source for reference[1]
>> 1-
>> https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L286
>> Best Regards
>> Ahmed Hamdy
>>
>>
>> On Sun, 6 Oct 2024 at 06:41, Anil Dasari  wrote:
>>
>>> Hello,
>>> I have implemented a custom source that reads tables in parallel, with
>>> each split corresponding to a table and custom source implementation can be
>>> found here -
>>> https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSource.java
>>>
>>> However, it seems the source splits are not being scheduled and data is
>>> not being read from the tables. Can someone help me identify the issue in
>>> the implementation?
>>>
>>> Thanks
>>>
>>>


Re: Flink custom source

2024-10-06 Thread Anil Dasari
Hi Ahmed,
Thanks for the response.
This is the part that I find unclear in the documentation and FLIP-27. The
actual split assignment happens in the SplitEnumerator#handleSplitRequest
method and not in SplitEnumerator#start. Both KafkaSource and JdbcSource
use context.callAsync to identify new splits in SplitEnumerator#start. From
what I understand, split assignment does not need to occur in the start
method; the SplitEnumerator assigns the discovered splits to the readers
defined in the source.

Are SplitEnumerator#handleSplitRequest calls decided by parallelism? I
think yes.

The Kafka and JDBC source split enumerators are a bit complex due to the
auto discovery required for the given configuration. Here is a simple
split enumerator from the Flink repo:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java#L58
where split assignment happens in handleSplitRequest.

Let me know if you have any questions.

Thanks.

On Sun, Oct 6, 2024 at 6:44 AM Ahmed Hamdy  wrote:

> Hi Anil
> I am glad the new (yet to be only) source API is getting attention,
> according to the execution model the assignments of splits is the
> responsibility of the enumerator and it seems that the enumerator is not
> assigning the readers any splits.
> Check kafka source for reference[1]
> 1-
> https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L286
> 
> Best Regards
> Ahmed Hamdy
>
>
> On Sun, 6 Oct 2024 at 06:41, Anil Dasari  wrote:
>
>> Hello,
>> I have implemented a custom source that reads tables in parallel, with
>> each split corresponding to a table and custom source implementation can be
>> found here -
>> https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSource.java
>> 
>>
>> However, it seems the source splits are not being scheduled and data is
>> not being read from the tables. Can someone help me identify the issue in
>> the implementation?
>>
>> Thanks
>>
>>


Re: Flink custom source

2024-10-06 Thread Ahmed Hamdy
Hi Anil
I am glad the new (yet to be only) source API is getting attention,
according to the execution model the assignments of splits is the
responsibility of the enumerator and it seems that the enumerator is not
assigning the readers any splits.
Check kafka source for reference[1]
1-
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L286
Best Regards
Ahmed Hamdy


On Sun, 6 Oct 2024 at 06:41, Anil Dasari  wrote:

> Hello,
> I have implemented a custom source that reads tables in parallel, with
> each split corresponding to a table and custom source implementation can be
> found here -
> https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/paralleljdbc/DatabaseSource.java
>
> However, it seems the source splits are not being scheduled and data is
> not being read from the tables. Can someone help me identify the issue in
> the implementation?
>
> Thanks
>
>


Re: Status of ClickHouseSink

2024-10-03 Thread Ilya Karpov
Sounds great, I'll try it, thanks!

On Thu, Oct 3, 2024 at 20:54, Yaroslav Tkachenko  wrote:

> Yes, quite well! :) We've been using it in production for many months now.
>
> On Thu, Oct 3, 2024 at 10:50 AM Ilya Karpov  wrote:
>
>> Yaroslav,
>> Yep, I saw it, did you try it yourself? Does it work?
>>
>> чт, 3 окт. 2024 г. в 19:54, Yaroslav Tkachenko :
>>
>>> https://github.com/itinycheng/flink-connector-clickhouse is another
>>> one, it supports at least Flink 1.17.
>>>
>>> On Thu, Oct 3, 2024 at 7:52 AM Sachin Mittal  wrote:
>>>
 It works for me with Flink version 1.8.

 I am using this in prod. Somehow it’s simpler to use this to ingest
 data into clickhouse than setup Kafka + clickpipe.

 On Thu, 3 Oct 2024 at 7:51 PM, Ilya Karpov  wrote:

> Seems that it depends on quite outdated
> 
> version of flink. Did you make the version up yourself?
>
> чт, 3 окт. 2024 г. в 15:17, Sachin Mittal :
>
>> I have been using:
>> https://github.com/ivi-ru/flink-clickhouse-sink
>>
>>
>>
>> On Thu, Oct 3, 2024 at 4:54 PM Ilya Karpov  wrote:
>>
>>> Hi,
>>> I've been searching for an implementation of kafka to clickhouse
>>> sink and found FLIP
>>> 
>>>  and
>>> connector sources
>>> . Can
>>> anyone clarify if the connector is usable (tested on production 
>>> workloads)
>>> and plans about including it to flink distribution?
>>>
>>> Thanks forward!
>>>
>>


Re: Status of ClickHouseSink

2024-10-03 Thread Yaroslav Tkachenko
Yes, quite well! :) We've been using it in production for many months now.

On Thu, Oct 3, 2024 at 10:50 AM Ilya Karpov  wrote:

> Yaroslav,
> Yep, I saw it, did you try it yourself? Does it work?
>
> чт, 3 окт. 2024 г. в 19:54, Yaroslav Tkachenko :
>
>> https://github.com/itinycheng/flink-connector-clickhouse is another one,
>> it supports at least Flink 1.17.
>>
>> On Thu, Oct 3, 2024 at 7:52 AM Sachin Mittal  wrote:
>>
>>> It works for me with Flink version 1.8.
>>>
>>> I am using this in prod. Somehow it’s simpler to use this to ingest data
>>> into clickhouse than setup Kafka + clickpipe.
>>>
>>> On Thu, 3 Oct 2024 at 7:51 PM, Ilya Karpov  wrote:
>>>
 Seems that it depends on quite outdated
 
 version of flink. Did you make the version up yourself?

 чт, 3 окт. 2024 г. в 15:17, Sachin Mittal :

> I have been using:
> https://github.com/ivi-ru/flink-clickhouse-sink
>
>
>
> On Thu, Oct 3, 2024 at 4:54 PM Ilya Karpov  wrote:
>
>> Hi,
>> I've been searching for an implementation of kafka to clickhouse sink
>> and found FLIP
>> 
>>  and
>> connector sources
>> . Can
>> anyone clarify if the connector is usable (tested on production 
>> workloads)
>> and plans about including it to flink distribution?
>>
>> Thanks forward!
>>
>


Re: Status of ClickHouseSink

2024-10-03 Thread Ilya Karpov
Yaroslav,
Yep, I saw it, did you try it yourself? Does it work?

On Thu, Oct 3, 2024 at 19:54, Yaroslav Tkachenko  wrote:

> https://github.com/itinycheng/flink-connector-clickhouse is another one,
> it supports at least Flink 1.17.
>
> On Thu, Oct 3, 2024 at 7:52 AM Sachin Mittal  wrote:
>
>> It works for me with Flink version 1.8.
>>
>> I am using this in prod. Somehow it’s simpler to use this to ingest data
>> into clickhouse than setup Kafka + clickpipe.
>>
>> On Thu, 3 Oct 2024 at 7:51 PM, Ilya Karpov  wrote:
>>
>>> Seems that it depends on quite outdated
>>> 
>>> version of flink. Did you make the version up yourself?
>>>
>>> чт, 3 окт. 2024 г. в 15:17, Sachin Mittal :
>>>
 I have been using:
 https://github.com/ivi-ru/flink-clickhouse-sink



 On Thu, Oct 3, 2024 at 4:54 PM Ilya Karpov  wrote:

> Hi,
> I've been searching for an implementation of kafka to clickhouse sink
> and found FLIP
> 
>  and
> connector sources
> . Can
> anyone clarify if the connector is usable (tested on production workloads)
> and plans about including it to flink distribution?
>
> Thanks forward!
>



Re: Status of ClickHouseSink

2024-10-03 Thread Yaroslav Tkachenko
https://github.com/itinycheng/flink-connector-clickhouse is another one, it
supports at least Flink 1.17.

On Thu, Oct 3, 2024 at 7:52 AM Sachin Mittal  wrote:

> It works for me with Flink version 1.8.
>
> I am using this in prod. Somehow it’s simpler to use this to ingest data
> into clickhouse than setup Kafka + clickpipe.
>
> On Thu, 3 Oct 2024 at 7:51 PM, Ilya Karpov  wrote:
>
>> Seems that it depends on quite outdated
>> 
>> version of flink. Did you make the version up yourself?
>>
>> чт, 3 окт. 2024 г. в 15:17, Sachin Mittal :
>>
>>> I have been using:
>>> https://github.com/ivi-ru/flink-clickhouse-sink
>>>
>>>
>>>
>>> On Thu, Oct 3, 2024 at 4:54 PM Ilya Karpov  wrote:
>>>
 Hi,
 I've been searching for an implementation of kafka to clickhouse sink
 and found FLIP
 
  and
 connector sources
 . Can anyone
 clarify if the connector is usable (tested on production workloads) and
 plans about including it to flink distribution?

 Thanks forward!

>>>


Re: Status of ClickHouseSink

2024-10-03 Thread Sachin Mittal
It works for me with Flink version 1.8.

I am using this in prod. Somehow it’s simpler to use this to ingest data
into clickhouse than setup Kafka + clickpipe.

On Thu, 3 Oct 2024 at 7:51 PM, Ilya Karpov  wrote:

> Seems that it depends on quite outdated
> 
> version of flink. Did you make the version up yourself?
>
> чт, 3 окт. 2024 г. в 15:17, Sachin Mittal :
>
>> I have been using:
>> https://github.com/ivi-ru/flink-clickhouse-sink
>>
>>
>>
>> On Thu, Oct 3, 2024 at 4:54 PM Ilya Karpov  wrote:
>>
>>> Hi,
>>> I've been searching for an implementation of kafka to clickhouse sink
>>> and found FLIP
>>> 
>>>  and
>>> connector sources
>>> . Can anyone
>>> clarify if the connector is usable (tested on production workloads) and
>>> plans about including it to flink distribution?
>>>
>>> Thanks forward!
>>>
>>


Re: Status of ClickHouseSink

2024-10-03 Thread Ilya Karpov
It seems that it depends on a quite outdated
version of Flink. Did you bump the version yourself?

On Thu, Oct 3, 2024 at 15:17, Sachin Mittal  wrote:

> I have been using:
> https://github.com/ivi-ru/flink-clickhouse-sink
>
>
>
> On Thu, Oct 3, 2024 at 4:54 PM Ilya Karpov  wrote:
>
>> Hi,
>> I've been searching for an implementation of kafka to clickhouse sink and
>> found FLIP
>> 
>>  and
>> connector sources
>> . Can anyone
>> clarify if the connector is usable (tested on production workloads) and
>> plans about including it to flink distribution?
>>
>> Thanks forward!
>>
>


Re: Flink 1.20 and Flink Kafka Connector 3.2.0-1.19

2024-10-03 Thread Dominik.Buenzli
Hi Patricia,

Can you please add the complete “No Class Definition Error” stacktrace? I’ve 
also experienced some issues with the newer Kafka connectors and it might be 
the same problem. As far as I know, there is not yet a dedicated Kafka 
connector for Flink 1.20.

Kind regards

Dominik Bünzli
Data, Analytics & AI Engineer III

From: patricia lee 
Date: Thursday, 3 October 2024 at 15:27
To: user@flink.apache.org 
Subject: Flink 1.20 and Flink Kafka Connector 3.2.0-1.19

Hi,


I have upgraded our project to Flink 1.20 and JDK 17, but I noticed there is no
Kafka connector released for Flink 1.20.

I currently use the existing connector versions, but there is an intermittent
Kafka-related NoClassDefFoundError.

Where can I get the Kafka connector for Flink 1.20? Thanks


Re: Status of ClickHouseSink

2024-10-03 Thread Sachin Mittal
I have been using:
https://github.com/ivi-ru/flink-clickhouse-sink



On Thu, Oct 3, 2024 at 4:54 PM Ilya Karpov  wrote:

> Hi,
> I've been searching for an implementation of kafka to clickhouse sink and
> found FLIP
> 
>  and
> connector sources
> . Can anyone
> clarify if the connector is usable (tested on production workloads) and
> plans about including it to flink distribution?
>
> Thanks forward!
>


Re: Help regarding Flink Stateful API

2024-10-01 Thread Nitin Chauhan
Hello Gabor,

Thanks for the response.

I have a set of specific questions, it could be of great help if you could
answer them

1) Can stateful functions be executed only using Docker images provided by
flink? I could not execute it as a standalone app.
2) Is it possible to access the state info outside of data processing i.e.
Can I access it from the UI as Sync calls?
3) I tried to expose the stateful function as a REST endpoint without
ingress/egress but was unable to achieve this without defining an ingress/egress.
4) How can I create a Docker image of the Flink stateful APIs? I was unable to
decouple the source code and build a Docker image. I used the playground
below.

https://github.com/apache/flink-statefun-playground/tree/release-3.3/playground-internal

Best Regards,
Nitin


On Tue, Oct 1, 2024 at 5:26 PM Gabor Somogyi 
wrote:

> Hi Nitin,
>
> Flink applications can be started locally (run the main) for example from
> Intellij or any other similar IDE.
> Important note that such case the execution path is different but it's
> convenient for business logic debugging.
>
> BR,
> G
>
>
> On Tue, Oct 1, 2024 at 12:21 PM Nitin Chauhan <
> nitin.chau...@simadvisory.com> wrote:
>
>> HI
>>
>> I am trying to run a code using stateful APIs in my local machine. I am
>> able to run it if I use docker image provided by flink official source code.
>>
>> I wanted to know is there any way in which we can do it without using the
>> docker image and configuring everything on my own?
>>
>> Best Regards,
>> Nitin
>>
>


Re: Help regarding Flink Stateful API

2024-10-01 Thread Gabor Somogyi
As I see you use stateful functions.
Please consider whether your needs and possibilities match the actual stand
of statefun project:
https://lists.apache.org/thread/7cr2bgt91ppk6pz8o0nfbd10gs63nz6t

G


On Tue, Oct 1, 2024 at 1:59 PM Nitin Chauhan 
wrote:

> Nitin Chauhan 
> 5:21 PM (6 minutes ago)
> to Andreas
> Hello Gabor,
>
> Thanks for the response.
>
> I have a set of specific questions, it could be of great help if you could
> answer them
>
> 1) Can stateful functions be executed only using Docker images provided by
> flink? I could not execute it as a standalone app.
> 2) Is it possible to access the state info outside of data processing i.e.
> Can I access it from the UI as Sync calls?
> 3) I tried to expose the stateful function as a REST endpoint without
> ingress/egress but was unable to achieve without defining an ingress/egress
> 4) How can I create a docker image of flink stateful APIs, I was unable to
> decouple the source code and build a docker image. I used the playground
> below.
>
> https://github.com/apache/flink-statefun-playground/tree/release-3.3/playground-internal
>
> Best Regards,
> Nitin
>
>
> On Tue, Oct 1, 2024 at 5:26 PM Gabor Somogyi 
> wrote:
>
>> Hi Nitin,
>>
>> Flink applications can be started locally (run the main) for example from
>> Intellij or any other similar IDE.
>> Important note that such case the execution path is different but it's
>> convenient for business logic debugging.
>>
>> BR,
>> G
>>
>>
>> On Tue, Oct 1, 2024 at 12:21 PM Nitin Chauhan <
>> nitin.chau...@simadvisory.com> wrote:
>>
>>> HI
>>>
>>> I am trying to run a code using stateful APIs in my local machine. I am
>>> able to run it if I use docker image provided by flink official source code.
>>>
>>> I wanted to know is there any way in which we can do it without using
>>> the docker image and configuring everything on my own?
>>>
>>> Best Regards,
>>> Nitin
>>>
>>


Re: Help regarding Flink Stateful API

2024-10-01 Thread Nitin Chauhan
Thanks for the help

Let me go through the link

On Tue, Oct 1, 2024 at 6:09 PM Gabor Somogyi 
wrote:

> As I see you use stateful functions.
> Please consider whether your needs and possibilities match the actual
> stand of statefun project:
> https://lists.apache.org/thread/7cr2bgt91ppk6pz8o0nfbd10gs63nz6t
>
> G
>
>
> On Tue, Oct 1, 2024 at 1:59 PM Nitin Chauhan <
> nitin.chau...@simadvisory.com> wrote:
>
>> Nitin Chauhan 
>> 5:21 PM (6 minutes ago)
>> to Andreas
>> Hello Gabor,
>>
>> Thanks for the response.
>>
>> I have a set of specific questions, it could be of great help if you
>> could answer them
>>
>> 1) Can stateful functions be executed only using Docker images provided
>> by flink? I could not execute it as a standalone app.
>> 2) Is it possible to access the state info outside of data processing
>> i.e. Can I access it from the UI as Sync calls?
>> 3) I tried to expose the stateful function as a REST endpoint without
>> ingress/egress but was unable to achieve without defining an ingress/egress
>> 4) How can I create a docker image of flink stateful APIs, I was unable
>> to decouple the source code and build a docker image. I used the playground
>> below.
>>
>> https://github.com/apache/flink-statefun-playground/tree/release-3.3/playground-internal
>>
>> Best Regards,
>> Nitin
>>
>>
>> On Tue, Oct 1, 2024 at 5:26 PM Gabor Somogyi 
>> wrote:
>>
>>> Hi Nitin,
>>>
>>> Flink applications can be started locally (run the main) for example
>>> from Intellij or any other similar IDE.
>>> Important note that such case the execution path is different but it's
>>> convenient for business logic debugging.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Tue, Oct 1, 2024 at 12:21 PM Nitin Chauhan <
>>> nitin.chau...@simadvisory.com> wrote:
>>>
 HI

 I am trying to run a code using stateful APIs in my local machine. I am
 able to run it if I use docker image provided by flink official source 
 code.

 I wanted to know is there any way in which we can do it without using
 the docker image and configuring everything on my own?

 Best Regards,
 Nitin

>>>


Re: Help regarding Flink Stateful API

2024-10-01 Thread Gabor Somogyi
Hi Nitin,

Flink applications can be started locally (run the main method), for example
from IntelliJ or any other similar IDE.
An important note: in that case the execution path is different, but it's
convenient for business logic debugging.

BR,
G


On Tue, Oct 1, 2024 at 12:21 PM Nitin Chauhan 
wrote:

> HI
>
> I am trying to run a code using stateful APIs in my local machine. I am
> able to run it if I use docker image provided by flink official source code.
>
> I wanted to know is there any way in which we can do it without using the
> docker image and configuring everything on my own?
>
> Best Regards,
> Nitin
>


Re: setUidHash not working (???)

2024-09-25 Thread Salva Alcántara
Hi Gabor,

Thanks for the insights! However, after inspecting the contents of the
latest savepoint, I double checked that the hash value for my source
operator was correct ("cbc357ccb763df2852fee8c4fc7d55f2"). This is also
consistent with the id that was shown in the jobmanager UI. So, again,
unless I'm missing something I don't see `setUidHash` having any effect in
practice. The only way I've found to workaround the problem in production
is by enabling `allowNonRestoredState`, which seems to confirm my thesis.
Indeed, the new ID shown in the jobmanager UI is
"bc764cd8ddf7a0cff126f51c16239658" (vs the intended
"cbc357ccb763df2852fee8c4fc7d55f2") as reported in my test above. This
makes me think there is something wrong with `setUidHash`...

BTW I'm running on Flink 1.18.1.

Regards,

Salva

On Tue, Sep 24, 2024 at 1:07 PM Gabor Somogyi 
wrote:

> Now I see what you're doing. In general you should look for the UID pair
> in the vertex, like this (several operators can belong to a vertex):
>
> Optional<OperatorIDPair> pair =
> vertex.get().getOperatorIDs().stream()
> .filter(o -> 
> o.getUserDefinedOperatorID().get().toString().equals(uidHash))
> .findFirst();
>
> assertEquals(uidHash, pair.get().getUserDefinedOperatorID().get().toString());
>
>
> Please load your savepoint with SavepointLoader and analyze what kind of
> operators are inside and why your app is blowing up.
>
> G
>
>
> On Tue, Sep 24, 2024 at 9:29 AM Gabor Somogyi 
> wrote:
>
>> Hi Salva,
>>
>> Which version is this?
>>
>> BR,
>> G
>>
>>
>> On Mon, Sep 23, 2024 at 8:46 PM Salva Alcántara 
>> wrote:
>>
>>> I have a pipeline where I'm trying to add a new operator. The problem is
>>> that originally I forgot to specify the uid for one source. To remedy this,
>>> I'm using setUidHash, providing the operator id to match that in my
>>> savepoints / current job graph.
>>>
>>> Unless I'm doing something wrong, what I've observed is that the value
>>> provided to setUidHash is completely ignored by Flink. I've tried to
>>> summarize it in the following test:
>>>
>>> ```java
>>>   @Test
>>>   public void testSetUidHash() throws Exception {
>>> var uidHash = "cbc357ccb763df2852fee8c4fc7d55f2";
>>> env.fromElements(1, 2, 3)
>>> .name("123")
>>> .setUidHash(uidHash)
>>> //.uid("123")
>>> .print()
>>> .name("print")
>>> .uid("print");
>>>
>>> var vertex = StreamSupport
>>>
>>> .stream(env.getStreamGraph().getJobGraph().getVertices().spliterator(),
>>> false)
>>> .filter(v -> v.getName().equals("Source: 123"))
>>> .findFirst()
>>> .orElseThrow();
>>>
>>> assertEquals(uidHash, vertex.getID());
>>> // Fails with java.lang.AssertionError (if setUidHash is used):
>>> // expected:<cbc357ccb763df2852fee8c4fc7d55f2> but
>>> was:<bc764cd8ddf7a0cff126f51c16239658>
>>> // Interestingly, the actual value does not even depend on the
>>> provided value (uidHash)!
>>>
>>> // assertEquals("6a7f660d1b2d5b9869cf0ecee3a17e42", vertex.getID());
>>> // Passes (if uid is used instead)
>>>   }
>>> ```
>>>
>>> Can anyone confirm whether this is a bug?
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>


Re: Getting Direct buffer memory. Errors with Kafka.

2024-09-24 Thread John Smith
How/what tools can we use to monitor direct memory usage?
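One option is to read the JVM's buffer pool MXBeans directly; a minimal sketch is below. The task managers also expose the same numbers as Flink system metrics (Status.JVM.Memory.Direct.Count/MemoryUsed/TotalCapacity) through whatever metrics reporter is configured.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

// Dumps the JVM's direct and mapped buffer pool usage.
public class DirectMemoryProbe {
    public static void main(String[] args) {
        for (BufferPoolMXBean pool :
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%s: count=%d, used=%d bytes, capacity=%d bytes%n",
                    pool.getName(), pool.getCount(),
                    pool.getMemoryUsed(), pool.getTotalCapacity());
        }
    }
}
```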

On Thu, Aug 29, 2024 at 8:00 AM John Smith  wrote:

> Also linger and batch are producer settings; we are getting this error on
> consumers. In fact we don't use Kafka as a sink whatsoever in Flink.
>
> On Thu, Aug 29, 2024, 8:46 AM John Smith  wrote:
>
>> Maybe the change in direct memory allocation in java 11 did this?
>>
>> Java 8: By default, the amount of native memory used for Direct Byte
>> Buffers is limited to 87.5% of the maximum heap size.
>>
>> Java 11: By default, the amount of native memory used for Direct Byte
>> Buffers is limited to the maximum heap size.
>>
>> On Sat, Aug 24, 2024, 3:18 PM John Smith  wrote:
>>
>>> The same exact task/code and exact same version of flink had no issues
>>> before.
>>>
>>> The only thing that changed is deployed flink to java 11. Added more
>>> memory to the config and increased the parallelism of the Kafka source.
>>>
>>> On Fri, Aug 23, 2024, 3:46 PM John Smith  wrote:
>>>
 Online resources, including my previous question about this problem, said
 there was some client bug connecting to an SSL broker that caused memory
 issues. As far as memory setup goes, I have the following...

 Here is the link and there's a link to a JIRA...
 https://stackoverflow.com/questions/64697973/java-lang-outofmemoryerror-direct-buffer-memory-error-while-listening-kafka-top

 taskmanager.memory.flink.size: 16384m
 taskmanager.memory.jvm-metaspace.size: 3072m

 My task managers are 32GB each.


 On Fri, Aug 23, 2024 at 11:21 AM Yaroslav Tkachenko <
 yaros...@goldsky.com> wrote:

> Hi John,
>
> I've experienced this issue recently; it's likely caused either by:
>
> - the size of the producer record batch, it can be reduced by
> configuring lower linger.ms and batch.size values
> - the size of an individual record
>
>
> On Fri, Aug 23, 2024 at 7:20 AM Ahmed Hamdy 
> wrote:
>
>> Why do you believe it is an SSL issue?
>> The error trace seems like a memory issue. you could refer to
>> taskmanager memory setup guide[1].
>>
>> 1-
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/
>>
>> Best Regards
>> Ahmed Hamdy
>>
>>
>> On Fri, 23 Aug 2024 at 13:47, John Smith 
>> wrote:
>>
>>> I'm pretty sure it's not SSL. Is there a way to confirm, since the
>>> task does work? And/or are there other settings I can try?
>>>
>>> On Thu, Aug 22, 2024, 11:06 AM John Smith 
>>> wrote:
>>>
 Hi, I am getting this exception; a lot of resources online point to an
 SSL misconfiguration.

 We are NOT using SSL. Neither on the broker or the consumer side.
 Our jobs work absolutely fine as in the flink task is able to consume 
 from
 kafka parse the json and then push it to the JDBC database sink.

 I would assume if SSL was enabled on one side or the other that the
 records would be completely mangled and unparsable from not being able 
 to
 encrypt/decrypt. Also this seems to happen about once a week.

 2024-08-22 10:17:09
 java.lang.RuntimeException: One or more fetchers have encountered
 exception
 at
 org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
 at
 org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
 at
 org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
 at
 org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
 at
 org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
 at
 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
 at
 org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
 at
 org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
 at
 org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
 at java.base/java.lang.Thread.run(Thread.java:829)
 Caused by: java.lang.OutOfMemoryError: Direc

Re: setUidHash not working (???)

2024-09-24 Thread Gabor Somogyi
Now I see what you're doing. In general you should look for the UID pair in
the vertex, like this (several operators can belong to a vertex):

Optional<OperatorIDPair> pair =
vertex.get().getOperatorIDs().stream()
.filter(o ->
o.getUserDefinedOperatorID().get().toString().equals(uidHash))
.findFirst();

assertEquals(uidHash, pair.get().getUserDefinedOperatorID().get().toString());


Please load your savepoint with SavepointLoader and analyze what kind of
operators are inside and why your app is blowing up.
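A rough sketch of such an inspection, assuming the SavepointLoader helper from flink-state-processor-api (path and class name below are placeholders):

```java
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.state.api.runtime.SavepointLoader;

// Prints every operator id stored in the savepoint so it can be compared
// against the hash passed to setUidHash / the ids shown in the JobManager UI.
public class SavepointInspector {
    public static void main(String[] args) throws Exception {
        String savepointPath = args[0]; // e.g. "file:///tmp/savepoints/savepoint-xxxx"
        CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(savepointPath);
        for (OperatorState operatorState : metadata.getOperatorStates()) {
            System.out.println(operatorState.getOperatorID()
                    + " parallelism=" + operatorState.getParallelism());
        }
    }
}
```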

G


On Tue, Sep 24, 2024 at 9:29 AM Gabor Somogyi 
wrote:

> Hi Salva,
>
> Which version is this?
>
> BR,
> G
>
>
> On Mon, Sep 23, 2024 at 8:46 PM Salva Alcántara 
> wrote:
>
>> I have a pipeline where I'm trying to add a new operator. The problem is
>> that originally I forgot to specify the uid for one source. To remedy this,
>> I'm using setUidHash, providing the operator id to match that in my
>> savepoints / current job graph.
>>
>> Unless I'm doing something wrong, what I've observed is that the value
>> provided to setUidHash is completely ignored by Flink. I've tried to
>> summarize it in the following test:
>>
>> ```java
>>   @Test
>>   public void testSetUidHash() throws Exception {
>> var uidHash = "cbc357ccb763df2852fee8c4fc7d55f2";
>> env.fromElements(1, 2, 3)
>> .name("123")
>> .setUidHash(uidHash)
>> //.uid("123")
>> .print()
>> .name("print")
>> .uid("print");
>>
>> var vertex = StreamSupport
>>
>> .stream(env.getStreamGraph().getJobGraph().getVertices().spliterator(),
>> false)
>> .filter(v -> v.getName().equals("Source: 123"))
>> .findFirst()
>> .orElseThrow();
>>
>> assertEquals(uidHash, vertex.getID());
>> // Fails with java.lang.AssertionError (if setUidHash is used):
>> // expected:<cbc357ccb763df2852fee8c4fc7d55f2> but
>> was:<bc764cd8ddf7a0cff126f51c16239658>
>> // Interestingly, the actual value does not even depend on the
>> provided value (uidHash)!
>>
>> // assertEquals("6a7f660d1b2d5b9869cf0ecee3a17e42", vertex.getID());
>> // Passes (if uid is used instead)
>>   }
>> ```
>>
>> Can anyone confirm whether this is a bug?
>>
>> Regards,
>>
>> Salva
>>
>


Re: setUidHash not working (???)

2024-09-24 Thread Gabor Somogyi
Hi Salva,

Which version is this?

BR,
G


On Mon, Sep 23, 2024 at 8:46 PM Salva Alcántara 
wrote:

> I have a pipeline where I'm trying to add a new operator. The problem is
> that originally I forgot to specify the uid for one source. To remedy this,
> I'm using setUidHash, providing the operator id to match that in my
> savepoints / current job graph.
>
> Unless I'm doing something wrong, what I've observed is that the value
> provided to setUidHash is completely ignored by Flink. I've tried to
> summarize it in the following test:
>
> ```java
>   @Test
>   public void testSetUidHash() throws Exception {
> var uidHash = "cbc357ccb763df2852fee8c4fc7d55f2";
> env.fromElements(1, 2, 3)
> .name("123")
> .setUidHash(uidHash)
> //.uid("123")
> .print()
> .name("print")
> .uid("print");
>
> var vertex = StreamSupport
>
> .stream(env.getStreamGraph().getJobGraph().getVertices().spliterator(),
> false)
> .filter(v -> v.getName().equals("Source: 123"))
> .findFirst()
> .orElseThrow();
>
> assertEquals(uidHash, vertex.getID());
> // Fails with java.lang.AssertionError (if setUidHash is used):
> // expected:<cbc357ccb763df2852fee8c4fc7d55f2> but
> was:<bc764cd8ddf7a0cff126f51c16239658>
> // Interestingly, the actual value does not even depend on the
> provided value (uidHash)!
>
> // assertEquals("6a7f660d1b2d5b9869cf0ecee3a17e42", vertex.getID());
> // Passes (if uid is used instead)
>   }
> ```
>
> Can anyone confirm whether this is a bug?
>
> Regards,
>
> Salva
>


Re: Slack invitation link !!!

2024-09-22 Thread Hang Ruan
Hi, Apollo.

Here is an invitation link :
https://join.slack.com/t/apache-flink/shared_invite/zt-2r0p0hgp2-N1uS3hwwq6g33bsrRqvsjw

Best,
Hang

Apollo Elon  于2024年9月21日周六 21:41写道:

> Could you please send me an invitation link to the Flink Slack community
> ?Thanks.
>


Re: Flink 1.15.4 JobGraphs not compatible with 1.19.1

2024-09-22 Thread David Morávek
Hi Sachin,

JobGraph is not meant to be compatible between Flink releases. To upgrade,
please go through the regular process using Savepoints [1].

Best,
D.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/ops/state/savepoints/

On Wed, Sep 18, 2024 at 5:30 AM Sachin Sharma 
wrote:

> Hi,
>
> We are running flink applications on Flink versions 1.15.4 and now we want
> to migrate them to 1.19.1 version. When I am trying to start the
> application on a newer version, Flink is complaining about an internal
> change in the data structure of JobVertex. I am getting the below error:
>
> JobVertex
>
> *1.19.1:*
> private final Map<IntermediateDataSetID, IntermediateDataSet> results =
> new LinkedHashMap<>();
>
> *1.15.4:*
> private final ArrayList<IntermediateDataSet> results = new ArrayList<>();
>
> java.lang.ClassCastException: cannot assign instance of
>> java.util.ArrayList to field
>> org.apache.flink.runtime.jobgraph.JobVertex.results of type java.util.Map
>> in instance of org.apache.flink.runtime.jobgraph.JobVertex\n\tat
>> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown
>> Source)\n\tat
>> java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown
>> Source)\n\tat java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown
>> Source)\n\tat java.io.ObjectInputStream.defaultCheckFieldValues(Unknown
>> Source)\n\tat java.io.ObjectInputStream.readSerialData(Unknown
>> Source)\n\t... 23 frames truncated\n\t... 10 common frames omitted\nWrapped
>> by: org.apache.flink.util.FlinkRuntimeException: Could not recover job with
>> job id .\n\tat
>> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.tryRecoverJob(SessionDispatcherLeaderProcess.java:183)\n\tat
>> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:150)\n\tat
>> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$recoverJobsIfRunning$2(SessionDispatcherLeaderProcess.java:139)\n\tat
>> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)\n\tat
>> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:139)\n\t...
>> 1 frames truncated\n\t... 5 common frames omitted\nWrapped by:
>> java.util.concurrent.CompletionException:
>> org.apache.flink.util.FlinkRuntimeException: Could not recover job with job
>> id .\n\tat
>> java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
>> Source)\n\tat
>> java.util.concurrent.CompletableFuture.completeThrowable(Unknown
>> Source)\n\tat
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown
>> Source)\n\tat java.util.concurrent.CompletableFuture$Completion.run(Unknown
>> Source)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
>> Source)\n\t... 2 frames truncated
>>
>
> What should I do to make it compatible?
>
> Thanks & Regards,
> Sachin Sharma
> +1-669-278-5239
>
>
>


Re: Add customized metrics in Sink

2024-09-19 Thread Péter Váry
Hi Tengda,
Which sink version do you use?
I have recently (Flink 1.18, 1.19) added the metrics to the initialization
context for the committers of the new SinkV2 API [1]. With this you have a
place to collect the desired metrics.

I hope this helps,
Peter

[1]
https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=263430387#content/view/263430387
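If the committer-side metrics do not fit the use case, another commonly used workaround (not the SinkV2 committer approach referenced above) is a pass-through operator chained right before the built-in sink that records the event-time lag in a histogram. A rough sketch, where the record type, timestamp extractor and metric name are all placeholders:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;

// Pass-through mapper that records (processing time - event time) in a histogram.
// Chain it immediately before the sink, e.g.
//   stream.map(new EventTimeLagRecorder<>(MyEvent::getEventTimestamp)).sinkTo(dynamoDbSink);
public class EventTimeLagRecorder<T> extends RichMapFunction<T, T> {

    /** Extracts the event timestamp (epoch millis) from a record. */
    public interface TimestampExtractor<T> extends java.io.Serializable {
        long extract(T record);
    }

    private final TimestampExtractor<T> extractor;
    private transient Histogram eventTimeLag;

    public EventTimeLagRecorder(TimestampExtractor<T> extractor) {
        this.extractor = extractor;
    }

    @Override
    public void open(Configuration parameters) {
        // Any org.apache.flink.metrics.Histogram implementation works here;
        // DescriptiveStatisticsHistogram ships with flink-runtime.
        eventTimeLag = getRuntimeContext().getMetricGroup()
                .histogram("eventTimeLag", new DescriptiveStatisticsHistogram(500));
    }

    @Override
    public T map(T value) {
        eventTimeLag.update(System.currentTimeMillis() - extractor.extract(value));
        return value;
    }
}
```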

On Thu, Sep 19, 2024, 23:59 Tang, Tengda via user 
wrote:

> Hi Flink team,
>
>
>
> We want to add customized metrics for measuring “user-facing/end-to-end
> latency” in Flink application.
>
> Per this post
> https://stackoverflow.com/questions/56578919/latency-monitoring-in-flink-application,
> the suggestion is to add a histogram metric in the sink operator myself,
> which depicts the difference between the current processing time and the
> event time to get a distribution of the event time lag at the source.
>
> However, how can I add this customized metric in sink operator?
>
>
>
> I know I could add the customized metrics using a RichSinkFunction, but
> in this case, I cannot benefit from the internal Sink operators like
> DynamoDB sink
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/
>
> And it’s also recommended to use these internal Sink operators.
>
>
>
> Is there any way I could add histogram metric into Sink operators like
> DynamoDB sink?
>
>
>
>
>
> Thanks,
>
> Tengda
> This message is confidential and subject to terms at:
> https://www.jpmorgan.com/emaildisclaimer including on confidential,
> privileged or legal entity information, malicious content and monitoring of
> electronic messages. If you are not the intended recipient, please delete
> this message and notify the sender immediately. Any unauthorized use is
> strictly prohibited.
>


Re: unexpected high mem. usage / potential config misinterpretation

2024-09-18 Thread Simon Frei
Hi Alexis,

Thanks for chiming in. To my knowledge and experience that’s not how it works:
we can consistently succeed in running this job in batch mode on less data, but
still considerably more data than would fit in memory. Also even on the full 
data, the initial source tasks always succeed. To my understanding flink in 
batch mode writes all the data produced by a source/operator to disk. It even 
can run on less memory than the equivalent streaming job, as it can process a 
small part of the data in a partition at once from/to disk, without having to 
keep the state of all the data at once in memory. At least that’s my 
observation and understanding of e.g. this documentation: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/#batch-execution-mode

Best,
Simon

From: Alexis Sarda-Espinosa 
Date: Wednesday, 18 September 2024 at 19:45
To: Simon Frei 
Cc: user 
Subject: Re: unexpected high mem. usage / potential config misinterpretation

Hi Simon,

I hope someone corrects me if I'm wrong, but just based on "batch mode 
processing terabytes of data", I feel batch mode may be the issue. I am under 
the impression that batch mode forces everything emitted by the sources to RAM 
before any downstream operators do anything, so even if each parallel task of 
your source runs in a different task manager and loads a subset of your data, 
they might each end up trying to load terabytes into RAM.

Hopefully someone more knowledgeable about batch mode can comment.

Regards,
Alexis.

On Wed, 18 Sept 2024, 18:04 Simon Frei, 
mailto:simon.f...@spire.com>> wrote:
Hi,

tl;dr:
Flink batch streaming API job resident memory usage grows far beyond 
expectations, resulting in system OOM kill/JVM native memory allocation failure 
- would appreciate a look over our config/assumptions to potentially spot any 
obvious mistakes.

Longer form:

My colleague and I have been troubleshooting a large batch job for a long time, and
still experience behaviour around Flink's memory usage that we cannot explain. My
hope is that by explaining our configuration and observations, someone can spot 
a misconception. And in the ideal case I can then send a PR for the 
documentation to hopefully make that misconception less likely for other users.

I'll start with an overview/"story-like" form, and then below that are some 
numbers/configs.

This is a streaming job run in batch mode, processing terabytes of data 
sourcing and sinking to compressed files in S3. In between there are a few 
simple decoding and filter operations, then two processors with our main 
business logic and finally a few simple transformations and reduce steps. While 
reduce and sink writer tasks run, we encounter much more resident memory usage 
of the flink TM java process than expected from configuration, i.e. higher than 
the configured process memory. And that leads to failures, either the system 
OOM killer intervening or the JVM not being able to mmap. I know that the 
writers do use native memory, e.g. for avro deflate compression, which is a 
native method. Also the IO likely uses some native memory. We now configure 5g 
of task off-heap memory to compensate for any such native memory usage, but 
still encounter higher memory usage. Even 5g seems way too much for some 
compression buffers and IO, let alone more than that. So currently my main 
theory is that I misunderstand something about the memory related config. E.g. 
that task slots factor into used/allocated memory.

During the late stages of the job, i.e. during reduce and sink operations,
we observe much higher memory usage than expected. The increase in memory usage 
isn't happening slowly, gradually over time, but quickly when those tasks 
start. This is an example of ps output for one TM:

PID %CPU %MEMVSZ   RSS TTY  STAT START   TIME COMMAND
14961  130 94.8 32213536 30957104 ?   Sl   Sep12 7453:03 
/usr/lib/jvm/java-11-amazon-corretto.aarch64/bin/java -Xmx7435661840 
-Xms7435661840 -XX:MaxDirectMemorySize=9663676416 
-XX:MaxMetaspaceSize=268435456 
-Dlog.file=/var/log/hadoop-yarn/containers/application_1724829444792_0007/container_1724829444792_0007_01_000330/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.network.min=2147483648b -D taskmanager.cpu.cores=4.0 -D 
taskmanager.memory.task.off-heap.size=5368709120b -D 
taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D 
taskmanager.memory.jvm-overhead.min=1073741824b -D 
taskmanager.memory.framework.off-heap.size=2147483648b -D 
taskmanager.memory.network.max=2147483648b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=7328288240b -D 
taskmanager.memory.task.heap.size=7301444112b -D 
taskmanager.numberOfTaskSlots=4 -D 
taskmanager.memory

Re: unexpected high mem. usage / potential config misinterpretation

2024-09-18 Thread Alexis Sarda-Espinosa
Hi Simon,

I hope someone corrects me if I'm wrong, but just based on "batch mode
processing terabytes of data", I feel batch mode may be the issue. I am
under the impression that batch mode forces everything emitted by the
sources to RAM before any downstream operators do anything, so even if each
parallel task of your source runs in a different task manager and loads a
subset of your data, they might each end up trying to load terabytes into
RAM.

Hopefully someone more knowledgeable about batch mode can comment.

Regards,
Alexis.


On Wed, 18 Sept 2024, 18:04 Simon Frei,  wrote:

> Hi,
>
>
>
> tl;dr:
>
> Flink batch streaming API job resident memory usage grows far beyond
> expectations, resulting in system OOM kill/JVM native memory allocation
> failure - would appreciate a look over our config/assumptions to
> potentially spot any obvious mistakes.
>
>
>
> Longer form:
>
>
>
> My colleague and I have been troubleshooting a large batch job for a long time,
> and still experience behaviour around Flink's memory usage that we cannot
> explain. My hope is that by explaining our configuration and observations,
> someone can spot a misconception. And in the ideal case I can then send a
> PR for the documentation to hopefully make that misconception less likely
> for other users.
>
>
>
> I'll start with an overview/"story-like" form, and then below that are
> some numbers/configs.
>
>
>
> This is a streaming job run in batch mode, processing terabytes of data
> sourcing and sinking to compressed files in S3. In between there are a few
> simple decoding and filter operations, then two processors with our main
> business logic and finally a few simple transformations and reduce steps.
> While reduce and sink writer tasks run, we encounter much more resident
> memory usage of the flink TM java process than expected from configuration,
> i.e. higher than the configured process memory. And that leads to failures,
> either the system OOM killer intervening or the JVM not being able to mmap.
> I know that the writers do use native memory, e.g. for avro deflate
> compression, which is a native method. Also the IO likely uses some native
> memory. We now configure 5g of task off-heap memory to compensate for any
> such native memory usage, but still encounter higher memory usage. Even 5g
> seems way too much for some compression buffers and IO, let alone more than
> that. So currently my main theory is that I misunderstand something about
> the memory related config. E.g. that task slots factor into used/allocated
> memory.
>
>
>
> During the late stages of the job, i.e. during reduce and sink
> operations, we observe much higher memory usage than expected. The increase
> in memory usage isn't happening slowly, gradually over time, but quickly
> when those tasks start. This is an example of ps output for one TM:
>
>
>
> PID %CPU %MEMVSZ   RSS TTY  STAT START   TIME COMMAND
>
> 14961  130 94.8 32213536 30957104 ?   Sl   Sep12 7453:03
> /usr/lib/jvm/java-11-amazon-corretto.aarch64/bin/java -Xmx7435661840
> -Xms7435661840 -XX:MaxDirectMemorySize=9663676416
> -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1724829444792_0007/container_1724829444792_0007_01_000330/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> -Dlog4j.configurationFile=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner -D
> taskmanager.memory.network.min=2147483648b -D taskmanager.cpu.cores=4.0 -D
> taskmanager.memory.task.off-heap.size=5368709120b -D
> taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none
> -D taskmanager.memory.jvm-overhead.min=1073741824b -D
> taskmanager.memory.framework.off-heap.size=2147483648b -D
> taskmanager.memory.network.max=2147483648b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=7328288240b -D
> taskmanager.memory.task.heap.size=7301444112b -D
> taskmanager.numberOfTaskSlots=4 -D
> taskmanager.memory.jvm-overhead.max=1073741824b --configDir .
> -Djobmanager.rpc.address=ip-172-30-119-251.us-west-2.compute.internal
> -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b
> -Dweb.tmpdir=/tmp/flink-web-81fb345d-de64-4002-bd78-4454ca901566
> -Djobmanager.rpc.port=42195
> -Drest.address=ip-172-30-119-251.us-west-2.compute.internal
> -Djobmanager.memory.jvm-overhead.max=322122552b
> -Djobmanager.memory.jvm-overhead.min=322122552b
> -Dtaskmanager.resource-id=container_1724829444792_0007_01_000330
> -Dinternal.taskmanager.resource-id.metadata=ip-172-30-116-113.us-west-2.compute.internal:8041
> -Djobmanager.memory.jvm-metaspace.size=268435456b
> -Djobmanager.memory.heap.size=2496449736b
>
>
>
> For easier readability:
>
> RSS = 30957104kB = 29.5GB
>
>
>
> Then this is the relevant bit of our config file. It includes explanations
> of how we came up with those numbers, so here's where I hope someone can
> quickly tell me where I am wrong :)
>
>
>
> # We configure yarn to pr

Re: Flink 1.19.1 Java 17 Compatibility

2024-09-17 Thread Sachin Sharma
Thanks Zhanghao and Andreas.

Thanks & Regards,
Sachin Sharma
+1-669-278-5239




On Tue, Sep 17, 2024 at 6:04 AM Zhanghao Chen 
wrote:

> In our production environment, it works fine.
>
> Best,
> Zhanghao Chen
> --
> *From:* Sachin Sharma 
> *Sent:* Friday, September 13, 2024 1:19
> *To:* Oscar Perez via user 
> *Subject:* Flink 1.19.1 Java 17 Compatibility
>
> Hi,
>
> We are planning to use Flink 1.19.1 with kubernetes operator, I wanted to
> check if 1.19.1 is Java 17 compatible or not. The documentation says that
> version 1.18 added experimental support for it, but nothing concrete is
> said about whether it supports Java 17 completely.
>
> Thanks & Regards,
> Sachin Sharma
> +1-669-278-5239
>
>
>


Re: Recommendations for avoid Kryo

2024-09-17 Thread Zhanghao Chen
Hi Lasse,

You may define a type information factory via config [1] starting from
v1.20. We are also working on extending the Flink type system to cover basic 
generic collection types and avoid falling back to Kryo.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
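As a sketch of what such a factory can look like for an external POJO that cannot be annotated (the class and fields below are hypothetical; the factory is then attached to the class via the configuration approach described in [1]):

```java
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Declares Flink-native type information for the external POJO's fields so that
// the generic List does not fall back to Kryo.
public class ExternalEventTypeInfoFactory extends TypeInfoFactory<ExternalEvent> {

    @Override
    public TypeInformation<ExternalEvent> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("id", Types.STRING);
        fields.put("tags", Types.LIST(Types.STRING));
        return Types.POJO(ExternalEvent.class, fields);
    }
}

// Hypothetical stand-in for the POJO defined in the external library,
// shown only to make the example self-contained.
class ExternalEvent {
    public String id;
    public List<String> tags;
}
```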

Best,
Zhanghao Chen

From: Lasse Nedergaard 
Sent: Friday, September 13, 2024 14:27
To: user 
Subject: Recommendations for avoid Kryo

Hi.

I was wondering how others handle the situation where your Flink job has to read
or write a data structure in JSON mapped to a POJO defined in an external
library and these external POJOs contain data types, like UUID, generic list
etc, that you need to annotate with type information in order to avoid Kryo 
serialisation.

Is it best practice to reimplement the POJOs in the Flink job with the type
annotation? This solution works, but then you have the definition twice, and when
changes occur you have to remember to change it in both places.

Or do we have other options.

What is the recommended way of handling this problem?

Med venlig hilsen / Best regards
Lasse Nedergaard



Re: Flink 1.19.1 Java 17 Compatibility

2024-09-17 Thread Zhanghao Chen
In our production environment, it works fine.

Best,
Zhanghao Chen

From: Sachin Sharma 
Sent: Friday, September 13, 2024 1:19
To: Oscar Perez via user 
Subject: Flink 1.19.1 Java 17 Compatibility

Hi,

We are planning to use Flink 1.19.1 with kubernetes operator, I wanted to check 
if 1.19.1 is Java 17 compatible or not. The documentation says that version
1.18 added experimental support for it, but nothing concrete is said about whether
it supports Java 17 completely.

Thanks & Regards,
Sachin Sharma
+1-669-278-5239




Re: TaskManager Using more CPU Cores in Kubernetes Deployment than configured

2024-09-17 Thread Zhanghao Chen
Flink relies on the JDK's methods to get the amount of resources available for a
TM. If you are using an outdated version of JDK (<8u191 for JDK8 with cgroup-v1 
based containerization for example), the exact amount of resources available in 
a containerized environment cannot be retrieved, and the resources of the 
underlying host are returned instead. Check [1] for more information.

[1] https://jvmaware.com/container-aware-jvm/
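A quick way to verify what the JVM inside the statefun-worker container actually sees is a standalone probe like the one below, run with the same image/JDK. With a working cgroup-aware JDK and a 1-CPU limit it should print 1; if it prints the host's core count, the JDK's container support is not taking effect (newer JDKs also accept -XX:ActiveProcessorCount to force the value).

```java
// Standalone probe: prints the CPU count and max heap the JVM believes it has.
public class CpuProbe {
    public static void main(String[] args) {
        System.out.println("availableProcessors = "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("maxMemory = "
                + Runtime.getRuntime().maxMemory() / (1024 * 1024) + " MiB");
    }
}
```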

Best,
Zhanghao Chen

From: Oliver Schmied 
Sent: Wednesday, September 11, 2024 16:49
To: user@flink.apache.org 
Subject: TaskManager Using more CPU Cores in Kubernetes Deployment than 
configured


Dear Flink Community,

I am currently running Apache Flink Stateful Functions (version 3.2.0) on a 
Kubernetes cluster. I’ve configured the statefun-master and statefun-worker 
deployments with the following resource limits:

resources:

requests:

memory: "2Gi"

cpu: "1"

limits:

memory: "2Gi"

cpu: "1"



However, when I check the Flink UI (see screenshot in attachment), I see that 
the statefun-worker TaskManager is using 8 CPU cores, despite the fact that 
I’ve set the CPU requests and limits to 1 core in the Kubernetes deployment. 
Below are the relevant settings in the flink-conf.yaml file:

taskmanager.numberOfTaskSlots: 2
classloader.parent-first-patterns.additional: 
org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
state.backend: rocksdb
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.backend.incremental: true
parallelism.default: 1

jobmanager.memory.process.size: 2g
taskmanager.memory.process.size: 2g

kubernetes.taskmanager.cpu: 1
kubernetes.jobmanager.cpu: 1
taskmanager.cpu.cores: 1



I would appreciate any guidance on why Flink might be utilizing more CPU cores 
than specified and how I can ensure that the TaskManager adheres to the 
configured resource limits.

Any insights or suggestions would be greatly appreciated.

Best regards,
Oliver


Re: genericRecord serialisation fail with npe when logicalType is used

2024-09-12 Thread Nicolas Paris
I was able to fix it with the modifications below. Turns out Flink can
encode GenericRecords to Avro when the schema is provided (via a chained
returns() call). Also the Kryo workaround is no longer needed then.


@@ -22,14 +22,12 @@
 env.fromSource(source,
 WatermarkStrategy.noWatermarks(),
 "Generator Source");
-// this for being able to map the stream
-Class<?> unmodColl =
Class.forName("java.util.Collections$UnmodifiableCollection");
-env.getConfig().addDefaultKryoSerializer(unmodColl,
UnmodifiableCollectionsSerializer.class);
-
  stream.map(record -> {
   GenericRecord newRecord = new GenericData.Record(abSchema);
   newRecord.put("a", 1L);
   newRecord.put("b", "bar");
   return newRecord;
-}).sinkTo(sink);
+})
+ .returns(new GenericRecordAvroTypeInfo(abSchema))
+ .sinkTo(sink);
 env.execute();


