已解决:最外层 where 条件始终为假(同时要求 is_gift = 1 和 is_gift = 0),所以执行计划中的源表被替换成了空的 Values。
------------------------------------------------------------------
发件人:silence <slle...@aliyun.com.INVALID>
发送时间:2021年7月7日(星期三) 12:05
收件人:user-zh <user-zh@flink.apache.org>
主 题:flinksql问题请教


请教一下,下面的 SQL 为什么会被翻译成有限数据集?源表是个 Kafka source,用了 ROW_NUMBER 进行开窗,并做了两次 Table Function join。
flink版本:1.12.2

Stage 1 : Data Source
content : Source: Values(tuples=[[]])
Stage 2 : Operator
content : Correlate(invocation=[LateralArray($cor3.gift_list)], 
correlate=[table(LateralArray($cor3.gift_list))], 
select=[order_id,stack_id,order_item_id,sku_id,sku_name,quantity,product_type,original_price,unit_original_price,promotion_reduce_price,coupon_reduce_price,total_pay,unit_total_pay,is_gift,promotion_id,promotion_type,promotion_quantity,promotion_discount,gift_list,process_time,EXPR$0],
 rowType=[RecordType(BIGINT order_id, BIGINT stack_id, BIGINT order_item_id, 
BIGINT sku_id, VARCHAR(2147483647) sku_name, INTEGER quantity, INTEGER 
product_type, BIGINT original_price, BIGINT unit_original_price, BIGINT 
promotion_reduce_price, BIGINT coupon_reduce_price, BIGINT total_pay, BIGINT 
unit_total_pay, INTEGER is_gift, BIGINT promotion_id, INTEGER promotion_type, 
INTEGER promotion_quantity, BIGINT promotion_discount, VARCHAR(2147483647) 
gift_list, TIME ATTRIBUTE(PROCTIME) process_time, VARCHAR(2147483647) EXPR$0)], 
joinType=[LEFT])
-- Flattens rows of `sku_price` into one row per (item, promotion detail, gift item).
-- Structure, innermost first:
--   1. keep only the row with row_num = 1 per (order_id, order_item_id, stack_id),
--      ordered by utime desc;
--   2. expand promotion_reduce_price_detail with LateralArray into promotion_detail
--      and extract SKU / promotion fields with JsonValue (view `pro_view`);
--   3. expand gift_list with LateralArray into gift_item and override the promotion
--      quantity / discount for gift rows (view `pro_gift_view`);
--   4. filter on is_gift / promotion_id / promotion_discount.
-- JsonValue and LateralArray are not defined in this text; presumably user-defined
-- functions for JSON-path extraction and array explosion -- TODO confirm.
-- NOTE(review): the outermost WHERE can never be true (see the note on it below).
select
  order_id,
  stack_id,
  order_item_id,
  sku_id,
  sku_name,
  quantity,
  product_type,
  original_price,
  unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  unit_total_pay,
  is_gift,
  promotion_id,
  promotion_type,
  promotion_quantity,
  promotion_discount,
  process_time
