Hello,
I am executing a heterogeneous SQL query in batch mode: part of the data
is in Hive and part is in Kafka. The query runs against the TPC-DS
benchmark 100 GB data set. However, the execution time is excessively long:
the query takes approximately 11 minutes to complete, whereas the same
request against Hive only (without Kafka) completes in 12 seconds.
How can I speed up the execution of a heterogeneous SQL query over Kafka + Hive?
*Versions of Components in Use:*
· Apache Flink: 1.17.1
· Kafka: 3.2.3
· Hive: 3.1.2.3.4.5.1-1
*Flink Job Code:*
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
*Hive Catalog*
HiveCatalog catalog = new HiveCatalog("hive", DEFAULT_DATABASE,
PATH_TO_CONF, HiveVersionInfo.getVersion());
tEnv.registerCatalog("hive", catalog);
tEnv.useCatalog("hive");
Creating tables with Kafka connector:
public static final String CREATE_STORE_SALES = "CREATE TEMPORARY TABLE store_sales_kafka(\n" +
" ss_sold_date_sk INTEGER,\n" +
// here are 21 columns
" ss_net_profit DOUBLE\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'key.format' = 'avro',\n" +
" 'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
" 'properties.group.id' = 'store_sales_group',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'scan.bounded.mode' = 'latest-offset',\n" +
" 'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, xyz4:9092, xyz5:9092',\n" +
" 'topic' = 'storeSales100',\n" +
" 'value.format' = 'avro',\n" +
" 'value.fields-include' = 'EXCEPT_KEY'\n" +
" );";
Q77 with Flink:
tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());
for (Row row : res) {
System.out.println(row);
}
Kafka settings (the Kafka cluster holds 6 topics (6 tables), each with
512 partitions and replication factor 3):
· num.network.threads=12
· num.io.threads=10
· socket.send.buffer.bytes=2097152
· socket.request.max.bytes=1073741824
The cluster consists of 5 machines, each with:
· 2 x86-64 CPUs, each with 20 cores / 40 threads, 2200 MHz base frequency,
3600 MHz max turbo frequency (40 cores, 80 threads in total per machine).
· 768 GB RAM, of which up to 640 GB is available for Flink.
· 2 network cards, 10 Gigabit each
· 10 HDD 5.5 TB
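For reference, the totals implied by the figures above can be worked out with a small sketch like the one below. The class and method names (ClusterTotals, totalPartitions and so on) are made up for illustration and are not part of the Flink job; the inputs are just the numbers quoted in this message.

```java
// Illustrative helper only (hypothetical names, not part of the Flink job):
// derives totals from the topic and hardware figures quoted in this message.
class ClusterTotals {

    // Total number of partitions across all topics.
    static int totalPartitions(int topics, int partitionsPerTopic) {
        return topics * partitionsPerTopic;
    }

    // Total number of partition replicas stored by the Kafka cluster.
    static int totalReplicas(int topics, int partitionsPerTopic, int replicationFactor) {
        return totalPartitions(topics, partitionsPerTopic) * replicationFactor;
    }

    // Total number of hardware threads across all machines.
    static int totalHardwareThreads(int machines, int threadsPerMachine) {
        return machines * threadsPerMachine;
    }

    public static void main(String[] args) {
        // 6 topics with 512 partitions each, replication factor 3
        System.out.println("Kafka partitions in total: " + totalPartitions(6, 512));      // 3072
        System.out.println("Partition replicas in total: " + totalReplicas(6, 512, 3));   // 9216
        // 5 machines with 80 hardware threads each
        System.out.println("Hardware threads in total: " + totalHardwareThreads(5, 80));  // 400
    }
}
```

In other words, each 512-partition topic has more partitions than the 400 hardware threads available in the whole cluster.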
Kind regards,
Vladimir