RE: Issue with Flink Job when Reading Data from Kafka and Executing SQL Query (q77 TPC-DS)

2024-01-03 Thread Schwalbe Matthias
Hi Vladimir,

I might be mistaken, but here are my observations:


  *   List<Row> res = CollectionUtil.iteratorToList(result.execute().collect()); will block until the job is finished
  *   However, this is an unbounded streaming job, which will not finish until you cancel it
  *   If you just want to print results, the print sink will do
  *   You might want to iterate directly on the iterator returned by result.execute().collect()
  *   And make sure to close/dispose of the iterator once done (see the sketch just below this list)
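
A minimal sketch of the last two points (assuming Flink 1.17, where TableResult.collect() returns a CloseableIterator<Row>; "result" is the Table from your job code below):

// Needs org.apache.flink.util.CloseableIterator and org.apache.flink.types.Row.
// Iterate directly over the streaming result instead of materializing it into a List;
// try-with-resources closes the iterator once you are done and releases the
// client-side resources of the collect sink.
try (CloseableIterator<Row> it = result.execute().collect()) {
    while (it.hasNext()) {
        System.out.println(it.next());
    }
}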

Sincere greetings
Thias



Re: Issue with Flink Job when Reading Data from Kafka and Executing SQL Query (q77 TPC-DS)

2024-01-02 Thread Alexey Novakov via user
Hi Vladimir,

As I can see, your SQL query reads data from the Kafka topic and pulls all of it
to the client side. The "*.collect" method is quite network/memory intensive.
You probably do not want that.

If you want to debug/print the ingested data via SQL, I would recommend the
"print" connector:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/print/

This means you could INSERT from a SELECT into the print table, as sketched below.
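
A minimal sketch of what I mean (the sink name and columns are invented for illustration and must match the actual q77 output schema; "tEnv" and "result" refer to the StreamTableEnvironment and Table in your job code below):

// Hypothetical print sink; adjust the column list to the q77 result schema.
tEnv.executeSql(
    "CREATE TEMPORARY TABLE q77_print_sink (\n" +
    "  channel STRING,\n" +
    "  id INTEGER,\n" +
    "  sales DECIMAL(15, 2)\n" +
    ") WITH (\n" +
    "  'connector' = 'print'\n" +
    ")");

// Write the query result into the print table; rows then show up in the
// TaskManagers' stdout (.out files) instead of being collected on the client.
result.executeInsert("q77_print_sink");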

Also, could it be that Flink becomes silent because it has read all the data
from the Kafka topic and is just waiting for new records to be inserted into
the topic?
Although I would not expect those *Node -1 disconnected* messages in such
a scenario.

Alexey



Issue with Flink Job when Reading Data from Kafka and Executing SQL Query (q77 TPC-DS)

2023-12-19 Thread Вова Фролов
Hello Flink Community,

I am writing to you about an issue I have encountered while using Apache
Flink version 1.17.1. In my Flink job, I am using Kafka version 3.6.0 to
ingest data from TPC-DS (current scale tpcds100, target size tpcds1), and then
I am executing SQL queries, specifically the q77 query, on the data in
Kafka.

*Versions of Components in Use:*

· Apache Flink: 1.17.1

· Kafka: 3.6.0

Kafka Settings (the Kafka cluster consists of 9 topics, each with 512
partitions and a replication factor of 3):

· num.network.threads=12

· num.io.threads=10

· socket.send.buffer.bytes=2097152

· socket.request.max.bytes=1073741824

*Flink Job Code:*

Creating tables with Kafka connector:

public static final String CREATE_STORE_SALES = "CREATE TEMPORARY TABLE store_sales_kafka(\n" +
    "  ss_sold_date_sk INTEGER,\n" +

    // here are 21 columns

    "  ss_net_profit DECIMAL(7, 2)\n" +
    ") WITH (\n" +
    "   'connector' = 'kafka',\n" +
    "   'key.format' = 'avro',\n" +
    "   'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
    "   'properties.group.id' = 'store_sales_group',\n" +
    "   'scan.startup.mode' = 'earliest-offset',\n" +
    "   'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, xyz4:9092, xyz5:9092',\n" +
    "   'topic' = 'storeSales100',\n" +
    "   'value.format' = 'avro',\n" +
    "   'value.fields-include' = 'EXCEPT_KEY'\n" +
    "   );";



Q77 with Flink

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());

for (Row row : res) {
    System.out.println(row);
}





*Flink Job Configuration:*

I tried several configurations, but here are the main ones:

1. slots per TaskManager 10, parallelism 100;

2. slots per TaskManager 5, parallelism 50;

3. slots per TaskManager 15, parallelism 375;

The result is always about the same
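
For reference, a minimal sketch of how these values are applied (illustrative only, shown for configuration 1; slots per TaskManager are a cluster setting, the parallelism is set on the job):

// Cluster side, flink-conf.yaml:
//   taskmanager.numberOfTaskSlots: 10
// Job side:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(100); // configuration 1: 10 slots per TaskManager, parallelism 100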

*Logs and Errors:*

The logs from my Flink Job do not contain any errors.

*Description of the Issue:*

The Flink Job runs smoothly for approximately 5 minutes, during
which data processing and transformations occur as expected. However, after
this initial period, the Flink Job seems to enter a state where no further
changes or updates are observed in the processed data. In the logs I see a
message:

"2023-12-18 INFO  org.apache.kafka.clients.NetworkClient   [] - [AdminClient clientId=store_group-enumerator-admin-client] Node -1 disconnected"

This message is written every 5 minutes.

It's worth noting that, despite the lack of errors in the logs, the
Flink Job essentially becomes unresponsive or ceases to make progress,
resulting in a stagnation of data processing.

This behavior is consistent across different configurations tested,
including variations in the number of slots per TaskManager and parallelism.

While the logs do not indicate any errors, they do not provide
insights into the reason behind the observed data processing stagnation.



The cluster consists of 5 machines, each with:

· 2 x86-64 CPUs with 20 cores / 40 threads each (2200 MHz base frequency, 3600
MHz max turbo frequency), i.e. 40 cores / 80 threads in total per machine.

· 768 GB RAM, of which up to 640 GB is available for Flink.

· 2 network cards, 10 Gigabit each.

· 10 HDD, 5.5 TB.



This issue significantly hinders the overall effectiveness of utilizing
Apache Flink for my data processing needs. I am seeking guidance to
understand and resolve the underlying cause of this behavior.



I am looking forward to your advice. Please let me know if you
need additional details.



Kind regards,

Vladimir