I am using the following code:
String s = streamTableEnvironment.explainSql("select header.customer_id" +
",item.goods_id" +
",header.id" +
",header.order_status" +
",header.shop_id" +
",header.parent_order_id" +
",header.order_at" +
",header.pay_at" +
",header.channel_id" +
",header.root_order_id" +
",item.id" +
",item.row_num" +
",item.p_sp_sub_amt" +
",item.display_qty" +
",item.qty" +
",item.bom_type" +
" from header JOIN item on header.id = item.order_id");

System.out.println("explain:" + s);




The plan output is:
explain:== Abstract Syntax Tree ==
LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], order_status=[$1], 
shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], 
channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14], 
p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20])
+- LogicalJoin(condition=[=($0, $13)], joinType=[inner])
   :- LogicalTableScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_5]])
   +- LogicalTableScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_8]])


== Optimized Physical Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, 
order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, 
display_qty, qty, bom_type])
+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id, 
order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, 
channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
bom_type, display_qty], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, 
order_at, pay_at, channel_id, root_order_id])
   :     +- TableSourceScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, 
shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, 
last_updated_at, business_flag, mysql_op_type])
   +- Exchange(distribution=[hash[order_id]])
      +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
bom_type, display_qty])
         +- TableSourceScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, 
s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, 
display_qty, is_first_flag])


== Optimized Execution Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, 
order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, 
display_qty, qty, bom_type])
+- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id, 
order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, 
channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
bom_type, display_qty], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, 
order_at, pay_at, channel_id, root_order_id])
   :     +- TableSourceScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, 
shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, 
last_updated_at, business_flag, mysql_op_type])
   +- Exchange(distribution=[hash[order_id]])
      +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
bom_type, display_qty])
         +- TableSourceScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, 
s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, 
display_qty, is_first_flag])
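
For comparison: the plan above is a regular streaming Join whose only condition is id = order_id, with no time predicate. The DataStream intervalJoin, by contrast, only pairs records whose timestamps fall inside the configured bounds and, as far as I understand, skips records that arrive behind the current watermark. Below is a minimal plain-Java sketch of those two checks. It does not use Flink; the class and method names are hypothetical, and only the -10 s / 20 s bounds are taken from the interval join code quoted further down.

```java
public class IntervalJoinSketch {
    // Bounds from between(Time.seconds(-10), Time.seconds(20)), in milliseconds.
    public static final long LOWER_MS = -10_000L;
    public static final long UPPER_MS = 20_000L;

    /**
     * True if the right-side timestamp lies in
     * [leftTs + LOWER_MS, leftTs + UPPER_MS], both ends inclusive.
     */
    public static boolean withinBounds(long leftTs, long rightTs) {
        return rightTs >= leftTs + LOWER_MS && rightTs <= leftTs + UPPER_MS;
    }

    /**
     * Assumed model of the late-record check: a record whose event
     * timestamp is behind the current watermark is treated as late.
     */
    public static boolean isLate(long recordTs, long watermark) {
        return watermark != Long.MIN_VALUE && recordTs < watermark;
    }

    public static void main(String[] args) {
        long headerTs = 100_000L; // hypothetical header event time
        long[] itemTimestamps = {85_000L, 90_000L, 120_000L, 125_000L};

        for (long itemTs : itemTimestamps) {
            System.out.println("item@" + itemTs + " within bounds of header@"
                    + headerTs + ": " + withinBounds(headerTs, itemTs));
        }

        // A record with timestamp 95_000 arriving after the watermark reached 100_000.
        System.out.println("late: " + isLate(95_000L, 100_000L));
    }
}
```

If the header and item timestamps of one order are often more than 10 s / 20 s apart, or one topic lags behind the other, either check would explain pairs that the regular join finds and the interval join does not.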

On 2022-06-10 11:02:56, "Shengkai Fang" <fskm...@gmail.com> wrote:
>Hi, could you share the actual plan so that everyone can take a look?
>
>You can run tEnv.executeSql("Explain JSON_EXECUTION_PLAN
><YOUR_QUERY>").print() to print the relevant information.
>
>Best,
>Shengkai
>
>lxk <lxk7...@163.com> wrote on Fri, Jun 10, 2022 at 10:29:
>
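>> Flink version: 1.14.4
>> I am currently using a Flink interval join to correlate two streams. During
>> testing I found that data is lost after the interval join, whereas a plain
>> join through the SQL API gives the expected result with nothing missing.
>> Watermarks are generated directly from the timestamps carried by the Kafka
>> records.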
>>
>>
>> Here is the code (interval join):
>>
>> SingleOutputStreamOperator<HeaderFull> headerFullStream =
>>     headerFilterStream.keyBy(data -> data.getId())
>>         .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
>>         .between(Time.seconds(-10), Time.seconds(20))
>>         .process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
>>             @Override
>>             public void processElement(OrderHeader left, OrderItem right,
>>                     Context context, Collector<HeaderFull> collector) throws Exception {
>>                 // Copy the fields of both sides into the joined record.
>>                 HeaderFull headerFull = new HeaderFull();
>>                 BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
>>                 beanUtilsBean.copyProperties(headerFull, left);
>>                 beanUtilsBean.copyProperties(headerFull, right);
>>                 // The first 10 characters of order_at are used as the event date.
>>                 String event_date = left.getOrder_at().substring(0, 10);
>>                 headerFull.setEvent_date(event_date);
>>                 headerFull.setItem_id(right.getId());
>>                 collector.collect(headerFull);
>>             }
>>         });
>> Join using SQL:
>> Configuration conf = new Configuration();
>> conf.setString("table.exec.mini-batch.enabled","true");
>> conf.setString("table.exec.mini-batch.allow-latency","15 s");
>> conf.setString("table.exec.mini-batch.size","100");
>> conf.setString("table.exec.state.ttl","20 s");
>> env.configure(conf);
>> Table headerTable =
>> streamTableEnvironment.fromDataStream(headerFilterStream);
>> Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);
>>
>>
>> streamTableEnvironment.createTemporaryView("header",headerTable);
>> streamTableEnvironment.createTemporaryView("item",itemTable);
>>
>> Table result = streamTableEnvironment.sqlQuery("select header.customer_id"
>> +
>> ",item.goods_id" +
>> ",header.id" +
>> ",header.order_status" +
>> ",header.shop_id" +
>> ",header.parent_order_id" +
>> ",header.order_at" +
>> ",header.pay_at" +
>> ",header.channel_id" +
>> ",header.root_order_id" +
>> ",item.id" +
>> ",item.row_num" +
>> ",item.p_sp_sub_amt" +
>> ",item.display_qty" +
>> ",item.qty" +
>> ",item.bom_type" +
>> " from header JOIN item on header.id = item.order_id");
>>
>>
>> DataStream<Row> rowDataStream =
>> streamTableEnvironment.toChangelogStream(result);
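>> I do not quite understand why the interval join loses so much data. My
>> understanding was that a SQL join uses something similar to an interval join
>> under the hood, so why do the two end up with such different join results?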