One of the MultiInput operators runs for 12 minutes. The screenshot shows
all stages of the Flink job.

[image: 3.png]
Q77 query of the TPC-DS benchmark. All *_sales and *_returns tables (6 tables)
are read from Kafka, and the remaining 3 tables (date_dim, web_page,
store) from Hive.


with ss as
 (select s_store_sk,
         sum(ss_ext_sales_price) as sales,
         sum(ss_net_profit) as profit
 from store_sales,
      date_dim,
      store
 where ss_sold_date_sk = d_date_sk
       and d_date between cast('1998-08-04' as date)
                   and (cast('1998-08-04' as date) + INTERVAL '30' DAY)
       and ss_store_sk = s_store_sk
 group by s_store_sk)
 ,
 sr as
 (select s_store_sk,
         sum(sr_return_amt) as returns,
         sum(sr_net_loss) as profit_loss
 from store_returns,
      date_dim,
      store
 where sr_returned_date_sk = d_date_sk
       and d_date between cast('1998-08-04' as date)
                   and (cast('1998-08-04' as date) + INTERVAL '30' DAY)
       and sr_store_sk = s_store_sk
 group by s_store_sk),
 cs as
 (select cs_call_center_sk,
        sum(cs_ext_sales_price) as sales,
        sum(cs_net_profit) as profit
 from catalog_sales,
      date_dim
 where cs_sold_date_sk = d_date_sk
       and d_date between cast('1998-08-04' as date)
                   and (cast('1998-08-04' as date) + INTERVAL '30' DAY)
 group by cs_call_center_sk
 ),
 cr as
 (select cr_call_center_sk,
         sum(cr_return_amount) as returns,
         sum(cr_net_loss) as profit_loss
 from catalog_returns,
      date_dim
 where cr_returned_date_sk = d_date_sk
       and d_date between cast('1998-08-04' as date)
                   and (cast('1998-08-04' as date) + INTERVAL '30' DAY)
 group by cr_call_center_sk
 ),
 ws as
 ( select wp_web_page_sk,
        sum(ws_ext_sales_price) as sales,
        sum(ws_net_profit) as profit
 from web_sales,
      date_dim,
      web_page
 where ws_sold_date_sk = d_date_sk
       and d_date between cast('1998-08-04' as date)
                   and (cast('1998-08-04' as date) + INTERVAL '30' DAY)
       and ws_web_page_sk = wp_web_page_sk
 group by wp_web_page_sk),
 wr as
 (select wp_web_page_sk,
        sum(wr_return_amt) as returns,
        sum(wr_net_loss) as profit_loss
 from web_returns,
      date_dim,
      web_page
 where wr_returned_date_sk = d_date_sk
       and d_date between cast('1998-08-04' as date)
                   and (cast('1998-08-04' as date) + INTERVAL '30' DAY)
       and wr_web_page_sk = wp_web_page_sk
 group by wp_web_page_sk)
  select  channel
        , id
        , sum(sales) as sales
        , sum(returns) as returns
        , sum(profit) as profit
 from
 (select 'store channel' as channel
        , ss.s_store_sk as id
        , sales
        , coalesce(returns, 0) as returns
        , (profit - coalesce(profit_loss,0)) as profit
 from   ss left join sr
        on  ss.s_store_sk = sr.s_store_sk
 union all
 select 'catalog channel' as channel
        , cs_call_center_sk as id
        , sales
        , returns
        , (profit - profit_loss) as profit
 from  cs
       , cr
 union all
 select 'web channel' as channel
        , ws.wp_web_page_sk as id
        , sales
        , coalesce(returns, 0) returns
        , (profit - coalesce(profit_loss,0)) as profit
 from   ws left join wr
        on  ws.wp_web_page_sk = wr.wp_web_page_sk
 ) x
 group by rollup (channel, id)
 order by channel
         ,id
 LIMIT 100;
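
For reference, the textual plan can also be printed with
TableEnvironment#explainSql; a minimal sketch, assuming the same Tpcds100
constants used in the job code quoted below:

// Register the Kafka-backed table(s), then print the optimized execution
// plan instead of running the query.
tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
System.out.println(tEnv.explainSql(Tpcds100.Q77_WITH_KAFKA));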


Kind regards,
Vladimir.


Thu, 25 Jan 2024 at 14:43, Ron liu <ron9....@gmail.com>:

> Hi,
>
> Can you help explain the Q77 execution plan? And find which operator
> takes a long time in the Flink UI?
>
> Best
> Ron
>
> Vova Frolov <vovafrolov1...@gmail.com> wrote on Wed, Jan 24, 2024 at 09:09:
>
>> Hello,
>>
>> I am executing a heterogeneous SQL query (part of the data is in Hive
>> and part in Kafka; the query uses the TPC-DS benchmark 100GB dataset) in
>> batch mode. However, the execution time is excessively long, taking
>> approximately 11 minutes to complete, whereas the same query against Hive
>> alone (without Kafka) completes in 12 seconds.
>>
>> How can I speed up execution of a heterogeneous SQL query against Kafka + Hive?
>>
>> *   Versions of Components in Use:*
>>
>> ·        Apache Flink: 1.17.1
>>
>> ·        Kafka: 3.2.3
>>
>> ·        Hive: 3.1.2.3.4.5.1-1
>>
>> *    Flink Job Code:*
>>
>> EnvironmentSettings settings =
>> EnvironmentSettings.newInstance().inBatchMode().build();
>>
>> TableEnvironment tEnv = TableEnvironment.create(settings);
>>
>>
>>
>> *Hive Catalog*
>>
>> HiveCatalog catalog = new HiveCatalog("hive", DEFAULT_DATABASE,
>> PATH_TO_CONF, HiveVersionInfo.getVersion());
>>
>> tEnv.registerCatalog("hive", catalog);
>>
>> tEnv.useCatalog("hive");
>>
>>
>>
>>
>>
>> Creating tables with the Kafka connector:
>>
>>
>>
>> public static final String CREATE_STORE_SALES =
>>     "CREATE TEMPORARY TABLE store_sales_kafka(\n" +
>>     "  ss_sold_date_sk         INTEGER,\n" +
>>     // here are 21 columns
>>     "  ss_net_profit           DOUBLE\n" +
>>     ") WITH (\n" +
>>     "   'connector' = 'kafka',\n" +
>>     "   'key.format' = 'avro',\n" +
>>     "   'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
>>     "   'properties.group.id' = 'store_sales_group',\n" +
>>     "   'scan.startup.mode' = 'earliest-offset',\n" +
>>     "   'scan.bounded.mode' = 'latest-offset',\n" +
>>     "   'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, xyz4:9092, xyz5:9092',\n" +
>>     "   'topic' = 'storeSales100',\n" +
>>     "   'value.format' = 'avro',\n" +
>>     "   'value.fields-include' = 'EXCEPT_KEY'\n" +
>>     "   );";
>>
>>
>>
>> Q77 with Flink
>>
>> tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
>> Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
>> // Q77 ends with LIMIT 100, so collecting the result to the client is cheap.
>> List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());
>>
>> for (Row row : res) {
>>     System.out.println(row);
>> }
>>
>>
>>
>> Kafka settings (the Kafka cluster consists of 6 topics, one per table;
>> each topic has 512 partitions and a replication factor of 3):
>>
>> ·        num.network.threads=12
>>
>> ·        num.io.threads=10
>>
>> ·        socket.send.buffer.bytes=2097152
>>
>> ·        socket.request.max.bytes=1073741824
>>
>>     Cluster consists of 5 machines and each has:
>>
>> ·        2 x86-64 CPUs, each with 20 cores / 40 threads, 2200 MHz base
>> frequency, 3600 MHz max turbo frequency; 40 cores, 80 threads total on each
>> machine.
>>
>> ·        768 GB RAM, of which up to 640 GB is available for Flink.
>>
>> ·        2 network cards, 10 Gigabit each
>>
>> ·        10 HDDs, 5.5 TB each
>>
>> Kind regards,
>>
>> Vladimir
>>
>
