Re: Data & Task distribution among the available Nodes

2023-06-29 Thread Shammon FY
Hi Mahmoud,

For the third question: Flink currently uses Fine-Grained Resource Management
to choose a TM for tasks; you can refer to the doc [1] for more information.


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/finegrained_resource/
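
For illustration, a minimal sketch (API as of Flink 1.17; all names and
numbers are purely illustrative) of declaring a slot sharing group with an
explicit resource profile and attaching an operator to it, so the scheduler
only places those tasks into slots that can provide the requested resources:

import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FineGrainedDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Declare a slot sharing group with an explicit resource profile.
        SlotSharingGroup heavy = SlotSharingGroup.newBuilder("heavy")
                .setCpuCores(2.0)
                .setTaskHeapMemoryMB(512)
                .build();

        env.fromElements("a", "b", "c")
                .map(String::toUpperCase)
                .slotSharingGroup(heavy) // tasks of this operator need a matching slot
                .print();

        env.execute("fine-grained-demo");
    }
}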

Best,
Shammon FY


On Thu, Jun 29, 2023 at 4:17 PM Martijn Visser 
wrote:

> Hi Mahmoud,
>
> While it's not an answer to your questions, I do want to point out
> that the DataSet API is deprecated and will be removed in a future
> version of Flink. I would recommend moving to either the Table API or
> the DataStream API.
>
> Best regards,
>
> Martijn
>
> On Thu, Jun 22, 2023 at 6:14 PM Mahmoud Awad 
> wrote:
> >
> > Hello everyone,
> >
> > I am trying to understand the mechanism by which Flink distributes the
> data and the tasks among the nodes/task managers in the cluster, assuming
> all TMs have equal resources. I am using the DataSet API on my own machine.
> > I will try to address the issue with the following questions:
> >
> > - When we first read the data from the source (text, CSV, etc.), how
> does Flink ensure a fair distribution of data from the source to the
> next subtask?
> >
> > - Are there any preferences by which Flink will prefer one task manager
> over another (assuming all task managers have equal resources)?
> >
> > - Based on what will Flink choose to deploy a specific task in a
> specific task manager?
> >
> > I hope I was able to explain my point, thank you in advance.
> >
> > Best regards
> > Mahmoud
> >
> >
> >
> >
> >
>


RE: Join two streams

2023-06-29 Thread Schwalbe Matthias
Hi Ivan,

The source of your problem is quite simple:
- If you do windowing by event time, all the sources need to emit watermarks.
- Watermarks are the logical clock used for event-time timing.
- You could either use processing-time windows, or adjust the watermark strategy of 
your sources accordingly (a minimal sketch follows below).

... I didn't check other potential sources of trouble in your code.
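
A minimal sketch of the second option, assuming a POJO named SensorReading
whose getTimestamp() returns epoch milliseconds (both names are stand-ins
for your own classes):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Emit watermarks that trail the highest seen event timestamp by 5 seconds;
// without watermarks, event-time windows never fire.
WatermarkStrategy<SensorReading> strategy =
        WatermarkStrategy
                .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((reading, ts) -> reading.getTimestamp());

DataStream<SensorReading> t1Stream =
        env.fromSource(t1Source, strategy, "T1 Stream");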

Hope this helps

Thias


-Original Message-
From: Иван Борисов  
Sent: Friday, 30 June 2023 05:45
To: user@flink.apache.org
Subject: Join two streams

Hello,
please help me, I can't join two streams. In the joined stream I've got zero 
messages and can't understand why.

Kafka Topics:
1st stream
topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp':
123123131}, 'compare_with': 'T2'}
2nd stream
topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp':
53543543}, 'compare_with': 'T1'}


DataStream T1_Stream = env.fromSource( T1_Source, 
WatermarkStrategy.noWatermarks(),
"T1 Stream");

DataStream T2_Stream = env.fromSource( T2_Source, 
WatermarkStrategy.noWatermarks(),
"T2 Stream");

DataStream comparisonStream = T1_Stream
.join(T2_Stream)
.where(T1 -> T1.getCompare_with())
.equalTo(T2 -> T2.getSensor_Name())
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.apply((JoinFunction) (T1, T2) -> { double firstValue = 
T1.getTemp(); double secondValue = T2.getTemp(); double m = 
firstValue-secondValue; return m; }); 
comparisonStream.writeAsText("/tmp/output_k.txt",
org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);

And my file is empty!
What am I doing wrong?

--
Yours truly, Ivan Borisov
mob./WhatsApp: 7 913  088 8882
Telegram: @Ivan_S_Borisov
Skype: ivan.s.borisov
e-mail: ivan.s.bori...@gmail.com


Re: Join two streams

2023-06-29 Thread Shrihari R
Hey Ivan,

Use *TumblingProcessingTimeWindows* instead of TumblingEventTimeWindows.
TumblingEventTimeWindows requires a watermark strategy.
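
For reference, a minimal sketch of the suggested swap, assuming a POJO named
SensorReading with the getters from the original snippet (t1Stream and
t2Stream stand for the two Kafka-backed streams):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Processing-time windows fire on the wall clock, so the sources need
// no watermark strategy.
DataStream<Double> comparisonStream = t1Stream
        .join(t2Stream)
        .where(SensorReading::getCompare_with)
        .equalTo(SensorReading::getSensor_Name)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
        .apply(new JoinFunction<SensorReading, SensorReading, Double>() {
            @Override
            public Double join(SensorReading first, SensorReading second) {
                return first.getTemp() - second.getTemp();
            }
        });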


*Ref*
https://stackoverflow.com/questions/72291659/flink-tumbling-window-is-not-triggered-no-watermark-strategy

*Regards*
Shrihari

On Fri, Jun 30, 2023 at 9:16 AM Иван Борисов 
wrote:

> Hello,
> please help me, I can't join two streams. In the joined stream I've got
> zero messages and can't understand why.
>
> Kafka Topics:
> 1st stream
> topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp':
> 123123131}, 'compare_with': 'T2'}
> 2nd stream
> topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp':
> 53543543}, 'compare_with': 'T1'}
>
>
> DataStream T1_Stream = env.fromSource(
> T1_Source,
> WatermarkStrategy.noWatermarks(),
> "T1 Stream");
>
> DataStream T2_Stream = env.fromSource(
> T2_Source,
> WatermarkStrategy.noWatermarks(),
> "T2 Stream");
>
> DataStream comparisonStream = T1_Stream
> .join(T2_Stream)
> .where(T1 -> T1.getCompare_with())
> .equalTo(T2 -> T2.getSensor_Name())
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> .apply((JoinFunction) (T1, T2) -> {
> double firstValue = T1.getTemp();
> double secondValue = T2.getTemp();
> double m = firstValue-secondValue;
> return m;
> });
> comparisonStream.writeAsText("/tmp/output_k.txt",
> org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
>
> And my file is empty!
> What am I doing wrong?
>
> --
> Yours truly, Ivan Borisov
> mob./WhatsApp: 7 913  088 8882
> Telegram: @Ivan_S_Borisov
> Skype: ivan.s.borisov
> e-mail: ivan.s.bori...@gmail.com
>



Join two streams

2023-06-29 Thread Иван Борисов
Hello,
please help me, I can't join two streams. In the joined stream I've got
zero messages and can't understand why.

Kafka Topics:
1st stream
topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp':
123123131}, 'compare_with': 'T2'}
2nd stream
topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp':
53543543}, 'compare_with': 'T1'}


DataStream T1_Stream = env.fromSource(
T1_Source,
WatermarkStrategy.noWatermarks(),
"T1 Stream");

DataStream T2_Stream = env.fromSource(
T2_Source,
WatermarkStrategy.noWatermarks(),
"T2 Stream");

DataStream comparisonStream = T1_Stream
.join(T2_Stream)
.where(T1 -> T1.getCompare_with())
.equalTo(T2 -> T2.getSensor_Name())
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.apply((JoinFunction) (T1, T2) -> {
double firstValue = T1.getTemp();
double secondValue = T2.getTemp();
double m = firstValue-secondValue;
return m;
});
comparisonStream.writeAsText("/tmp/output_k.txt",
org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);

And my file is empty!
What am I doing wrong?

-- 
Yours truly, Ivan Borisov
mob./WhatsApp: 7 913  088 8882
Telegram: @Ivan_S_Borisov
Skype: ivan.s.borisov
e-mail: ivan.s.bori...@gmail.com


Re: Flink TCP custom source - secured server socket

2023-06-29 Thread Jan Lukavský

> ... a state backward in (processing) time ...
(of course not processing, I meant to say event time)

On 6/29/23 14:45, Jan Lukavský wrote:


Hi Kamal,

you probably have several options:

 a) bundle your private key and certificate into your Flink 
application's jar (not recommended; your service's private key would no 
longer be exactly "private")
 b) create a service which provides a certificate for your service 
during runtime (e.g. ACME-based or similar)


I have a different note, though. Flink (or for that matter any 
streaming engine; I'm more focused on Apache Beam) heavily relies on 
the ability of sources to restore a state backward in (processing) 
time. That is definitely not the case for a plain TCP socket. It is 
likely you will experience data-loss issues with this solution 
(regardless of security). This might be okay for you, I just felt it 
would be good to point this out.


Best,

 Jan

On 6/29/23 12:53, Kamal Mittal via user wrote:


Hello Community,

I have created a custom TCP stream source and am reading data from it.


But this TCP connection needs to be secured, i.e. SSL-based. The query is 
how to configure/provide certificates via Flink for a secured 
client-server TCP connection?


Rgds,

Kamal


Re: Flink TCP custom source - secured server socket

2023-06-29 Thread Jan Lukavský

Hi Kamal,

you probably have several options:

 a) bundle your private key and certificate into your Flink 
application's jar (not recommended; your service's private key would no 
longer be exactly "private")
 b) create a service which provides a certificate for your service 
during runtime (e.g. ACME-based or similar)
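
Note that Flink's own security.ssl.* options only cover Flink's internal and
REST endpoints, not user-defined sockets, so the custom source has to set up
TLS itself. A minimal plain-JSSE sketch (keystore path and password are
illustrative; the keystore must be readable on every task manager, e.g.
mounted into the container):

import java.io.FileInputStream;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocket;

public final class TlsServerSocketFactory {

    // Typically called from the source's open() method on the task manager.
    public static SSLServerSocket create(int port) throws Exception {
        // Load the server's private key and certificate from a PKCS12 keystore.
        KeyStore ks = KeyStore.getInstance("PKCS12");
        try (FileInputStream in = new FileInputStream("/opt/secrets/server.p12")) {
            ks.load(in, "changeit".toCharArray());
        }

        KeyManagerFactory kmf =
                KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, "changeit".toCharArray());

        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(kmf.getKeyManagers(), null, null);

        return (SSLServerSocket) ctx.getServerSocketFactory().createServerSocket(port);
    }
}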


I have a different note, though. Flink (or for that matter any 
streaming engine; I'm more focused on Apache Beam) heavily relies on the 
ability of sources to restore a state backward in (processing) time. 
That is definitely not the case for a plain TCP socket. It is likely you 
will experience data-loss issues with this solution (regardless of 
security). This might be okay for you, I just felt it would be good to 
point this out.


Best,

 Jan

On 6/29/23 12:53, Kamal Mittal via user wrote:


Hello Community,

I have created a custom TCP stream source and am reading data from it.


But this TCP connection needs to be secured, i.e. SSL-based. The query is 
how to configure/provide certificates via Flink for a secured 
client-server TCP connection?


Rgds,

Kamal


Re: Kafka source with idleness and alignment stops consuming

2023-06-29 Thread Alexis Sarda-Espinosa
BTW, it seems I spoke too soon in my previous email. I left the job running
overnight with each source having its own alignment group to evaluate only
per-split alignment, and I can see that eventually some partitions never
resumed consumption and the consumer lag increased.

Regards,
Alexis.

On Thu, Jun 29, 2023 at 10:08 AM, Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Martijn,
>
> thanks for the pointers. I think the issue I'm seeing is not caused by
> those because in my case the watermarks are not negative. Some more
> information from my setup in case it's relevant:
>
> - All Kafka topics have 6 partitions.
> - Job parallelism is 2, but 2 of the Kafka sources are hard-coded to
> parallelism=1.
>
> Regards,
> Alexis.
>
> On Thu, Jun 29, 2023 at 10:00 AM, Martijn Visser <
> martijnvis...@apache.org> wrote:
>
>> Hi Alexis,
>>
>> There are a couple of recent Flink tickets on watermark alignment,
>> specifically https://issues.apache.org/jira/browse/FLINK-32414 and
>> https://issues.apache.org/jira/browse/FLINK-32420 - could the latter also
>> be applicable in your case?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Wed, Jun 28, 2023 at 11:33 AM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> just for completeness, I don't see the problem if I assign a different
>>> alignment group to each source, i.e. using only split-level watermark
>>> alignment.
>>>
>>> Regards,
>>> Alexis.
>>>
>>> On Wed, Jun 28, 2023 at 8:13 AM, haishui wrote:
>>>
 Hi,
 I have the same trouble. This is really a bug.
 `shouldWaitForAlignment` needs another change.

 By the way, a source will be marked as idle when it has been
 waiting for alignment for a long time. Is this a bug?






 On 2023-06-27 23:25:38, "Alexis Sarda-Espinosa" wrote:

 Hello,

 I am currently evaluating idleness and alignment with Flink 1.17.1 and
 the externalized Kafka connector. My job has 3 sources whose watermark
 strategies are defined like this:

 WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
 .withIdleness(idleTimeout)
 .withWatermarkAlignment("group", maxAllowedWatermarkDrift,
 Duration.ofSeconds(1L))

 The max allowed drift is currently 5 seconds, and my sources have an
 idleTimeout of 1, 1.5, and 5 seconds.

 What I observe is that, when I restart the job, all sources publish
 messages, but then 2 of them are marked as idle and never resume. I found
 https://issues.apache.org/jira/browse/FLINK-31632, which should be
 fixed in 1.17.1, but I don't think it's the same issue, my logs don't show
 negative values:

 2023-06-27 15:11:42,927 DEBUG
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
 reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from
 subTaskId=1
 2023-06-27 15:11:43,009 DEBUG
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
 reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
 07:12:55.807) from subTaskId=0
 2023-06-27 15:11:43,091 DEBUG
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
 reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
 07:12:55.807) from subTaskId=0
 2023-06-27 15:11:43,116 DEBUG
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
 reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
 07:12:55.807) from subTaskId=0
 2023-06-27 15:11:43,298 INFO
  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
 Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
 2023-06-27 15:11:43,304 INFO
  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
 Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
 2023-06-27 15:11:43,306 INFO
  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
 Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
 2023-06-27 15:11:43,486 INFO
  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
 Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
 2023-06-27 15:11:43,489 INFO
  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
 Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
 2023-06-27 15:11:43,492 INFO
  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
 Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]

 Does anyone know if I'm missing something or if this is really a bug?

 Regards,
 Alexis.




Flink TCP custom source - secured server socket

2023-06-29 Thread Kamal Mittal via user
Hello Community,

I have created a custom TCP stream source and am reading data from it.

But this TCP connection needs to be secured, i.e. SSL-based. The query is how to 
configure/provide certificates via Flink for a secured client-server TCP 
connection?

Rgds,
Kamal


Re: Data & Task distribution among the available Nodes

2023-06-29 Thread Martijn Visser
Hi Mahmoud,

While it's not an answer to your questions, I do want to point out
that the DataSet API is deprecated and will be removed in a future
version of Flink. I would recommend moving to either the Table API or
the DataStream API.
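
For example, a rough DataStream equivalent of DataSet#readTextFile, using the
FileSource connector (the input path is illustrative); with the runtime mode
set to BATCH it executes with batch semantics:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CsvToDataStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // bounded input, batch semantics

        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/input.csv"))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "csv-input")
                .map(line -> line.split(",")[0]) // e.g. keep only the first column
                .print();

        env.execute("csv-to-datastream");
    }
}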

Best regards,

Martijn

On Thu, Jun 22, 2023 at 6:14 PM Mahmoud Awad  wrote:
>
> Hello everyone,
>
> I am trying to understand the mechanism by which Flink distributes the data 
> and the tasks among the nodes/task managers in the cluster, assuming all TMs 
> have equal resources. I am using the DataSet API on my own machine.
> I will try to address the issue with the following questions:
>
> - When we first read the data from the source (text, CSV, etc.), how does 
> Flink ensure a fair distribution of data from the source to the next 
> subtask?
>
> - Are there any preferences by which Flink will prefer one task manager over 
> another (assuming all task managers have equal resources)?
>
> - Based on what will Flink choose to deploy a specific task in a specific 
> task manager?
>
> I hope I was able to explain my point, thank you in advance.
>
> Best regards
> Mahmoud
>
>
>
>
>


[ANNOUNCE] Apache flink-connector-jdbc 3.1.1 released

2023-06-29 Thread Martijn Visser
The Apache Flink community is very happy to announce the release of
Apache flink-connector-jdbc v3.1.1. This version is compatible with
Flink 1.16 and Flink 1.17.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353281

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Release Manager


Re: Kafka source with idleness and alignment stops consuming

2023-06-29 Thread Alexis Sarda-Espinosa
Hi Martijn,

thanks for the pointers. I think the issue I'm seeing is not caused by
those because in my case the watermarks are not negative. Some more
information from my setup in case it's relevant:

- All Kafka topics have 6 partitions.
- Job parallelism is 2, but 2 of the Kafka sources are hard-coded to
parallelism=1.

Regards,
Alexis.

On Thu, Jun 29, 2023 at 10:00 AM, Martijn Visser <
martijnvis...@apache.org> wrote:

> Hi Alexis,
>
> There are a couple of recent Flink tickets on watermark alignment,
> specifically https://issues.apache.org/jira/browse/FLINK-32414 and
> https://issues.apache.org/jira/browse/FLINK-32420 - could the latter also
> be applicable in your case?
>
> Best regards,
>
> Martijn
>
> On Wed, Jun 28, 2023 at 11:33 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> just for completeness, I don't see the problem if I assign a different
>> alignment group to each source, i.e. using only split-level watermark
>> alignment.
>>
>> Regards,
>> Alexis.
>>
>> On Wed, Jun 28, 2023 at 8:13 AM, haishui wrote:
>>
>>> Hi,
>>> I have the same trouble. This is really a bug.
>>> `shouldWaitForAlignment` needs another change.
>>>
>>> By the way, a source will be marked as idle when it has been waiting
>>> for alignment for a long time. Is this a bug?
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 2023-06-27 23:25:38, "Alexis Sarda-Espinosa" wrote:
>>>
>>> Hello,
>>>
>>> I am currently evaluating idleness and alignment with Flink 1.17.1 and
>>> the externalized Kafka connector. My job has 3 sources whose watermark
>>> strategies are defined like this:
>>>
>>> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>>> .withIdleness(idleTimeout)
>>> .withWatermarkAlignment("group", maxAllowedWatermarkDrift,
>>> Duration.ofSeconds(1L))
>>>
>>> The max allowed drift is currently 5 seconds, and my sources have an
>>> idleTimeout of 1, 1.5, and 5 seconds.
>>>
>>> What I observe is that, when I restart the job, all sources publish
>>> messages, but then 2 of them are marked as idle and never resume. I found
>>> https://issues.apache.org/jira/browse/FLINK-31632, which should be
>>> fixed in 1.17.1, but I don't think it's the same issue, my logs don't show
>>> negative values:
>>>
>>> 2023-06-27 15:11:42,927 DEBUG
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>> reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from
>>> subTaskId=1
>>> 2023-06-27 15:11:43,009 DEBUG
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>> 07:12:55.807) from subTaskId=0
>>> 2023-06-27 15:11:43,091 DEBUG
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>> 07:12:55.807) from subTaskId=0
>>> 2023-06-27 15:11:43,116 DEBUG
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>> 07:12:55.807) from subTaskId=0
>>> 2023-06-27 15:11:43,298 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
>>> 2023-06-27 15:11:43,304 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>> 2023-06-27 15:11:43,306 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>> 2023-06-27 15:11:43,486 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>> 2023-06-27 15:11:43,489 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>> 2023-06-27 15:11:43,492 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>
>>> Does anyone know if I'm missing something or if this is really a bug?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>


Re: PyFlink SQL from Kafka to Iceberg issues

2023-06-29 Thread Martijn Visser
Hi Dani,

There are two things that I notice:

1. You're mixing different Flink versions (1.16 and 1.17): all Flink
artifacts should be from the same Flink version
2. S3 plugins need to be added to the plugins folder of Flink, because they
are loaded via the plugin mechanism. See
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/

Best regards,

Martijn

On Sat, Jun 24, 2023 at 1:22 PM Dániel Pálma  wrote:

> Hey folks,
>
> Nice to meet y'all!
>
> I'm trying to get the following stack up and running locally:
>
> - Kafka as source
> - pyFlink SQL
> - Iceberg on top of MinIO
>
> The goal is to have a pyflink script that reads data from a Kafka topic,
> does some transformations, and dumps it into an iceberg table.
>
> I have everything, except for the pyflink app running in Docker, defined
> in a docker-compose.yml:
>
> version: "3.7"
> services:
>
>   mariadb:
>     image: 'mariadb:latest'
>     hostname: mariadb
>     container_name: mariadb
>     ports:
>       - '3306:3306'
>     environment:
>       MYSQL_ROOT_PASSWORD: admin
>       MYSQL_USER: admin
>       MYSQL_PASSWORD: admin
>       MYSQL_DATABASE: metastore_db
>     volumes:
>       - ./mariadb-data:/var/lib/mysql
>     networks:
>       iceberg_net:
>
>   hive-metastore:
>     hostname: hive-metastore
>     container_name: hive-metastore
>     build:
>       context: hive
>     ports:
>       - '9083:9083'
>     environment:
>       METASTORE_DB_HOSTNAME: mariadb
>     depends_on:
>       - mariadb
>     networks:
>       iceberg_net:
>
>   minio:
>     hostname: "minio"
>     image: "minio/minio:latest"
>     container_name: "minio"
>     ports:
>       - "9001:9001"
>       - "9000:9000"
>     command:
>       - "server"
>       - "/data"
>       - "--console-address"
>       - ":9001"
>     volumes:
>       - "minio:/data"
>     environment:
>       MINIO_ROOT_USER: "minio"
>       MINIO_ROOT_PASSWORD: "minio123"
>     networks:
>       iceberg_net:
>         aliases:
>           - iceberg.minio
>
>   mc:
>     depends_on:
>       - "minio"
>     image: "minio/mc"
>     container_name: "mc"
>     entrypoint: >
>       /bin/sh -c "
>       until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo "...waiting..." && sleep 1; done;
>       /usr/bin/mc rm -r --force minio/iceberg;
>       /usr/bin/mc mb minio/iceberg;
>       /usr/bin/mc policy set public minio/iceberg;
>       tail -f /dev/null
>       "
>     networks:
>       iceberg_net:
>
>   broker:
>     image: confluentinc/cp-kafka:7.4.0
>     hostname: broker
>     container_name: broker
>     depends_on:
>       - controller
>     ports:
>       - "9092:9092"
>       - "9101:9101"
>     environment:
>       KAFKA_NODE_ID: 1
>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
>       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>       KAFKA_JMX_PORT: 9101
>       KAFKA_JMX_HOSTNAME: localhost
>       KAFKA_PROCESS_ROLES: 'broker'
>       KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
>       KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092'
>       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>       KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
>       # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
>       # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
>       CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>     networks:
>       iceberg_net:
>
>   controller:
>     image: confluentinc/cp-kafka:7.4.0
>     hostname: controller
>     container_name: controller
>     ports:
>       - "9093:9093"
>       - "9102:9102"
>     environment:
>       KAFKA_NODE_ID: 2
>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>       KAFKA_JMX_PORT: 9102
>       KAFKA_JMX_HOSTNAME: localhost
>       KAFKA_PROCESS_ROLES: 'controller'
>       KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
>       KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
>       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>       KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
>       # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
>       # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
>       CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>     networks:
>       iceberg_net:
>
>   schema-registry:
>     image: confluentinc/cp-schema-registry:7.4.0
>     hostname: schema-registry
>     container_name: schema-registry
>     depends_on:
>       - broker
>       - controller
>     ports:
>       - "8081:8081"
>     environment:
>       SCHEMA_REGISTRY_HOST_NAME: schema-registry
>       SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
>       SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
>     networks:
>       iceberg_net:
>
>   control-center:
>     image: confluentinc/cp-enterprise-control-center:7.4.0
>     hostname: control-center
>     container_name: control-center
>     depends_on:
>       - broker
>       - controller
>       - schema-registry
>     ports:
>       - "9021:9021"
>     environment:
>       CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
>       CONTROL_CENTER_SCHEMA_REGISTRY

Re: Kafka source with idleness and alignment stops consuming

2023-06-29 Thread Martijn Visser
Hi Alexis,

There are a couple of recent Flink tickets on watermark alignment,
specifically https://issues.apache.org/jira/browse/FLINK-32414 and
https://issues.apache.org/jira/browse/FLINK-32420 - could the latter also be
applicable in your case?

Best regards,

Martijn

On Wed, Jun 28, 2023 at 11:33 AM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> just for completeness, I don't see the problem if I assign a different
> alignment group to each source, i.e. using only split-level watermark
> alignment.
>
> Regards,
> Alexis.
>
> On Wed, Jun 28, 2023 at 8:13 AM, haishui wrote:
>
>> Hi,
>> I have the same trouble. This is really a bug.
>> `shouldWaitForAlignment` needs another change.
>>
>> By the way, a source will be marked as idle when it has been waiting
>> for alignment for a long time. Is this a bug?
>>
>>
>>
>>
>>
>>
>> On 2023-06-27 23:25:38, "Alexis Sarda-Espinosa" wrote:
>>
>> Hello,
>>
>> I am currently evaluating idleness and alignment with Flink 1.17.1 and
>> the externalized Kafka connector. My job has 3 sources whose watermark
>> strategies are defined like this:
>>
>> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>> .withIdleness(idleTimeout)
>> .withWatermarkAlignment("group", maxAllowedWatermarkDrift,
>> Duration.ofSeconds(1L))
>>
>> The max allowed drift is currently 5 seconds, and my sources have an
>> idleTimeout of 1, 1.5, and 5 seconds.
>>
>> What I observe is that, when I restart the job, all sources publish
>> messages, but then 2 of them are marked as idle and never resume. I found
>> https://issues.apache.org/jira/browse/FLINK-31632, which should be fixed
>> in 1.17.1, but I don't think it's the same issue, my logs don't show
>> negative values:
>>
>> 2023-06-27 15:11:42,927 DEBUG
>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>> reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from
>> subTaskId=1
>> 2023-06-27 15:11:43,009 DEBUG
>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>> 07:12:55.807) from subTaskId=0
>> 2023-06-27 15:11:43,091 DEBUG
>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>> 07:12:55.807) from subTaskId=0
>> 2023-06-27 15:11:43,116 DEBUG
>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>> 07:12:55.807) from subTaskId=0
>> 2023-06-27 15:11:43,298 INFO
>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
>> 2023-06-27 15:11:43,304 INFO
>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>> 2023-06-27 15:11:43,306 INFO
>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>> 2023-06-27 15:11:43,486 INFO
>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>> 2023-06-27 15:11:43,489 INFO
>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>> 2023-06-27 15:11:43,492 INFO
>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>
>> Does anyone know if I'm missing something or if this is really a bug?
>>
>> Regards,
>> Alexis.
>>
>>


Re: Very long launch of the Flink application in BATCH mode

2023-06-29 Thread Martijn Visser
Hi Vladislav,

I think it might be worthwhile to upgrade to Flink 1.17, given the
improvements that have been made in Flink 1.16 and 1.17 on batch
processing. See for example the release notes of 1.17, with an entire
section on batch processing
https://flink.apache.org/2023/03/23/announcing-the-release-of-apache-flink-1.17/#batch-processing

Best regards,

Martijn

On Wed, Jun 28, 2023 at 7:27 PM Vladislav Keda 
wrote:

> Hi Shammon,
>
> When I set log.level=DEBUG, I see no further logs except *2023-06-21
> 14:51:30,921 DEBUG
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Trigger heartbeat request.*
>
> Job freezes on stream graph generation. In STREAMING mode the job starts
> fast, without the same problems.
>
> On Wed, Jun 28, 2023 at 6:44 AM, Shammon FY wrote:
>
>> Hi Brendan,
>>
>> I think you may need to confirm at which stage the job is blocked: is the
>> client submitting the job, is the ResourceManager scheduling the job, or
>> are tasks launching in the TM? Maybe you need to provide more information
>> to help us figure out the issue.
>>
>> Best,
>> Shammon FY
>>
>> On Tuesday, June 27, 2023, Weihua Hu  wrote:
>>
>>> Hi, Brendan
>>>
>>> Judging from the log, it looks like it's invoking your main method. You
>>> can add more logs in the main method to figure out which part takes too
>>> long.
>>>
>>> Best,
>>> Weihua
>>>
>>>
>>> On Tue, Jun 27, 2023 at 5:06 AM Brendan Cortez <
>>> brendan.cortez...@gmail.com> wrote:
>>>
 No, I'm using a collection source + 20 identical JDBC lookups + a Kafka sink.

 On Mon, 26 Jun 2023 at 19:17, Yaroslav Tkachenko 
 wrote:

> Hey Brendan,
>
> Do you use a file source by any chance?
>
> On Mon, Jun 26, 2023 at 4:31 AM Brendan Cortez <
> brendan.cortez...@gmail.com> wrote:
>
>> Hi all!
>>
>> I'm trying to submit a Flink Job in Application Mode in the
>> Kubernetes cluster.
>>
>> I see some problems when an application has a big number of operators
>> (more than 20 identical operators) - it freezes for ~6 minutes after
>> *2023-06-21 15:46:45,082 WARN
>>  org.apache.flink.connector.kafka.sink.KafkaSinkBuilder   [] - 
>> Property
>> [transaction.timeout.ms ] not specified.
>> Setting it to PT1H*
>>  and until
>>
>> *2023-06-21 15:53:20,002 INFO
>>  org.apache.flink.streaming.api.graph.StreamGraphGenerator[] - 
>> Disabled
>> Checkpointing. Checkpointing is not supported and not needed when 
>> executing
>> jobs in BATCH mode.*(logs in attachment)
>>
>> When I set log.level=DEBUG, I see only this message every 10 seconds:
>> *2023-06-21 14:51:30,921 DEBUG
>> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] 
>> -
>> Trigger heartbeat request.*
>>
>> Please, could you help me understand the cause of this problem and
>> how to fix it. I am using Flink 1.15.3.
>>
>> Thank you in advance!
>>
>> Best regards,
>> Brendan Cortez.
>>
>


Re: [Slack] Request to upload new invitation link

2023-06-29 Thread Martijn Visser
Thanks for reaching out, Stephen. I've also updated the Slack invite link at
https://flink.apache.org/community/#slack

Best regards, Martijn

On Thu, Jun 29, 2023 at 3:20 AM yuxia  wrote:

> Hi, Stephen.
> You're welcome to join the Flink Slack channel. Here's my invitation link:
>
> https://join.slack.com/t/apache-flink/shared_invite/zt-1y7kmx7te-zUg1yfLdGu3Th9En_p4n~g
>
> Best regards,
> Yuxia
>
> --
> *From: *"Stephen Chu" 
> *To: *"User" 
> *Cc: *"Satyam Shanker" , "Vaibhav Gosain" <
> vaibhav.gos...@glean.com>, "Steve Jiang" 
> *Sent: *Thursday, June 29, 2023, 12:49:21 AM
> *Subject: *[Slack] Request to upload new invitation link
>
> Hi there,
> I'd love to join the Flink Slack channel, but it seems the link is
> outdated:
> https://join.slack.com/t/apache-flink/shared_invite/zt-1thin01ch-tYuj6Zwu8qf0QsivHY0anw
>
> Would someone be able to update or send me a new invite link?
>
> Thanks,
> Stephen
>
>