Question about the ChangelogNormalize operator (Stage) generated by the Flink SQL upsert-kafka connector
Hi community.

Flink 1.12.1. Our current job reads canal-json data from Kafka and writes it to a DB. Because the topic has only one partition, raising the parallelism does nothing for an ETL pipeline that only uses forward connections.

insert into table_a select id,udf(a),b,c from table_b;

I noticed that the Stage list produced by explain for the upsert-kafka connector contains ChangelogNormalize, which repartitions the data.

1. Is ChangelogNormalize added by default, or is there somewhere to configure it?
2. It gets around the parallelism limit imposed by the number of Kafka partitions. Does that only take effect with upsert-kafka? Can it be used in the scenario I described above?

```
== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: TableSourceScan(table=[[default_catalog, default_database, temp_table]], fields=[id...])

Stage 3 : Operator
    content : ChangelogNormalize(key=[id])
    ship_strategy : HASH

Stage 4 : Operator
    content : Calc(select=[...])
    ship_strategy : FORWARD

Stage 5 : Data Sink
    content : Sink: Sink(table=[default_catalog.default_database.table_a], fields=[id...])
    ship_strategy : FORWARD
```
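The observation in the question can be pictured with a toy model. The sketch below is plain Java, not Flink API: the class and method names are hypothetical, the source subtask that owns the single partition is assumed to be index 0, and the modulo hashing is a simplification of Flink's real key-group assignment. It only illustrates why FORWARD keeps all records on one subtask while HASH spreads them by key.

```java
import java.util.Arrays;

/** Toy model of record distribution after a single-partition source (hypothetical names). */
final class ShipStrategySketch {

    /** FORWARD: every record stays on the subtask index that produced it. */
    static int[] forward(int[] ids, int sourceSubtask, int parallelism) {
        int[] load = new int[parallelism];
        load[sourceSubtask] = ids.length;
        return load;
    }

    /** HASH: each record is routed by its key (simplified modulo hashing, not Flink's). */
    static int[] hash(int[] ids, int parallelism) {
        int[] load = new int[parallelism];
        for (int id : ids) {
            load[Math.floorMod(Integer.hashCode(id), parallelism)]++;
        }
        return load;
    }

    public static void main(String[] args) {
        int[] ids = new int[100];
        for (int i = 0; i < ids.length; i++) {
            ids[i] = i;
        }
        // Records per downstream subtask at parallelism 4.
        System.out.println("FORWARD: " + Arrays.toString(forward(ids, 0, 4)));
        System.out.println("HASH:    " + Arrays.toString(hash(ids, 4)));
    }
}
```

With FORWARD, three of the four downstream subtasks stay idle; with HASH on id, the work is spread across all of them.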
Re: How to use async I/O for dimension-table joins in Flink SQL?
Define a source table.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink-savepoint problem
For keyed state, the same key must always land in the same key group. If a particular key is hot, you can run a map before the keyBy (appending a suffix to the key), then keyBy, and once processing is done, aggregate the partial results.

Best,
Congxian

guomuhua <663021...@qq.com> wrote on Thu, Mar 4, 2021 at 12:49 PM:
> I ran into a similar situation: to spread the data out, I added a random number in the keyBy. What is the correct way to spread the data out?
> nobleyd wrote
> > Are you using a random key?
> > guaishushu1103@ wrote on Wed, Mar 3, 2021 at 6:53 PM:
> > > Checkpoints are saved successfully, but the savepoint fails with this error:
> > > java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48).
> > > [...]
> > > Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> > > [...]
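A minimal sketch of the three steps suggested in the reply above (salt in a map, group by the salted key, then merge), simulated on in-memory lists rather than with the Flink API. All names are hypothetical, and the `#` separator and the round-robin salt are assumptions made for illustration. The point is that the salt is written into the record once, before any keying, so every later key extraction sees the same value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory simulation of salted two-stage counting (hypothetical names, not Flink API). */
final class TwoStageCountSketch {

    /** Step 1 (the "map"): attach the salt to the record itself. */
    static String saltKey(String key, int salt) {
        return key + "#" + salt;
    }

    /** Step 2: count per salted key; a hot key's load is split across its salts. */
    static Map<String, Long> countBySaltedKey(List<String> saltedKeys) {
        Map<String, Long> partial = new HashMap<>();
        for (String k : saltedKeys) {
            partial.merge(k, 1L, Long::sum);
        }
        return partial;
    }

    /** Step 3: strip the salt and merge the partial counts per original key. */
    static Map<String, Long> mergeByOriginalKey(Map<String, Long> partial) {
        Map<String, Long> total = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String salted = e.getKey();
            String original = salted.substring(0, salted.lastIndexOf('#'));
            total.merge(original, e.getValue(), Long::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        int buckets = 4; // number of salts per key, an arbitrary choice
        List<String> salted = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            salted.add(saltKey("hot", i % buckets));
        }
        salted.add(saltKey("cold", 0));

        Map<String, Long> partial = countBySaltedKey(salted);
        System.out.println("partial counts: " + partial);
        System.out.println("merged counts:  " + mergeByOriginalKey(partial));
    }
}
```

In a streaming job the second and third steps would each be a keyed operator; this batch version only shows that the merged result matches an unsalted count.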
Re: flink-savepoint problem
I ran into a similar situation: to spread the data out, I added a random number in the keyBy. What is the correct way to spread the data out?

nobleyd wrote
> Are you using a random key?
> guaishushu1103@ wrote on Wed, Mar 3, 2021 at 6:53 PM:
> > Checkpoints are saved successfully, but the savepoint fails with this error:
> > java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48).
> > [...]
> > Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> > [...]
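Re: flink-savepoint problem
I hit the same problem. To spread the data out, I appended a random number as a suffix in the keyBy. Without the random number the savepoint succeeds; with it the savepoint fails. So if there really is a need to spread the data out, how should it be done?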
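One likely reading of this behaviour, as far as I understand it: the key selector is evaluated more than once for the same record (when routing it and again when accessing keyed state), so a selector that draws a fresh random number can return a different key each time. A hedged sketch of an alternative is to derive the salt from a field that is already part of the record. The names below are hypothetical, and `eventId` stands in for any stable per-record field.

```java
/** Derives a repeatable salt from record content (hypothetical names, plain Java). */
final class StableSaltSketch {

    /**
     * Builds a salted key whose salt depends only on the record's own eventId,
     * so calling this any number of times for the same record gives the same key.
     */
    static String saltedKey(String key, long eventId, int buckets) {
        int salt = Math.floorMod(Long.hashCode(eventId), buckets);
        return key + "#" + salt;
    }

    public static void main(String[] args) {
        // Two evaluations for the same record agree, unlike key + random.
        String first = saltedKey("user42", 7L, 8);
        String second = saltedKey("user42", 7L, 8);
        System.out.println(first + " equals " + second + ": " + first.equals(second));
    }
}
```

Records of one hot key still fan out over up to `buckets` salted keys, and the partial results need a second aggregation on the original key afterwards.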
Re: Flink Application mode on native k8s with OSS as the state backend: logs occasionally show errors
2021-03-04 02:33:25,292 DEBUG org.apache.flink.runtime.rpc.akka.SupervisorActor [] - Starting FencedAkkaRpcActor with name jobmanager_2.
2021-03-04 02:33:25,304 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
2021-03-04 02:33:25,310 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job TransactionAndAccount ().
2021-03-04 02:33:25,323 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for TransactionAndAccount ().
2021-03-04 02:33:25,380 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job TransactionAndAccount ().
2021-03-04 02:33:25,380 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
2021-03-04 02:33:25,381 DEBUG org.apache.flink.runtime.jobmaster.JobMaster [] - Adding 2 vertices from job graph TransactionAndAccount ().
2021-03-04 02:33:25,381 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Attaching 2 topologically sorted vertices to existing job graph with 0 vertices and 0 intermediate results.
2021-03-04 02:33:25,389 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Connecting ExecutionJobVertex cbc357ccb763df2852fee8c4fc7d55f2 (Source: Custom Source -> format to json -> Filter -> process timestamp range -> Timestamps/Watermarks) to 0 predecessors.
2021-03-04 02:33:25,389 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Connecting ExecutionJobVertex 337adade1e207453ed3502e01d75fd03 (Window(TumblingEventTimeWindows(8640), EventTimeTrigger, SumAggregator, PassThroughWindowFunction) -> Flat Map -> Sink: tidb) to 1 predecessors.
2021-03-04 02:33:25,389 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Connecting input 0 of vertex 337adade1e207453ed3502e01d75fd03 (Window(TumblingEventTimeWindows(8640), EventTimeTrigger, SumAggregator, PassThroughWindowFunction) -> Flat Map -> Sink: tidb) to intermediate result referenced via predecessor cbc357ccb763df2852fee8c4fc7d55f2 (Source: Custom Source -> format to json -> Filter -> process timestamp range -> Timestamps/Watermarks).
2021-03-04 02:33:25,395 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 2 ms
2021-03-04 02:33:25,396 DEBUG org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully created execution graph from job graph TransactionAndAccount ().
2021-03-04 02:33:25,406 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: File State Backend (checkpoints: 'oss://xx/backend', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: 20480)
2021-03-04 02:33:25,406 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: File State Backend (checkpoints: 'oss://xx/backend', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: 20480)
2021-03-04 02:33:25,419 INFO org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss [] - [Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: 604046F58B49C830320A1A53
[HostId]: null
2021-03-04 02:33:25,432 INFO org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss [] - [Server]Unable to execute HTTP request: Not Found
[ErrorCode]: NoSuchKey
[RequestId]: 604046F58B49C830322A1A53
[HostId]: null
2021-03-04 02:33:25,442 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='demo--jobmanager-leader'}.
2021-03-04 02:33:25,448 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 1 checkpoints in KubernetesStateHandleStore{configMapName='demo--jobmanager-leader'}.
2021-03-04 02:33:25,449 INFO
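How to use async I/O for dimension-table joins in Flink SQL?
How do I use async I/O to join a dimension table in Flink SQL? Is this covered in the official documentation?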
Re: Does flink TableEnvironment.sqlUpdate not support multi-table join UPDATE?
I don't think SQL itself allows that either - -

At 2021-03-03 16:43:49, "JackJia" wrote:
>Hi everyone:
>Does flink TableEnvironment.sqlUpdate not support UPDATE with a multi-table join?
>
>The code is as follows:
>bbTableEnv.sqlUpdate("update order_tb a, min_max_usertime_tb b set a.mark=-2 " +
>" where a.mac=b.mac and extract(epoch from a.usertime)/7200 = b.user_2hour " +
>" and a.usertime > b.min_usertime and a.usertime < b.max_usertime");
>The error is as follows:
>Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "," at line 1, column 17.
>Was expecting:
>"SET" ...
>
>at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:96)
>at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:127)
>at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>at com.sir.idle.IdleAnalysis.runBlinkBatch(IdleAnalysis.java:101)
>at com.sir.BatchMain.main(BatchMain.java:17)
>Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "," at line 1, column 17.
>Was expecting:
>"SET" ...
>[...]
Re: flink-savepoint problem
Are you using a random key?

guaishushu1...@163.com wrote on Wed, Mar 3, 2021 at 6:53 PM:
> Checkpoints are saved successfully, but the savepoint fails with this error:
> java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48).
> [...]
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> [...]
>
> guaishushu1...@163.com
flink-savepoint problem
Checkpoints are saved successfully, but the savepoint fails with this error:

java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
    ... 3 more
Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
    ... 5 more

guaishushu1...@163.com
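To make the numbers in the error message easier to read, here is a small arithmetic sketch of how key groups map to subtasks. The two integer formulas follow my recollection of Flink's `KeyGroupRangeAssignment` and should be treated as an approximation rather than the authoritative implementation. The max parallelism of 128 is an assumption (it is not stated in the post); it is used because it reproduces the 54..55 range for subtask 21 of 48. Class and method names are hypothetical.

```java
/** Arithmetic sketch of key-group ownership (formulas assumed, names hypothetical). */
final class KeyGroupRangeSketch {

    /** First and last key group (inclusive) owned by a 0-based subtask index. */
    static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** 0-based subtask index that owns a given key group. */
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumption, not given in the original post
        int parallelism = 48;     // from "KeyedProcess (21/48)"
        int subtaskIndex = 20;    // "(21/48)" is 1-based, so the index is 20

        int[] range = rangeForSubtask(maxParallelism, parallelism, subtaskIndex);
        System.out.println("subtask index 20 owns key groups " + range[0] + ".." + range[1]);
        System.out.println("key group 0 is owned by subtask index "
                + subtaskForKeyGroup(maxParallelism, parallelism, 0));
    }
}
```

Under these assumptions, state written for a key that hashes to key group 0 belongs on subtask index 0, so finding it on the subtask that owns 54..55 suggests the key computed when the state was written differed from the key used to route the record.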
Does flink TableEnvironment.sqlUpdate not support multi-table join UPDATE?
Hi everyone:
Does flink TableEnvironment.sqlUpdate not support UPDATE with a multi-table join?

The code is as follows:
bbTableEnv.sqlUpdate("update order_tb a, min_max_usertime_tb b set a.mark=-2 " +
" where a.mac=b.mac and extract(epoch from a.usertime)/7200 = b.user_2hour " +
" and a.usertime > b.min_usertime and a.usertime < b.max_usertime");

The error is as follows:
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "," at line 1, column 17.
Was expecting:
    "SET" ...

    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:96)
    at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:127)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
    at com.sir.idle.IdleAnalysis.runBlinkBatch(IdleAnalysis.java:101)
    at com.sir.BatchMain.main(BatchMain.java:17)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "," at line 1, column 17.
Was expecting:
    "SET" ...

    at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:368)
    at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:167)
    at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:147)
    at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:162)
    at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:187)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:92)
    ... 4 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "," at line 1, column 17.
Was expecting:
    "SET" ...

    at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:33107)
    at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:32921)
    at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlUpdate(FlinkSqlParserImpl.java:8227)
    at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3646)
    at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3669)
    at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:214)
    at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:160)
    ... 6 more
??????????Flink1.11??flink-runtime-web????
hi Michael, ?? -- -- ??: "user-zh"