from
  (
    select
      order_id,
      stack_id,
      order_item_id,
      sku_id,
      sku_name,
      quantity,
      product_type,
      original_price,
      unit_original_price,
      promotion_reduce_price,
      coupon_reduce_price,
      total_pay,
      unit_total_pay,
      is_gift,
      promotion_id,
      promotion_type,
      -- Keep pro_view's promotion_quantity unless this is a gift row (is_gift <> 0)
      -- with a non-null gift_item; then use the gift item's '$.quantity' when
      -- product_type = 1, otherwise 1.
      if(
        is_gift = 0,
        promotion_quantity,
        if(
          gift_item is null,
          promotion_quantity,
          if(
            product_type = 1,
            cast(
              JsonValue(gift_item, '$.quantity') as INTEGER
            ),
            cast(1 as INTEGER)
          )
        )
      ) as promotion_quantity,
      -- for gifts, overwrite with the gift item's own attribute
      if(
        is_gift = 0,
        promotion_discount,
        if(
          gift_item is null,
          promotion_discount,
          cast(
            JsonValue(gift_item, '$.discount') as BIGINT
          )
        )
      ) as promotion_discount,
      -- for gifts, overwrite with the gift item's own attribute
      process_time
    from
      (
        select
          order_id,
          stack_id,
          order_item_id,
          cast(JsonValue(sku_info, '$.skuId') as BIGINT) as sku_id,
          JsonValue(sku_info, '$.name') as sku_name,
          -- quantity is '$.quantity' only when '$.productType' = 1, otherwise 1
          if(
            cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
            cast(JsonValue(sku_info, '$.quantity') as INTEGER),
            1
          ) as quantity,
          -- 1 = standard item, 2 = bulk (sold loose)
          cast(JsonValue(sku_info, '$.productType') as INTEGER) as product_type,
          original_price,
          -- original_price divided by the same expression used for `quantity` above
          original_price / if(
            cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
            cast(JsonValue(sku_info, '$.quantity') as INTEGER),
            1
          ) as unit_original_price,
          promotion_reduce_price,
          coupon_reduce_price,
          total_pay,
          total_pay / if(
            cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
            cast(JsonValue(sku_info, '$.quantity') as INTEGER),
            1
          ) as unit_total_pay,
          -- payment amount per unit of product
          cast(is_gift as INTEGER) as is_gift,
          -- Rows with no promotion detail (promotion_detail is null after the LEFT
          -- lateral join) get placeholders: id/type = -1, quantity/discount = 0.
          if(
            promotion_detail is null,
            cast(-1 as BIGINT),
            cast(
              JsonValue(promotion_detail, '$.promotionId') as BIGINT
            )
          ) as promotion_id,
          if(
            promotion_detail is null,
            cast(-1 as INTEGER),
            cast(
              JsonValue(promotion_detail, '$.promotionType') as INTEGER
            )
          ) as promotion_type,
          if(
            promotion_detail is null,
            cast(0 as INTEGER),
            if(
              cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
              cast(
                JsonValue(promotion_detail, '$.quantity') as INTEGER
              ),
              cast(1 as INTEGER)
            )
          ) as promotion_quantity,
          if(
            promotion_detail is null,
            cast(0 as BIGINT),
            cast(
              JsonValue(promotion_detail, '$.discount') as BIGINT
            )
          ) as promotion_discount,
          -- gift_list is expanded into gift_item by the outer lateral join
          JsonValue(promotion_detail, '$.giftList') as gift_list,
          process_time
        from
          (
            -- Keep only the first row per (order_id, order_item_id, stack_id)
            -- when ordered by utime descending.
            select
              *
            from
              (
                select
                  *,
                  row_number() over (
                    partition by order_id,
                    order_item_id,
                    stack_id
                    order by
                      utime desc
                  ) as row_num
                from
                  sku_price
              )
            where
              row_num = 1
          ) sku_price
          -- LEFT ... ON TRUE keeps rows for which the function yields nothing;
          -- promotion_detail is null for those rows.
          LEFT JOIN LATERAL TABLE(LateralArray(promotion_reduce_price_detail)) 
as T(promotion_detail) ON TRUE
      ) pro_view
      LEFT JOIN LATERAL TABLE(LateralArray(gift_list)) as T(gift_item) ON TRUE
  ) pro_gift_view
-- NOTE(review): this predicate requires is_gift = 1 AND is_gift = 0 at the same
-- time, so it is false for every row. The intended condition is not recoverable
-- from this text (possibly `is_gift = 1 or (is_gift = 0 and ...)`) -- TODO confirm
-- with the author before changing it.
where
  is_gift = 1
  and (
    is_gift = 0
    and (
      (
        promotion_id > 0
        and promotion_discount > 0
      )
      or promotion_id < 1
    )  );

Reply via email to