Question about the ChangelogNormalize stage generated by the Flink SQL upsert-kafka connector

2021-03-03, Qishang
Hi community,
Flink 1.12.1

Our current job ingests canal-json data from Kafka and writes it to a DB. The topic has only 1 partition, so raising the parallelism does nothing for a purely forward ETL:

insert into table_a select id,udf(a),b,c from table_b;

I noticed that the plan produced by EXPLAIN for the upsert-kafka connector contains a ChangelogNormalize stage that repartitions the stream:
1. Is ChangelogNormalize added by default, or is there somewhere to configure it?
2. Can it lift the parallelism limit that the Kafka partition count normally imposes? Does it only take effect with upsert-kafka, and can it be applied to the scenario above?

```
== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog,
default_database, temp_table]], fields=[id...])

Stage 3 : Operator
content : ChangelogNormalize(key=[id])
ship_strategy : HASH

Stage 4 : Operator
content : Calc(select=[...])
ship_strategy : FORWARD

Stage 5 : Data Sink
content : Sink: Sink(table=[default_catalog.default_database.table_a],
fields=[id...])
ship_strategy : FORWARD
```
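The HASH ship_strategy on the ChangelogNormalize stage is what can break the one-partition bottleneck. A rough plain-Python sketch of the idea (the worker count, records, and function names are invented for illustration; this is not Flink code):

```python
from collections import defaultdict

def hash_shuffle(records, num_workers, key_fn):
    # One source "partition" feeding a hash shuffle: each record is
    # routed to a downstream worker by the hash of its key, similar
    # to what a HASH ship_strategy does between stages.
    workers = defaultdict(list)
    for rec in records:
        workers[hash(key_fn(rec)) % num_workers].append(rec)
    return workers

# A single input stream still ends up spread over 4 workers.
records = [{"id": i, "a": i * 10} for i in range(1000)]
assigned = hash_shuffle(records, num_workers=4, key_fn=lambda r: r["id"])
print(sorted(len(v) for v in assigned.values()))  # [250, 250, 250, 250]
```

The point is only that a hash exchange on the key, not the source partition count, decides how many downstream subtasks receive work.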


Re: How to use async I/O to join dimension tables in Flink SQL?

2021-03-03, HunterXHunter
Define a source table.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-savepoint issue

2021-03-03, Congxian Qiu
For keyed state, all records with the same key must land in the same key group. If one key is hot, you can add a map before the keyBy that appends a suffix to the key, keyBy on that salted key, and then aggregate the partial results back together once processing is done.
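The two-stage trick above can be sketched in plain Python (the salt bucket count and the counting payload are invented for illustration; in Flink these would be two keyBy/aggregate stages):

```python
import random
from collections import defaultdict

def salted_key(key, salt_buckets=4):
    # Stage 1 key: append a random suffix so one hot key is
    # spread across several parallel subtasks.
    return f"{key}#{random.randrange(salt_buckets)}"

def two_stage_count(events, salt_buckets=4):
    # Stage 1: partial counts, keyed by the salted key.
    partial = defaultdict(int)
    for key in events:
        partial[salted_key(key, salt_buckets)] += 1
    # Stage 2: strip the suffix and merge the partial results
    # back together, keyed by the original key.
    final = defaultdict(int)
    for salted, count in partial.items():
        original = salted.rsplit("#", 1)[0]
        final[original] += count
    return dict(final)

events = ["hot"] * 100 + ["cold"] * 3
print(two_stage_count(events))  # {'hot': 100, 'cold': 3}
```

Note that each stage keys by a value that is fixed for the lifetime of the record within that stage; the randomness only decides which bucket a record is salted into, which is what keeps this safe for keyed state.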
Best,
Congxian


guomuhua <663021...@qq.com> wrote on Thursday, March 4, 2021 at 12:49 PM:

> I'm hitting a similar issue: to spread the data I appended a random number in the keyBy. What is the correct way to spread the data?
> nobleyd wrote
> > Are you using a random key?


Re: flink-savepoint issue

2021-03-03, guomuhua
I'm hitting a similar issue: to spread the data I appended a random number in the keyBy. What is the correct way to spread the data?
nobleyd wrote
> Are you using a random key?


Re: flink-savepoint issue

2021-03-03, guomuhua
I'm seeing the same problem. To spread the data I appended a random suffix to the key in the keyBy; without the random suffix the savepoint succeeds, and with it the savepoint fails. So if the data really does need to be spread out, how should it be handled?




Re: Occasional log errors when using OSS as the backend with Flink Application mode on native k8s

2021-03-03, 王 羽凡
2021-03-04 02:33:25,292 DEBUG org.apache.flink.runtime.rpc.akka.SupervisorActor 
   [] - Starting FencedAkkaRpcActor with name jobmanager_2.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,304 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC 
endpoint for org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_2 .
2021/3/4 上午10:33:25 2021-03-04 02:33:25,310 INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing 
job TransactionAndAccount ().
2021/3/4 上午10:33:25 2021-03-04 02:33:25,323 INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart 
back off time strategy 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, 
backoffTimeMS=1000) for TransactionAndAccount 
().
2021/3/4 上午10:33:25 2021-03-04 02:33:25,380 INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Running 
initialization on master for job TransactionAndAccount 
().
2021/3/4 上午10:33:25 2021-03-04 02:33:25,380 INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully 
ran initialization on master in 0 ms.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,381 DEBUG 
org.apache.flink.runtime.jobmaster.JobMaster [] - Adding 2 
vertices from job graph TransactionAndAccount 
().
2021/3/4 上午10:33:25 2021-03-04 02:33:25,381 DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Attaching 2 
topologically sorted vertices to existing job graph with 0 vertices and 0 
intermediate results.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,389 DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Connecting 
ExecutionJobVertex cbc357ccb763df2852fee8c4fc7d55f2 (Source: Custom Source -> 
format to json -> Filter -> process timestamp range -> Timestamps/Watermarks) 
to 0 predecessors.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,389 DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Connecting 
ExecutionJobVertex 337adade1e207453ed3502e01d75fd03 
(Window(TumblingEventTimeWindows(8640), EventTimeTrigger, SumAggregator, 
PassThroughWindowFunction) -> Flat Map -> Sink: tidb) to 1 predecessors.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,389 DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Connecting 
input 0 of vertex 337adade1e207453ed3502e01d75fd03 
(Window(TumblingEventTimeWindows(8640), EventTimeTrigger, SumAggregator, 
PassThroughWindowFunction) -> Flat Map -> Sink: tidb) to intermediate result 
referenced via predecessor cbc357ccb763df2852fee8c4fc7d55f2 (Source: Custom 
Source -> format to json -> Filter -> process timestamp range -> 
Timestamps/Watermarks).
2021/3/4 上午10:33:25 2021-03-04 02:33:25,395 INFO  
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 
1 pipelined regions in 2 ms
2021/3/4 上午10:33:25 2021-03-04 02:33:25,396 DEBUG 
org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully 
created execution graph from job graph TransactionAndAccount 
().
2021/3/4 上午10:33:25 2021-03-04 02:33:25,406 INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Using 
job/cluster config to configure application-defined state backend: File State 
Backend (checkpoints: 'oss://xx/backend', savepoints: 'null', asynchronous: 
TRUE, fileStateThreshold: 20480)
2021/3/4 上午10:33:25 2021-03-04 02:33:25,406 INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Using 
application-defined state backend: File State Backend (checkpoints: 
'oss://xx/backend', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: 
20480)
2021/3/4 上午10:33:25 2021-03-04 02:33:25,419 INFO  
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] - 
[Server]Unable to execute HTTP request: Not Found
2021/3/4 上午10:33:25 [ErrorCode]: NoSuchKey
2021/3/4 上午10:33:25 [RequestId]: 604046F58B49C830320A1A53
2021/3/4 上午10:33:25 [HostId]: null
2021/3/4 上午10:33:25 2021-03-04 02:33:25,432 INFO  
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] - 
[Server]Unable to execute HTTP request: Not Found
2021/3/4 上午10:33:25 [ErrorCode]: NoSuchKey
2021/3/4 上午10:33:25 [RequestId]: 604046F58B49C830322A1A53
2021/3/4 上午10:33:25 [HostId]: null
2021/3/4 上午10:33:25 2021-03-04 02:33:25,442 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
Recovering checkpoints from 
KubernetesStateHandleStore{configMapName='demo--jobmanager-leader'}.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,448 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 
1 checkpoints in 
KubernetesStateHandleStore{configMapName='demo--jobmanager-leader'}.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,449 INFO  

How to use async I/O to join dimension tables in Flink SQL?

2021-03-03, casel.chen
How do I use async I/O to join dimension tables in Flink SQL? Is it covered in the official documentation?

Re: Does flink TableEnvironment.sqlUpdate not support UPDATE with multi-table joins?

2021-03-03, Michael Ran
Even plain SQL can't be written that way, can it?

At 2021-03-03 16:43:49, "JackJia"  wrote:
>Hi colleagues:
>Is flink TableEnvironment.sqlUpdate unable to handle an UPDATE with a multi-table join?
>
>
>The code is as follows:
>bbTableEnv.sqlUpdate("update order_tb a, min_max_usertime_tb b set a.mark=-2 " 
>+
>" where a.mac=b.mac and extract(epoch from a.usertime)/7200 = b.user_2hour " +
>" and a.usertime > b.min_usertime and a.usertime < b.max_usertime");
>It fails with:
>Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
>parse failed. Encountered "," at line 1, column 17.
>Was expecting:
>"SET" ...
>
>at 
>org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:96)
>at 
>org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:127)
>at 
>org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>at com.sir.idle.IdleAnalysis.runBlinkBatch(IdleAnalysis.java:101)
>at com.sir.BatchMain.main(BatchMain.java:17)
>Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "," at 
>line 1, column 17.
>Was expecting:
>"SET" ...
>
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:368)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:167)
>at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:147)
>at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:162)
>at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:187)
>at 
>org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:92)
>... 4 more
>Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "," at 
>line 1, column 17.
>Was expecting:
>"SET" ...
>
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:33107)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:32921)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlUpdate(FlinkSqlParserImpl.java:8227)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3646)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3669)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:214)
>at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:160)
>... 6 more


Re: flink-savepoint issue

2021-03-03, yidan zhao
Are you using a random key?
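Why a random key breaks savepoints can be illustrated with a plain-Python stand-in for Flink's key-group assignment (Flink actually murmur-hashes the key's hashCode modulo the max parallelism; the key names and salt range here are invented):

```python
import random

MAX_PARALLELISM = 128  # Flink's default max parallelism

def key_group(key):
    # Simplified stand-in for Flink's key-group assignment.
    return hash(key) % MAX_PARALLELISM

# A deterministic key maps to the same key group every time, so the
# subtask that owns that group can always snapshot its state.
assert key_group("user-42") == key_group("user-42")

# A key built with a random suffix maps to a different group on each
# evaluation. At savepoint time the recomputed group can fall outside
# the subtask's assigned KeyGroupRange, which matches the
# "Key group 0 is not in KeyGroupRange{...}" error in this thread.
groups = {key_group(f"user-42#{random.randrange(1000)}") for _ in range(1000)}
print(len(groups) > 1)  # True: the salted key is not stable
```

The takeaway is that the key selector must be deterministic; any salting has to be baked into the record itself (or re-aggregated in a second stage) rather than recomputed randomly per call.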

guaishushu1...@163.com wrote on Wednesday, March 3, 2021 at 6:53 PM:



flink-savepoint issue

2021-03-03, guaishushu1...@163.com
Checkpoints are saved successfully, but savepoints fail with the following error:
java.lang.Exception: Could not materialize checkpoint 2404 for operator 
KeyedProcess (21/48).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: 
java.lang.IllegalArgumentException: Key group 0 is not in 
KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.&lt;init&gt;(OperatorSnapshotFinalizer.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
... 3 more
Caused by: java.lang.IllegalArgumentException: Key group 0 is not in 
KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
at 
org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
at 
org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
... 5 more


guaishushu1...@163.com


Does flink TableEnvironment.sqlUpdate not support UPDATE with multi-table joins?

2021-03-03, JackJia
Hi colleagues:
Is flink TableEnvironment.sqlUpdate unable to handle an UPDATE with a multi-table join?


The code is as follows:
bbTableEnv.sqlUpdate("update order_tb a, min_max_usertime_tb b set a.mark=-2 " +
" where a.mac=b.mac and extract(epoch from a.usertime)/7200 = b.user_2hour " +
" and a.usertime > b.min_usertime and a.usertime < b.max_usertime");
It fails with:
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
parse failed. Encountered "," at line 1, column 17.
Was expecting:
"SET" ...

at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:96)
at 
org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:127)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
at com.sir.idle.IdleAnalysis.runBlinkBatch(IdleAnalysis.java:101)
at com.sir.BatchMain.main(BatchMain.java:17)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "," at 
line 1, column 17.
Was expecting:
"SET" ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:368)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:167)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:147)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:162)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:187)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:92)
... 4 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "," at 
line 1, column 17.
Was expecting:
"SET" ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:33107)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:32921)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlUpdate(FlinkSqlParserImpl.java:8227)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3646)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3669)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:214)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:160)
... 6 more

Re: Flink1.11 flink-runtime-web

2021-03-03, Natasha
hi Michael,

