Re: Flink SQL support for side output
Hi! As far as I know, there are currently no plans to support side output. Could you describe your requirements and scenario?

Ada Luna wrote on Wed, Sep 15, 2021 at 8:38 PM:
> Will Flink SQL support side output in the future, for emitting dirty data to a side channel?
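Until side output is supported, a common pure-SQL workaround for dirty data is to split the stream into two INSERTs inside a statement set: one selecting clean rows, one selecting the rest. A minimal sketch, assuming hypothetical tables `events`, `clean_sink`, and `dirty_sink`, and a Flink version whose SQL client supports statement sets (the exact `BEGIN STATEMENT SET` syntax varies by version):

```sql
-- Hypothetical split of one source into a clean sink and a dirty sink.
BEGIN STATEMENT SET;

-- rows whose payload looks like an integer are considered clean
INSERT INTO clean_sink
SELECT id, CAST(payload AS INT) AS v
FROM events
WHERE payload IS NOT NULL AND REGEXP(payload, '^[0-9]+$');

-- everything else is routed to a side table for inspection
INSERT INTO dirty_sink
SELECT id, payload
FROM events
WHERE payload IS NULL OR NOT REGEXP(payload, '^[0-9]+$');

END;
```

Unlike a real side output this expresses the split as two queries over the same source, but within a statement set the planner can usually share the source scan.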
Re: [ANNOUNCE] Flink mailing lists archive service has migrated to Apache Archive service
Thanks Leonard for the announcement. I guess that is helpful. @Robert is there any way we can change the default setting to something else (e.g. greater than 0 days)? Only having the last month available as a default is kind of annoying considering that the time setting is quite hidden.

Matthias

PS: As a workaround, one could use the gte=0d parameter which is encoded in the URL (e.g. if you use managed search engines in Chrome or Firefox's bookmark keywords: https://lists.apache.org/x/list.html?u...@flink.apache.org:gte=0d:%s). That will make all posts available right away.

On Mon, Sep 6, 2021 at 3:16 PM JING ZHANG wrote:
> Thanks Leonard for driving this.
> The information is helpful.
>
> Best,
> JING ZHANG
>
> Jark Wu wrote on Mon, Sep 6, 2021 at 4:59 PM:
>
>> Thanks Leonard,
>>
>> I have seen many users complaining that the Flink mailing list doesn't
>> work (they were using Nabble).
>> I think this information would be very helpful.
>>
>> Best,
>> Jark
>>
>> On Mon, 6 Sept 2021 at 16:39, Leonard Xu wrote:
>>
>>> Hi, all
>>>
>>> The mailing list archive service Nabble Archive was broken at the end of
>>> June. The Flink community has migrated the mailing list archives [1] to the
>>> Apache Archive service by commit [2]; you can refer to [3] for more about
>>> the mailing list archives of Flink.
>>>
>>> The Apache Archive service is maintained by the ASF, thus its stability is
>>> guaranteed. It's a web-based mail archive service which allows you to
>>> browse, search, interact, subscribe, unsubscribe, etc. with mailing lists.
>>>
>>> The Apache Archive service shows mails of the last month by default; you can
>>> specify a date range to browse and search the historical mails.
>>>
>>> Hope it is helpful.
>>>
>>> Best,
>>> Leonard
>>>
>>> [1] The Flink mailing lists in the Apache archive service
>>> dev mailing list archives:
>>> https://lists.apache.org/list.html?d...@flink.apache.org
>>> user mailing list archives:
>>> https://lists.apache.org/list.html?u...@flink.apache.org
>>> user-zh mailing list archives:
>>> https://lists.apache.org/list.html?user-zh@flink.apache.org
>>> [2]
>>> https://github.com/apache/flink-web/commit/9194dda862da00d93f627fd315056471657655d1
>>> [3] https://flink.apache.org/community.html#mailing-lists
Flink SQL support for side output
Will Flink SQL support side output in the future, for emitting dirty data to a side channel?
Re: flink cdc SQL2ES, GC overhead limit exceeded
This is probably unrelated to Flink CDC: the CDC connector is only the source, and this exception stack is thrown from the join operator. Please paste your SQL and configuration so it is easier for everyone to analyze.

Best,
Leonard

> On Sep 15, 2021, at 15:01, wukon...@foxmail.com wrote:
>
> hi LI YUAN:
> Please describe how you are using Flink and in which scenario this error occurs, so that everyone can help you locate the problem.
>
> wukon...@foxmail.com
>
> From: LI YUAN
> Sent: 2021-09-09 20:38
> To: user-zh
> Subject: flink cdc SQL2ES, GC overhead limit exceeded
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at org.rocksdb.RocksIterator.key0(Native Method)
>     at org.rocksdb.RocksIterator.key(RocksIterator.java:37)
>     at org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.key(RocksIteratorWrapper.java:99)
>     at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapIterator.loadCache(RocksDBMapState.java:670)
>     at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapIterator.hasNext(RocksDBMapState.java:585)
>     at org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey$1.hasNext(OuterJoinRecordStateViews.java:285)
>     at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:199)
>     at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:211)
>     at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement2(StreamingJoinOperator.java:129)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:221)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:190)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$$Lambda$363/366743523.accept(Unknown Source)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$208/999379751.runDefaultAction(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$625/601779266.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:748)
>
> Environment:
>
> Flink version: 1.13.1
>
> Flink CDC version: 1.4.0
>
> Database and version: Mysql 7.0
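The stack trace ends in `OuterJoinRecordStateViews$InputSideHasNoUniqueKey`, the join state view Flink falls back to when it cannot derive a unique key for a join input; it keeps every row per join key and is the most state-hungry variant. Pending the actual SQL and configuration, two things worth checking are sketched below; these are illustrative values, not a confirmed fix for this job (exact `SET` quoting differs between SQL client versions):

```sql
-- Sketch: bound how long join state is retained. Expired state means
-- late matches are lost, so the value must fit the business logic.
SET 'table.exec.state.ttl' = '1 h';

-- Sketch: declaring a primary key on a join input lets the planner pick
-- a cheaper state layout than InputSideHasNoUniqueKey, e.g.:
-- CREATE TABLE dim (id STRING, ..., PRIMARY KEY (id) NOT ENFORCED) WITH (...);
```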
Reply: Re: Temporal Joins reports "Currently the join key in Temporal Table Join can not be empty"
Hi Wayne:
Please refer to the official Flink Join documentation: the right-hand (build) side needs to declare a PRIMARY KEY, and that key must also be included in the join condition.

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/joins.html#%E5%9F%BA%E4%BA%8E%E4%BA%8B%E4%BB%B6%E6%97%B6%E9%97%B4%E7%9A%84%E6%97%B6%E6%80%81-join

Relevant content:
Note: an event-time temporal table join is triggered by the watermarks of both sides; please make sure suitable watermarks are set for both tables of the join.
Note: the join key of an event-time temporal table join must contain the primary key of the temporal table, e.g. the primary key P.product_id of table product_changelog must be included in the join condition O.product_id = P.product_id.

wukon...@foxmail.com

From: Wayne
Sent: 2021-09-09 23:29
To: user-zh
Subject: Re: Re: Re: Re: Temporal Joins reports "Currently the join key in Temporal Table Join can not be empty"

Hello, sorry to bother you again. I tried this again recently and still hit the same error. My Flink version is flink-1.12.2-bin-scala_2.12, executed with the SQL client. My SQL is as follows:

CREATE TABLE stub_trans (
  `uuid` STRING,
  `columnInfos` MAP NOT NULL> NOT NULL,
  procTime TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR procTime AS procTime
) WITH (
  'connector' = 'kafka',
  ..
  'format' = 'avro'
);

CREATE TABLE kapeng_test (
  `event_id` STRING,
  `genre_id` STRING,
  `user_guid` STRING,
  procTime TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR procTime AS procTime
) WITH (
  'connector' = 'kafka',
  'format' = 'avro'
);

CREATE TABLE purchase_hist (
  rowkey STRING,
  d ROW < cost STRING, crdate STRING, currency STRING, eventid STRING, genreid STRING, quantity STRING >,
  PRIMARY KEY ( rowkey ) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  .);

INSERT INTO purchase_hist
SELECT rowkey,
       ROW ( cost, crdate, currency, eventid, genreid, quantity )
FROM (
  SELECT
    CONCAT_WS( '_', user_guid, CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, '-MM-dd:HH:mm:ss.S' ) AS BIGINT ) AS STRING )) AS rowkey,
    columnInfos [ 'TICKET_COST' ].newValue AS cost,
    DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, '-MM-dd:HH:mm:ss.S' ), '-MM-dd' ) AS crdate,
    columnInfos [ 'EVENT_ID' ].newValue AS eventid,
    columnInfos [ 'CURRENCY_CODE' ].newValue AS currency,
    columnInfos [ 'QUANTITY' ].newValue AS quantity,
    genre_id AS genreid,
    user_guid AS userGuid
  FROM stub_trans
  LEFT JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime
    ON stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id
) m

The error is as follows:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
    at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.immutable.Range.foreach(Range.scala:158)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at
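The note quoted in the reply can be illustrated with the documentation's product_changelog example; the key point is that the versioned side declares a PRIMARY KEY and the join condition contains it (names below follow the docs, not the failing job):

```sql
-- Event-time temporal join pattern: both sides have watermarks, and the
-- join key contains the primary key of the versioned table.
CREATE TABLE orders (
  order_id   STRING,
  product_id STRING,
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time
) WITH ('connector' = 'kafka' /* ... */, 'format' = 'avro');

CREATE TABLE product_changelog (
  product_id  STRING,
  price       DECIMAL(10, 2),
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time,
  PRIMARY KEY (product_id) NOT ENFORCED  -- must appear in the join key
) WITH ('connector' = 'kafka' /* ... */, 'format' = 'debezium-json');

SELECT o.order_id, p.price
FROM orders AS o
LEFT JOIN product_changelog FOR SYSTEM_TIME AS OF o.order_time AS p
  ON o.product_id = p.product_id;  -- join key contains the primary key
```

In the query above, kapeng_test declares no PRIMARY KEY, so the planner presumably cannot derive a non-empty temporal join key, which would match the exception.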
Re: After an HBase column family's TTL expires, Flink can no longer write data
hi:
How did you define the source and sink tables? Could you paste the exact CREATE TABLE statements?

wukon...@foxmail.com

From: xiaohui zhang
Sent: 2021-09-09 17:19
To: user-zh
Subject: After an HBase column family's TTL expires, Flink can no longer write data

Flink: 1.12.1
Flink-connector: 2.2
Hbase: 2.1.0 + CDH6.3.2

Symptom: if an HBase column family has a TTL configured, then once data written for some rowkey reaches the expiration time, the column family is marked as deleted by HBase. When data with the same key arrives afterwards, Flink cannot write it into HBase, and queries show the column family stays empty.

The steps are roughly as follows:
Create an HBase table test with two column families: cf1 with TTL 60 and cf2 with TTL 120, i.e. data TTLs of 1 minute and 2 minutes.
Write data into the table with SQL: insert into test select 'rowkey', ROW('123'), ROW('456') from sometable;
After one minute, an HBase query shows no cf1 data; after two minutes the rowkey has no data at all.
Writing through Flink again at this point fails: the data is not written, and Flink reports no error.

Is this a bug, or an HBase problem?
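To make the report reproducible, the sink definition would presumably look something like the sketch below, reconstructed from the description; the connector options and qualifier names are illustrative, not the reporter's actual DDL (the TTLs live on the HBase side, not in the Flink table):

```sql
-- Hypothetical HBase sink matching the description: table `test` with
-- column families cf1 (TTL 60 s) and cf2 (TTL 120 s) configured in HBase.
CREATE TABLE test (
  rowkey STRING,
  cf1 ROW<q STRING>,
  cf2 ROW<q STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'test',
  'zookeeper.quorum' = 'localhost:2181'  -- illustrative
);

-- the write described in the report
INSERT INTO test
SELECT 'rowkey', ROW('123'), ROW('456') FROM sometable;
```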