Re: Flink SQL support for side output

2021-09-15 Post by Caizhi Weng
Hi!

As far as I know, there is currently no plan to support side output. Could you describe your requirements and use case?
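
If it helps in the meantime: the DataStream API already supports this via OutputTag. Below is a minimal sketch of routing dirty records to a side output (the class name, tag name, and sample data are made up for illustration):

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DirtyDataSideOutput {
    // Tag identifying the side stream; the anonymous subclass keeps the type information.
    private static final OutputTag<String> DIRTY = new OutputTag<String>("dirty") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<String> clean = env
                .fromElements("1", "2", "not-a-number")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        try {
                            Integer.parseInt(value);  // stand-in for real validation
                            out.collect(value);       // main output: clean records
                        } catch (NumberFormatException e) {
                            ctx.output(DIRTY, value); // side output: dirty records
                        }
                    }
                });
        clean.print("clean");
        clean.getSideOutput(DIRTY).print("dirty"); // retrieve the dirty-record stream
        env.execute("side-output-demo");
    }
}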

Ada Luna wrote on Wed, Sep 15, 2021 at 8:38 PM:

> Will Flink SQL support side output in the future, e.g. for routing dirty data to a side stream?
>


Re: [ANNOUNCE] Flink mailing lists archive service has migrated to Apache Archive service

2021-09-15 Post by Matthias Pohl
Thanks Leonard for the announcement. I guess that is helpful.

@Robert is there any way we can change the default setting to something
else (e.g. greater than 0 days)? Only having the last month available as a
default is kind of annoying considering that the time setting is quite
hidden.

Matthias

PS: As a workaround, one could use the gte=0d parameter, which is encoded in
the URL (e.g. if you use managed search engines in Chrome or Firefox's
bookmark keywords:
https://lists.apache.org/x/list.html?u...@flink.apache.org:gte=0d:%s). That
will make all posts available right away.
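For instance, applying the same URL pattern without a search term (assumed to behave the same way), https://lists.apache.org/list.html?user-zh@flink.apache.org:gte=0d should list the full history of the user-zh list.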

On Mon, Sep 6, 2021 at 3:16 PM JING ZHANG  wrote:

> Thanks Leonard for driving this.
> The information is helpful.
>
> Best,
> JING ZHANG
>
> Jark Wu wrote on Mon, Sep 6, 2021 at 4:59 PM:
>
>> Thanks Leonard,
>>
>> I have seen many users complaining that the Flink mailing list doesn't
>> work (they were using Nabble).
>> I think this information would be very helpful.
>>
>> Best,
>> Jark
>>
>> On Mon, 6 Sept 2021 at 16:39, Leonard Xu  wrote:
>>
>>> Hi, all
>>>
>>> The mailing list archive service Nabble Archive broke at the end of
>>> June. The Flink community has migrated the mailing list archives[1] to
>>> the Apache Archive service in commit[2]; see [3] to learn more about
>>> Flink's mailing list archives.
>>>
>>> The Apache Archive service is maintained by the ASF, so its stability is
>>> guaranteed. It is a web-based mail archive service that lets you browse,
>>> search, subscribe to, unsubscribe from, and otherwise interact with the
>>> mailing lists.
>>>
>>> By default the Apache Archive service shows mail from the last month;
>>> you can specify a date range to browse and search historical mail.
>>>
>>>
>>> Hope this is helpful.
>>>
>>> Best,
>>> Leonard
>>>
>>> [1] The Flink mailing lists in Apache archive service
>>> dev mailing list archives:
>>> https://lists.apache.org/list.html?d...@flink.apache.org
>>> user mailing list archives:
>>> https://lists.apache.org/list.html?u...@flink.apache.org
>>> user-zh mailing list archives:
>>> https://lists.apache.org/list.html?user-zh@flink.apache.org
>>> [2]
>>> https://github.com/apache/flink-web/commit/9194dda862da00d93f627fd315056471657655d1
>>> [3] https://flink.apache.org/community.html#mailing-lists
>>
>>


Flink SQL support for side output

2021-09-15 Post by Ada Luna
Will Flink SQL support side output in the future, e.g. for routing dirty data to a side stream?


Re: flink cdc SQL2ES, GC overhead limit exceeded

2021-09-15 Post by Leonard Xu
This is probably unrelated to Flink CDC; CDC is only the source, and the exception stack is thrown from the join operator. Please post your SQL and configuration
so that everyone can analyze it better.
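
For reference, ever-growing state in a regular (non-windowed) streaming join is a common cause of this error. A minimal sketch of bounding it with idle state retention follows; the commented SQL is a placeholder, not the reporter's actual job, and the one-hour retention is an assumption to be tuned per use case:

import java.time.Duration;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class JoinStateRetentionSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Expire join state that has not been accessed for one hour, so the
        // RocksDB state backing a regular streaming join cannot grow without bound.
        tEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));

        // The actual pipeline would go here, e.g.:
        // tEnv.executeSql("INSERT INTO es_sink SELECT ... FROM cdc_a a LEFT JOIN cdc_b b ON a.id = b.id");
    }
}

Note that rows arriving after the matching side's state has expired will no longer join, so this trades completeness for bounded state.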
Best,
Leonard


> On Sep 15, 2021, at 15:01, wukon...@foxmail.com wrote:
> 
> hi LIYUAN:
> Please describe how you are using Flink and in what scenario this error occurs, so that everyone can help you locate the problem.
> 
> 
> 
> wukon...@foxmail.com
> 
> From: LI YUAN
> Sent: 2021-09-09 20:38
> To: user-zh
> Subject: flink cdc SQL2ES, GC overhead limit exceeded
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at org.rocksdb.RocksIterator.key0(Native Method)
> at org.rocksdb.RocksIterator.key(RocksIterator.java:37)
> at org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.key(RocksIteratorWrapper.java:99)
> at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapIterator.loadCache(RocksDBMapState.java:670)
> at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapIterator.hasNext(RocksDBMapState.java:585)
> at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey$1.hasNext(OuterJoinRecordStateViews.java:285)
> at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:199)
> at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:211)
> at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement2(StreamingJoinOperator.java:129)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:221)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:190)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$$Lambda$363/366743523.accept(Unknown Source)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$208/999379751.runDefaultAction(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$625/601779266.run(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at java.lang.Thread.run(Thread.java:748)
> 
> Environment:
> 
> Flink version: 1.13.1
> 
> Flink CDC version: 1.4.0
> 
> Database and version: MySQL 7.0



Re: Re: Temporal Joins reports "Currently the join key in Temporal Table Join can not be empty"

2021-09-15 Post by wukon...@foxmail.com
Hi Wayne:
   Please refer to the official Flink join documentation: the build side (the right-hand, versioned table) needs a PRIMARY KEY defined, and that key must also be included in the join condition.
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/joins.html#%E5%9F%BA%E4%BA%8E%E4%BA%8B%E4%BB%B6%E6%97%B6%E9%97%B4%E7%9A%84%E6%97%B6%E6%80%81-join
 

Quoted content:
Note: an event-time temporal table join is triggered by the watermarks of both inputs; make sure appropriate watermarks are set on both sides of the join.
Note: the join key of an event-time temporal table join must contain the primary key of the temporal table; for example, the primary key P.product_id of table product_changelog must be included in the join condition O.product_id = P.product_id.
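
Applied to the job quoted below, a minimal sketch of that fix could look as follows. Assumptions not taken from the thread: the stub_trans table from the quoted mail is already registered; the versioned side is rebuilt on the upsert-kafka connector, since it supports the required PRIMARY KEY (the plain kafka connector does not); the probe-side key is first exposed as a plain column so the planner can extract an equi-join key; all connector options marked '...' are placeholders:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TemporalJoinKeySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Versioned (build) side: declare the PRIMARY KEY that the temporal
        // join requires; upsert-kafka supports it, the plain kafka connector does not.
        tEnv.executeSql(
            "CREATE TABLE kapeng_test (" +
            "  event_id STRING," +
            "  genre_id STRING," +
            "  user_guid STRING," +
            "  update_time TIMESTAMP(3)," +
            "  WATERMARK FOR update_time AS update_time," +
            "  PRIMARY KEY (event_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'upsert-kafka'," +
            "  'topic' = '...'," +                        // placeholder
            "  'properties.bootstrap.servers' = '...'," + // placeholder
            "  'key.format' = 'raw'," +
            "  'value.format' = 'avro')");

        // Probe side: expose the join key as a plain column so the join
        // condition contains the versioned table's primary key directly.
        tEnv.executeSql(
            "CREATE VIEW stub_keyed AS " +
            "SELECT columnInfos['EVENT_ID'].newValue AS event_id, procTime " +
            "FROM stub_trans");

        // Event-time temporal join keyed on the primary key.
        tEnv.executeSql(
            "SELECT s.event_id, k.genre_id, k.user_guid " +
            "FROM stub_keyed AS s " +
            "LEFT JOIN kapeng_test FOR SYSTEM_TIME AS OF s.procTime AS k " +
            "ON s.event_id = k.event_id").print();
    }
}

Whether upsert-kafka matches the actual pipeline is for Wayne to confirm; the essential point is that the versioned table's PRIMARY KEY appears in the ON clause.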



wukon...@foxmail.com
 
From: Wayne
Sent: 2021-09-09 23:29
To: user-zh
Subject: Re: Re: Re: Re: Temporal Joins reports "Currently the join key in Temporal Table Join can not be empty"
Hello,

Sorry to bother you again; I retried recently and still get this error.
My Flink version is flink-1.12.2-bin-scala_2.12, run through the SQL Client.
My SQL is as follows:
 
 
CREATE TABLE stub_trans (
    `uuid` STRING,
    `columnInfos` MAP NOT NULL> NOT NULL,
    procTime TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR procTime AS procTime
) WITH (
    'connector' = 'kafka',
    ..
    'format' = 'avro'
);

CREATE TABLE kapeng_test (
    `event_id` STRING,
    `genre_id` STRING,
    `user_guid` STRING,
    procTime TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR procTime AS procTime
) WITH (
    'connector' = 'kafka',

    'format' = 'avro'
);

CREATE TABLE purchase_hist (
    rowkey STRING,
    d ROW < cost STRING, crdate STRING, currency STRING, eventid STRING, genreid STRING, quantity STRING >,
    PRIMARY KEY ( rowkey ) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    .);

INSERT INTO purchase_hist
SELECT
    rowkey,
    ROW ( cost, crdate, currency, eventid, genreid, quantity )
FROM (
    SELECT
        CONCAT_WS( '_', user_guid, CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 'yyyy-MM-dd:HH:mm:ss.S' ) AS BIGINT ) AS STRING )) AS rowkey,
        columnInfos [ 'TICKET_COST' ].newValue AS cost,
        DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 'yyyy-MM-dd:HH:mm:ss.S' ), 'yyyy-MM-dd' ) AS crdate,
        columnInfos [ 'EVENT_ID' ].newValue AS eventid,
        columnInfos [ 'CURRENCY_CODE' ].newValue AS currency,
        columnInfos [ 'QUANTITY' ].newValue AS quantity,
        genre_id AS genreid,
        user_guid AS userGuid
    FROM stub_trans
    LEFT JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime
        ON stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id
) m
 
 
 
 
The error is as follows:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.immutable.Range.foreach(Range.scala:158)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at 

Re: After an HBase column's TTL expires, Flink cannot write data anymore

2021-09-15 Post by wukon...@foxmail.com
hi:
How did you define the source table and the sink table? Could you paste the exact CREATE TABLE statements?



wukon...@foxmail.com
 
From: xiaohui zhang
Sent: 2021-09-09 17:19
To: user-zh
Subject: After an HBase column's TTL expires, Flink cannot write data anymore
Flink: 1.12.1
Flink-connector: 2.2
HBase: 2.1.0 + CDH 6.3.2
Symptom: if a TTL is set on an HBase column family, then once a rowkey's data reaches the expiry time, the column family is marked as deleted by HBase.
When data with the same key arrives afterwards, Flink can no longer write it to HBase, and queries against HBase show the column family stays empty.
 
The steps to reproduce are roughly as follows:
Create HBase table test with two column families, cf1 with TTL 60 and cf2 with TTL 120,
i.e. data TTLs of 1 minute and 2 minutes respectively.
Write data into the table with SQL (the HBase setup is sketched after the SQL below):
 
insert into test
select
'rowkey',
ROW('123'),
ROW('456')
from
   sometable;
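
For reference, the table layout described above corresponds roughly to this sketch against the HBase 2.x Admin API (the ZooKeeper quorum is a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class CreateTtlTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk-host"); // placeholder
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("test"))
                // cf1: cells expire after 60 seconds (1 minute)
                .setColumnFamily(ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes("cf1")).setTimeToLive(60).build())
                // cf2: cells expire after 120 seconds (2 minutes)
                .setColumnFamily(ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes("cf2")).setTimeToLive(120).build())
                .build());
        }
    }
}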
 
After one minute, querying HBase shows no cf1 data; after two minutes there is no data at all for that rowkey.
Writing data through Flink again at this point fails silently: nothing is written, and Flink reports no error.

Is this a bug, or an HBase problem?