Reply: flinksql question

2021-07-06 Post by silence
Solved: a WHERE condition was always false, so the planner folded the whole query into an empty bounded dataset.
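A note for readers hitting the same symptom: when the planner can prove a WHERE predicate false at optimization time, it replaces the scan with an empty Values node, which is exactly the Source: Values(tuples=[[]]) in the quoted plan, and the job is then planned as a bounded, empty dataset. A minimal hypothetical reproduction (table and column names are assumptions, not taken from the original job):

-- the planner can prove this predicate is always false
-- (is_gift cannot be 0 and 1 at once), so the Kafka scan
-- collapses to Source: Values(tuples=[[]])
SELECT order_id
FROM kafka_orders   -- assumed Kafka-backed source table
WHERE is_gift = 0 AND is_gift = 1;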
--
From: silence 
Sent: Wednesday, July 7, 2021 12:05
To: user-zh 
Subject: flinksql question


[snip: the quoted question, execution plan, and SQL are identical to the original post "flinksql question" later in this digest]

Re: flinksql stream-dimension-table join question

2021-07-06 Post by Terry Wang
The statement looks fine. Check whether the data on the two sides can actually be joined.
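A hedged sketch of such a check, reusing the table names from the original post at the end of this digest; the idea is to compare the distinct join-key values on both sides and see whether they overlap:

-- sketch only: do tenantcode values actually appear in people.Name?
SELECT DISTINCT tenantcode FROM flink_doris_source;
SELECT DISTINCT `Name` FROM people;
-- also watch for trailing spaces or case differences in the keys,
-- e.g. compare TRIM(tenantcode) against TRIM(`Name`)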

Best,
Terry Wang



> On July 6, 2021 at 3:24 PM, 赵旭晨  wrote:
> 
> [snip: the quoted message is identical to the post "flinksql stream-dimension-table join question" at the end of this digest]



flinksql question

2021-07-06 Post by silence

Could someone explain why the SQL below gets translated into a bounded (finite) dataset? The source table is a Kafka source, and the query uses row_number over a window plus two table function joins.
Flink version: 1.12.2

Stage 1 : Data Source
content : Source: Values(tuples=[[]])
Stage 2 : Operator
content : Correlate(invocation=[LateralArray($cor3.gift_list)], 
correlate=[table(LateralArray($cor3.gift_list))], 
select=[order_id,stack_id,order_item_id,sku_id,sku_name,quantity,product_type,original_price,unit_original_price,promotion_reduce_price,coupon_reduce_price,total_pay,unit_total_pay,is_gift,promotion_id,promotion_type,promotion_quantity,promotion_discount,gift_list,process_time,EXPR$0],
 rowType=[RecordType(BIGINT order_id, BIGINT stack_id, BIGINT order_item_id, 
BIGINT sku_id, VARCHAR(2147483647) sku_name, INTEGER quantity, INTEGER 
product_type, BIGINT original_price, BIGINT unit_original_price, BIGINT 
promotion_reduce_price, BIGINT coupon_reduce_price, BIGINT total_pay, BIGINT 
unit_total_pay, INTEGER is_gift, BIGINT promotion_id, INTEGER promotion_type, 
INTEGER promotion_quantity, BIGINT promotion_discount, VARCHAR(2147483647) 
gift_list, TIME ATTRIBUTE(PROCTIME) process_time, VARCHAR(2147483647) EXPR$0)], 
joinType=[LEFT])
select
  order_id,
  stack_id,
  order_item_id,
  sku_id,
  sku_name,
  quantity,
  product_type,
  original_price,
  unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  unit_total_pay,
  is_gift,
  promotion_id,
  promotion_type,
  promotion_quantity,
  promotion_discount,
  process_time
from
  (
select
  order_id,
  stack_id,
  order_item_id,
  sku_id,
  sku_name,
  quantity,
  product_type,
  original_price,
  unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  unit_total_pay,
  is_gift,
  promotion_id,
  promotion_type,
  if(
is_gift = 0,
promotion_quantity,
if(
  gift_item is null,
  promotion_quantity,
  if(
product_type = 1,
cast(
  JsonValue(gift_item, '$.quantity') as INTEGER
),
cast(1 as INTEGER)
  )
)
  ) as promotion_quantity,
  -- for gift items, overwrite with the gift's own attributes
  if(
is_gift = 0,
promotion_discount,
if(
  gift_item is null,
  promotion_discount,
  cast(
JsonValue(gift_item, '$.discount') as BIGINT
  )
)
  ) as promotion_discount,
  -- for gift items, overwrite with the gift's own attributes
  process_time
from
  (
select
  order_id,
  stack_id,
  order_item_id,
  cast(JsonValue(sku_info, '$.skuId') as BIGINT) as sku_id,
  JsonValue(sku_info, '$.name') as sku_name,
  if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as quantity,
  -- productType: 1 = standard item, 2 = bulk
  cast(JsonValue(sku_info, '$.productType') as INTEGER) as product_type,
  original_price,
  original_price / if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as unit_original_price,
  promotion_reduce_price,
  coupon_reduce_price,
  total_pay,
  total_pay / if(
cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
cast(JsonValue(sku_info, '$.quantity') as INTEGER),
1
  ) as unit_total_pay,
  -- per-unit payment amount
  cast(is_gift as INTEGER) as is_gift,
  if(
promotion_detail is null,
cast(-1 as BIGINT),
cast(
  JsonValue(promotion_detail, '$.promotionId') as BIGINT
)
  ) as promotion_id,
  if(
promotion_detail is null,
cast(-1 as INTEGER),
cast(
  JsonValue(promotion_detail, '$.promotionType') as INTEGER
)
  ) as promotion_type,
  if(
promotion_detail is null,
cast(0 as INTEGER),
if(
  cast(JsonValue(sku_info, '$.productType') as INTEGER) = 1,
  cast(
JsonValue(promotion_detail, '$.quantity') as INTEGER
  ),
  cast(1 as INTEGER)
)
  ) as promotion_quantity,
  if(
promotion_detail is null,
cast(0 as BIGINT),
cast(
  JsonValue(promotion_detail, '$.discount') as BIGINT
)
  ) as promotion_discount,
  JsonValue(promotion_detail, '$.giftList') as gift_list,
  process_time
from
  (
select
  *
from
  (
select
  *,
  row_number() over (
partition by order_id,
order_item_id,
stack_id
order by
 

Re: Reply: flinksql stream-dimension-table join question

2021-07-06 Post by 赵旭晨



Querying Kafka and MySQL separately, both sides return data, but as soon as I do the temporal join the dimension-table columns come back empty.

I also noticed the stream table's proctime is 8 hours earlier than the current time. Could that be the reason?
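(A hedged note on the 8-hour question: in Flink 1.12, PROCTIME() is rendered in UTC, so a user in UTC+8 sees it 8 hours behind wall-clock time; FLIP-162, shipped with 1.13, changed it to follow the session time zone. The offset alone should not make the lookup join miss, because the JDBC lookup queries the current MySQL rows and matches on a.tenantcode = b.Name, not on the timestamp. For newer versions the session time zone can be set as below; this is only a sketch, and in 1.12 it does not change how PROCTIME() is displayed.)

-- SQL-client syntax of 1.14 and later, shown only as a sketch
SET 'table.local-time-zone' = 'Asia/Shanghai';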








On 2021-07-06 15:33:08, "guoyb" <861277...@qq.com.INVALID> wrote:
>Troubleshoot the SELECT: check whether each table returns data when queried on its own, and check the join condition.
>
>
>
>---Original---
>From: "赵旭晨"
>Sent at: July 6, 2021 (Tue) 3:25 PM
>To: "user-zh"
>Subject: flinksql stream-dimension-table join question
>
>
>[snip: the quoted message is identical to the post "flinksql stream-dimension-table join question" at the end of this digest]


Re: Unsubscribe

2021-07-06 Post by Px New
To unsubscribe from the u...@flink.apache.org mailing list, send an email with any content to user-zh-unsubscr...@flink.apache.org .

张保淇 wrote on Tue, July 6, 2021 at 4:13 PM:

> Unsubscribe


Unsubscribe

2021-07-06 Post by 张保淇
Unsubscribe

Re: Flink 1.10 memory problem

2021-07-06 Post by Yun Tang
Hi,

Yes, that is possible. If the downstream is deadlocked and cannot consume any data, the whole job appears to hang. To find the root cause you still need to check what is actually happening in the downstream task.
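As a stopgap while the downstream problem is investigated, the network buffer pool that the blocked requestMemorySegment call draws from can be enlarged. A hedged flink-conf.yaml sketch with the Flink 1.10 option names follows (the values shown are the defaults, to be raised as needed); note that this only buys headroom and does not remove the backpressure itself:

# flink-conf.yaml (Flink 1.10 keys; the values shown are the defaults)
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb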

Best,
Yun Tang

From: Ada Luna 
Sent: Tuesday, July 6, 2021 12:04
To: user-zh@flink.apache.org 
Subject: Re: Flink 1.10 memory problem

Can backpressure make the whole Flink job hang? Not a single Kafka record gets consumed any more; this has gone on for days and does not recover unless the job is restarted.

Yun Tang wrote on Tue, July 6, 2021 at 11:12 AM:
>
> Hi,
>
> LocalBufferPool.requestMemorySegment is not actually allocating new memory here. The job is backpressured: the downstream is not consuming in time, so the buffers stay occupied and the upstream blocks inside requestMemorySegment.
>
> To solve it, you still need to find out why the downstream is backpressured.
>
>
> Best,
> Yun Tang
> 
> From: Ada Luna 
> Sent: Tuesday, July 6, 2021 10:43
> To: user-zh@flink.apache.org 
> Subject: Re: Flink 1.10 memory problem
>
> "Source: test_records (2/3)" #78 prio=5 os_prio=0
> tid=0x7fd4c4a24800 nid=0x21bf in Object.wait()
> [0x7fd4d581a000]
> java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:251)
> - locked <0x00074d8b0df0> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:264)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:192)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> - locked <0x00074cbd3be0> (a java.lang.Object)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> - locked <0x00074cbd3be0> (a java.lang.Object)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
> at 
> com.dtstack.flink.sql.source.kafka.KafkaConsumer011.run(KafkaConsumer011.java:66)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
>
> Ada Luna wrote on Tue, July 6, 2021 at 10:13 AM:
> >
> > Increasing the TaskManager memory makes the error below go away, but I don't understand why an undersized Flink memory budget produces the apparent hang shown here: the memory request blocks, the job status stays RUNNING, and yet no data is consumed any more.
> >
> >
> >
> > "Map -> to: Tuple2 -> Map -> (from: (id, sid, item, val, unit, dt,
> > after_index, tablename, PROCTIME) -> where: (AND(=(tablename,
> > CONCAT(_UTF-16LE't_real', currtime2(dt, _UTF-16LE'MMdd'))),
> > OR(=(after_index, _UTF-16LE'2.6'), AND(=(sid, _UTF-16LE'7296'),
> > =(after_index, _UTF-16LE'2.10'))), LIKE(item, _UTF-16LE'??5%'),
> > =(MOD(getminutes(dt), 5), 0))), select: (id AS ID, sid AS STID, item
> > AS ITEM, val AS VAL, unit AS UNIT, dt AS DATATIME), from: (id, sid,
> > item, val, unit, dt, after_index, tablename, PROCTIME) -> where:
> > (AND(=(tablename, CONCAT(_UTF-16LE't_real', currtime2(dt,
> > _UTF-16LE'MMdd'))), OR(=(after_index, _UTF-16LE'2.6'), AND(=(sid,
> > _UTF-16LE'7296'), =(after_index, _UTF-16LE'2.10'))), LIKE(item,
> > _UTF-16LE'??5%'), =(MOD(getminutes(dt), 5), 0))), select: (sid AS
> > STID, val AS VAL, dt AS DATATIME)) (1/1)" #79 prio=5 os_prio=0
> > tid=0x7fd4c4a94000 nid=0x21c0 in Object.wait()
> > [0x7fd4d5719000]
> > java.lang.Thread.State: TIMED_WAITING (on object monitor)
> > at java.lang.Object.wait(Native Method)
> > at 
> > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:251)
> > - locked <0x00074e6c8b98> (a java.util.ArrayDeque)
> > at 
> > 

Reply: flinksql stream-dimension-table join question

2021-07-06 Post by guoyb
Troubleshoot the SELECT: check whether each table returns data when queried on its own, and check the join condition.
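(Spelled out against the tables from the original post at the end of this digest, a sketch of those two checks:)

-- 1) does each side return rows on its own?
SELECT * FROM flink_doris_source;
SELECT * FROM people;
-- 2) is the join condition satisfiable, i.e. do any
--    tenantcode values appear in people.Name?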




flinksql stream-dimension-table join question

2021-07-06 Post by 赵旭晨
Stream table:
CREATE TABLE flink_doris_source (
  reporttime STRING,
  tenantcode STRING,
  visitid STRING,
  orderid STRING,
  orderdetailid STRING,
  paymentdetailid STRING,
  etltype STRING,
  ptime as proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 'demo',
  'properties.bootstrap.servers' = '10.26.255.82:9092',
  'properties.group.id' = 'consumer-55',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);


Dimension table:
CREATE TABLE people (
  `Id` int,
  `Name` String,
  `Sex` tinyint,
  `Birth` timestamp,
  `Etltype` String,
  PRIMARY KEY (Id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.26.20.122:3306/test1',
  'table-name' = 'people',
  'username' = 'root',
  'password' = '***',
  'lookup.cache.ttl' = '10s',
  'lookup.cache.max-rows' = '20'
);


MySQL DDL:
CREATE TABLE `people` (
  `Id` int(11) NOT NULL AUTO_INCREMENT,
  `Name` varchar(40) NOT NULL,
  `Sex` tinyint(3) unsigned NOT NULL,
  `Birth` datetime DEFAULT NULL,
  `Etltype` varchar(40) NOT NULL,
  PRIMARY KEY (`Id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4




-- stream-dimension-table join
INSERT INTO flink_doris_sink
SELECT a.reporttime, a.tenantcode, a.visitid, a.orderid, a.orderdetailid, a.paymentdetailid, b.Etltype
FROM flink_doris_source a
LEFT JOIN people FOR SYSTEM_TIME AS OF a.ptime AS b
  ON a.tenantcode = b.Name;




The dimension-table columns never come back in the output, and the lookup-cache TTL I set seems to have no effect either. Is there a problem with the statement?
Flink version: flink-1.12.0-bin-scala_2.11.tgz   MySQL: 5.7.32   MySQL driver: 8.0.22
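(A hedged closing note on the two cache options in the dimension-table DDL above, since the TTL question comes up in this thread: they only control the in-memory lookup cache, not whether a key matches.)

-- meaning of the JDBC lookup cache options used above:
--   'lookup.cache.max-rows' = '20'  -> at most 20 lookup keys kept in memory
--   'lookup.cache.ttl'      = '10s' -> a cached key is re-queried from MySQL
--                                      once it is older than 10 seconds
-- depending on the connector version, empty lookup results may be cached as
-- well, so rows inserted into MySQL can take up to one TTL to become visible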