One of the MultiInput operators runs for 12 minutes. The screenshot shows
all stages of the Flink job.
[image: 3.png]
Q77 query of the TPC-DS benchmark. All *_sales and *_returns tables (6
tables) are read from Kafka, and the remaining 3 tables (date_dim,
web_page, store) from Hive.
with ss as (
  select s_store_sk,
         sum(ss_ext_sales_price) as sales,
         sum(ss_net_profit) as profit
  from store_sales, date_dim, store
  where ss_sold_date_sk = d_date_sk
    and d_date between cast('1998-08-04' as date)
                   and (cast('1998-08-04' as date) + interval '30' day)
    and ss_store_sk = s_store_sk
  group by s_store_sk),
sr as (
  select s_store_sk,
         sum(sr_return_amt) as `returns`,
         sum(sr_net_loss) as profit_loss
  from store_returns, date_dim, store
  where sr_returned_date_sk = d_date_sk
    and d_date between cast('1998-08-04' as date)
                   and (cast('1998-08-04' as date) + interval '30' day)
    and sr_store_sk = s_store_sk
  group by s_store_sk),
cs as (
  select cs_call_center_sk,
         sum(cs_ext_sales_price) as sales,
         sum(cs_net_profit) as profit
  from catalog_sales, date_dim
  where cs_sold_date_sk = d_date_sk
    and d_date between cast('1998-08-04' as date)
                   and (cast('1998-08-04' as date) + interval '30' day)
  group by cs_call_center_sk),
cr as (
  select cr_call_center_sk,
         sum(cr_return_amount) as `returns`,
         sum(cr_net_loss) as profit_loss
  from catalog_returns, date_dim
  where cr_returned_date_sk = d_date_sk
    and d_date between cast('1998-08-04' as date)
                   and (cast('1998-08-04' as date) + interval '30' day)
  group by cr_call_center_sk),
ws as (
  select wp_web_page_sk,
         sum(ws_ext_sales_price) as sales,
         sum(ws_net_profit) as profit
  from web_sales, date_dim, web_page
  where ws_sold_date_sk = d_date_sk
    and d_date between cast('1998-08-04' as date)
                   and (cast('1998-08-04' as date) + interval '30' day)
    and ws_web_page_sk = wp_web_page_sk
  group by wp_web_page_sk),
wr as (
  select wp_web_page_sk,
         sum(wr_return_amt) as `returns`,
         sum(wr_net_loss) as profit_loss
  from web_returns, date_dim, web_page
  where wr_returned_date_sk = d_date_sk
    and d_date between cast('1998-08-04' as date)
                   and (cast('1998-08-04' as date) + interval '30' day)
    and wr_web_page_sk = wp_web_page_sk
  group by wp_web_page_sk)
select channel,
       id,
       sum(sales) as sales,
       sum(`returns`) as `returns`,
       sum(profit) as profit
from
  (select 'store channel' as channel,
          ss.s_store_sk as id,
          sales,
          coalesce(`returns`, 0) as `returns`,
          (profit - coalesce(profit_loss, 0)) as profit
   from ss left join sr
     on ss.s_store_sk = sr.s_store_sk
   union all
   select 'catalog channel' as channel,
          cs_call_center_sk as id,
          sales,
          `returns`,
          (profit - profit_loss) as profit
   from cs, cr
   union all
   select 'web channel' as channel,
          ws.wp_web_page_sk as id,
          sales,
          coalesce(`returns`, 0) as `returns`,
          (profit - coalesce(profit_loss, 0)) as profit
   from ws left join wr
     on ws.wp_web_page_sk = wr.wp_web_page_sk
  ) x
group by rollup (channel, id)
order by channel, id
limit 100;
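
To match the long-running MultiInput operator to a plan node, the optimized
plan can be printed with explainSql. A minimal sketch, assuming the same
tEnv and the catalog/table registrations from the job code quoted below
(Tpcds100.Q77_WITH_KAFKA is the query constant from that message):

// Prints the optimized plan for Q77 together with the planner's cost
// estimates, so each MultiInput node can be matched to the Flink UI.
// Assumes tEnv and the Kafka/Hive tables are set up as in the quoted code.
String plan = tEnv.explainSql(
        Tpcds100.Q77_WITH_KAFKA,
        org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST);
System.out.println(plan);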
Kind regards,
Vladimir.
Thu, 25 Jan 2024 at 14:43, Ron liu <[email protected]>:
> Hi,
>
> Can you help explain the Q77 execution plan, and find which operator
> takes a long time in the Flink UI?
>
> Best
> Ron
>
> Вова Фролов <[email protected]> wrote on Wed, 24 Jan 2024, 09:09:
>
>> Hello,
>>
>> I am executing a heterogeneous SQL query (part of the data is in Hive
>> and part in Kafka; the query uses the TPC-DS benchmark 100 GB data set)
>> in batch mode. However, the execution time is excessively long: it takes
>> approximately 11 minutes to complete, although the same query against
>> Hive only (without Kafka) completes in 12 seconds.
>>
>> How can I speed up execution of a heterogeneous SQL query over Kafka + Hive?
>>
>> *Versions of Components in Use:*
>>
>> · Apache Flink: 1.17.1
>>
>> · Kafka: 3.2.3
>>
>> · Hive: 3.1.2.3.4.5.1-1
>>
>> *Flink Job Code:*
>>
>> EnvironmentSettings settings =
>>     EnvironmentSettings.newInstance().inBatchMode().build();
>>
>> TableEnvironment tEnv = TableEnvironment.create(settings);
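>>
>> One knob worth trying right after creating tEnv, as a hedged sketch (the
>> two option keys are standard Flink table-config options; the values are
>> illustrative assumptions, not tuned recommendations):
>>
>> // Raise the default operator parallelism so the 512-partition Kafka
>> // sources are not read by only a few subtasks, and let the planner
>> // reorder Q77's multi-way joins. Values are illustrative assumptions.
>> tEnv.getConfig().set("table.exec.resource.default-parallelism", "128");
>> tEnv.getConfig().set("table.optimizer.join-reorder-enabled", "true");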
>>
>>
>>
>> *Hive Catalog*
>>
>> HiveCatalog catalog = new HiveCatalog("hive", DEFAULT_DATABASE,
>>     PATH_TO_CONF, HiveVersionInfo.getVersion());
>>
>> tEnv.registerCatalog("hive", catalog);
>>
>> tEnv.useCatalog("hive");
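>>
>> Since join reordering and broadcast decisions depend on statistics, a
>> sketch worth trying after registering the catalog (ANALYZE TABLE is
>> available for batch mode since Flink 1.16; the three tables are the
>> Hive-side inputs of Q77):
>>
>> // Recompute statistics for the Hive-side dimension tables so the batch
>> // planner has row counts and column stats when optimizing Q77.
>> tEnv.executeSql("ANALYZE TABLE date_dim COMPUTE STATISTICS FOR ALL COLUMNS");
>> tEnv.executeSql("ANALYZE TABLE store COMPUTE STATISTICS FOR ALL COLUMNS");
>> tEnv.executeSql("ANALYZE TABLE web_page COMPUTE STATISTICS FOR ALL COLUMNS");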
>>
>>
>>
>>
>>
>> Creating tables with Kafka connector:
>>
>>
>>
>> public static final String CREATE_STORE_SALES =
>>     "CREATE TEMPORARY TABLE store_sales_kafka(\n" +
>>     "  ss_sold_date_sk INTEGER,\n" +
>>     // here are 21 columns
>>     "  ss_net_profit DOUBLE\n" +
>>     ") WITH (\n" +
>>     "  'connector' = 'kafka',\n" +
>>     "  'key.format' = 'avro',\n" +
>>     "  'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
>>     "  'properties.group.id' = 'store_sales_group',\n" +
>>     "  'scan.startup.mode' = 'earliest-offset',\n" +
>>     "  'scan.bounded.mode' = 'latest-offset',\n" +
>>     "  'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, xyz4:9092, xyz5:9092',\n" +
>>     "  'topic' = 'storeSales100',\n" +
>>     "  'value.format' = 'avro',\n" +
>>     "  'value.fields-include' = 'EXCEPT_KEY'\n" +
>>     ")";
>>
>>
>>
>> Q77 with Flink:
>>
>> tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
>> Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
>> List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());
>>
>> for (Row row : res) {
>>     System.out.println(row);
>> }
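>>
>> Equivalent, and avoids buffering the iterator by hand (Q77 returns at
>> most 100 rows either way):
>>
>> // TableResult.print() consumes the result and prints each row directly.
>> result.execute().print();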
>>
>>
>>
>> Kafka settings (the Kafka cluster consists of 6 topics, one per table;
>> each topic has 512 partitions and replication factor 3):
>>
>> · num.network.threads=12
>>
>> · num.io.threads=10
>>
>> · socket.send.buffer.bytes=2097152
>>
>> · socket.request.max.bytes=1073741824
>>
>> The cluster consists of 5 machines, each with:
>>
>> · 2 x86-64 CPUs, 20 cores / 40 threads each, 2200 MHz base frequency,
>> 3600 MHz max turbo frequency; 40 cores / 80 threads total per machine.
>>
>> · 768 GB RAM, up to 640 GB of which is available for Flink.
>>
>> · 2 network cards, 10 Gigabit each.
>>
>> · 10 HDDs, 5.5 TB each.
>>
>> Kind regards,
>>
>> Vladimir
>>
>