flinksql问题请教

2021-07-06 Thread silence

请教一下下面的sql为什么会被翻译成有限数据集?源表是个kafka source,用了row_number进行开窗和两次Table Function join
flink版本:1.12.2

Stage 1 : Data Source
content : Source: Values(tuples=[[]])
Stage 2 : Operator
content : Correlate(invocation=[LateralArray($cor3.gift_list)], 
correlate=[table(LateralArray($cor3.gift_list))], 
select=[order_id,stack_id,order_item_id,sku_id,sku_name,quantity,product_type,original_price,unit_original_price,promotion_reduce_price,coupon_reduce_price,total_pay,unit_total_pay,is_gift,promotion_id,promotion_type,promotion_quantity,promotion_discount,gift_list,process_time,EXPR$0],
 rowType=[RecordType(BIGINT order_id, BIGINT stack_id, BIGINT order_item_id, 
BIGINT sku_id, VARCHAR(2147483647) sku_name, INTEGER quantity, INTEGER 
product_type, BIGINT original_price, BIGINT unit_original_price, BIGINT 
promotion_reduce_price, BIGINT coupon_reduce_price, BIGINT total_pay, BIGINT 
unit_total_pay, INTEGER is_gift, BIGINT promotion_id, INTEGER promotion_type, 
INTEGER promotion_quantity, BIGINT promotion_discount, VARCHAR(2147483647) 
gift_list, TIME ATTRIBUTE(PROCTIME) process_time, VARCHAR(2147483647) EXPR$0)], 
joinType=[LEFT])
select
  order_id,
  stack_id,
  order_item_id,
  sku_id,
  sku_name,
  quantity,
  product_type,
  original_price,
  unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  unit_total_pay,
  is_gift,
  promotion_id,
  promotion_type,
  promotion_quantity,
  promotion_discount,
  process_time
from
  (
select
  order_id,
  stack_id,
  order_item_id,
  sku_id,
  sku_name,
  quantity,
  product_type,
  original_price,
  unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  unit_total_pay,
  is_gift,
  promotion_id,
  promotion_type,
  if(
is_gift = 0,
promotion_quantity,
if(
  gift_item is null,
  promotion_quantity,
  if(
product_type = 1,
cast(
  JsonValue(gift_item, '$.quantity') as INTEGER
),
cast(1 as INTEGER)
  )
)
  ) as promotion_quantity,
  -- 赠品用内部属性复写
  if(
is_gift = 0,
promotion_discount,
if(
  gift_item is null,
  promotion_discount,
  cast(
JsonValue(gift_item, '$.discount') as BIGINT
  )
)
  ) as promotion_discount,
  -- 赠品用内部属性复写
  process_time
from
  (
select
  order_id,
  stack_id,
  order_item_id,
  cast(JsonValue(sku_info, '$.skuId') as BIGINT) as sku_id,
  JsonValue(sku_info, '$.name') as sku_name,
  if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as quantity,
  -- 1.标品, 2.散装
  cast(JsonValue(sku_info, '$.productType') as INTEGER) as product_type,
  original_price,
  original_price / if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  total_pay / if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as unit_total_pay,
  -- 单位商品支付金额
  cast(is_gift as INTEGER) as is_gift,
  if(
promotion_detail is null,
cast(-1 as BIGINT),
cast(
  JsonValue(promotion_detail, '$.promotionId') as BIGINT
)
  ) as promotion_id,
  if(
promotion_detail is null,
cast(-1 as INTEGER),
cast(
  JsonValue(promotion_detail, '$.promotionType') as INTEGER
)
  ) as promotion_type,
  if(
promotion_detail is null,
cast(0 as INTEGER),
if(
  cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
  cast(
JsonValue(promotion_detail, '$.quantity') as INTEGER
  ),
  cast(1 as INTEGER)
)
  ) as promotion_quantity,
  if(
promotion_detail is null,
cast(0 as BIGINT),
cast(
  JsonValue(promotion_detail, '$.discount') as BIGINT
)
  ) as promotion_discount,
  JsonValue(promotion_detail, '$.giftList') as gift_list,
  process_time
from
  (
select
  *
from
  (
select
  *,
  row_number() over (
partition by order_id,
order_item_id,
stack_id
order by
 

回复:flinksql问题请教

2021-07-06 Thread silence
已解决 where 条件始终为假。
--
发件人:silence 
发送时间:2021年7月7日(星期三) 12:05
收件人:user-zh 
主 题:flinksql问题请教


请教一下下面的sql为什么会被翻译成有限数据集?源表是个kafka source,用了row_number进行开窗和两次Table Function join
flink版本:1.12.2

Stage 1 : Data Source
content : Source: Values(tuples=[[]])
Stage 2 : Operator
content : Correlate(invocation=[LateralArray($cor3.gift_list)], 
correlate=[table(LateralArray($cor3.gift_list))], 
select=[order_id,stack_id,order_item_id,sku_id,sku_name,quantity,product_type,original_price,unit_original_price,promotion_reduce_price,coupon_reduce_price,total_pay,unit_total_pay,is_gift,promotion_id,promotion_type,promotion_quantity,promotion_discount,gift_list,process_time,EXPR$0],
 rowType=[RecordType(BIGINT order_id, BIGINT stack_id, BIGINT order_item_id, 
BIGINT sku_id, VARCHAR(2147483647) sku_name, INTEGER quantity, INTEGER 
product_type, BIGINT original_price, BIGINT unit_original_price, BIGINT 
promotion_reduce_price, BIGINT coupon_reduce_price, BIGINT total_pay, BIGINT 
unit_total_pay, INTEGER is_gift, BIGINT promotion_id, INTEGER promotion_type, 
INTEGER promotion_quantity, BIGINT promotion_discount, VARCHAR(2147483647) 
gift_list, TIME ATTRIBUTE(PROCTIME) process_time, VARCHAR(2147483647) EXPR$0)], 
joinType=[LEFT])
select
  order_id,
  stack_id,
  order_item_id,
  sku_id,
  sku_name,
  quantity,
  product_type,
  original_price,
  unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  unit_total_pay,
  is_gift,
  promotion_id,
  promotion_type,
  promotion_quantity,
  promotion_discount,
  process_time
from
  (
select
  order_id,
  stack_id,
  order_item_id,
  sku_id,
  sku_name,
  quantity,
  product_type,
  original_price,
  unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  unit_total_pay,
  is_gift,
  promotion_id,
  promotion_type,
  if(
is_gift = 0,
promotion_quantity,
if(
  gift_item is null,
  promotion_quantity,
  if(
product_type = 1,
cast(
  JsonValue(gift_item, '$.quantity') as INTEGER
),
cast(1 as INTEGER)
  )
)
  ) as promotion_quantity,
  -- 赠品用内部属性复写
  if(
is_gift = 0,
promotion_discount,
if(
  gift_item is null,
  promotion_discount,
  cast(
JsonValue(gift_item, '$.discount') as BIGINT
  )
)
  ) as promotion_discount,
  -- 赠品用内部属性复写
  process_time
from
  (
select
  order_id,
  stack_id,
  order_item_id,
  cast(JsonValue(sku_info, '$.skuId') as BIGINT) as sku_id,
  JsonValue(sku_info, '$.name') as sku_name,
  if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as quantity,
  -- 1.标品, 2.散装
  cast(JsonValue(sku_info, '$.productType') as INTEGER) as product_type,
  original_price,
  original_price / if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  total_pay / if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as unit_total_pay,
  -- 单位商品支付金额
  cast(is_gift as INTEGER) as is_gift,
  if(
promotion_detail is null,
cast(-1 as BIGINT),
cast(
  JsonValue(promotion_detail, '$.promotionId') as BIGINT
)
  ) as promotion_id,
  if(
promotion_detail is null,
cast(-1 as INTEGER),
cast(
  JsonValue(promotion_detail, '$.promotionType') as INTEGER
)
  ) as promotion_type,
  if(
promotion_detail is null,
cast(0 as INTEGER),
if(
  cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
  cast(
JsonValue(promotion_detail, '$.quantity') as INTEGER
  ),
  cast(1 as INTEGER)
)
  ) as promotion_quantity,
  if(
promotion_detail is null,
cast(0 as BIGINT),
cast(
  JsonValue(promotion_detail, '$.discount') as BIGINT
)
  ) as promotion_discount,
  JsonValue(promotion_detail, '$.giftList') as gift_list,
  process_