????: ?????? flink sql????????????????sink table?

2021-09-25 文章 wukon...@foxmail.com
hi : 
UDF ??SQL ?? 
?? topic ??
?? https://mp.weixin.qq.com/s/IKzCRTh8eamk6TX7_RHYdQ



wukon...@foxmail.com
 
 JasonLee
?? 2021-09-23 21:56
 user-zh@flink.apache.org
?? ?? flink sqlsink table?
Hi
 
 
,  SQL  SQL 
??,??
 
 
Best
JasonLee
 
 
??2021??09??23?? 09:28 ??
sql??sql??
 
iPhone
 
 
--  --
??: 2572805166 <2572805...@qq.com.INVALID
: 2021??9??23?? 09:23
??: user-zh https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like
 casel.chen ??2021??9??18?? 8:27?? kafka 
topic??topic)
 ??flink sqlsink table


??????flink-1.12.0(1.13.2????????????) select datas[1].filed_1,datas[1].filed_2????????????????????

2021-09-25 文章 kcz
soryy ??




----
??: 
   "kcz"
<573693...@qq.com;
:2021??9??26??(??) 10:53
??:"user-zh"

flink-1.12.0(1.13.2????????????) select datas[1].filed_1,datas[1].filed_2????????????????????

2021-09-25 文章 kcz
??INDEX??INDEX++ 
??valuearray
CREATE TABLE KafkaTable (
 datas array

Re: flink-1.12.0 ddl设置watermark error,但是1.13.2没有报错

2021-09-25 文章 Leonard Xu
这是个已知bug[1], 在1.13.0 和 1.12.3上都修复了, 你可以用下flink 1.12.5 或 1.13.2的小版本

[1]https://issues.apache.org/jira/browse/FLINK-22082

祝好

> 在 2021年9月25日,21:29,kcz <573693...@qq.com.INVALID> 写道:
> 
> SQL定义如下,当1.12.0将watermark语句移除之后,就不报错了。
> CREATE TABLE KafkaTable (
>  test array  gatherTime STRING,
>  log_ts as TO_TIMESTAMP(FROM_UNIXTIME(CAST(gatherTime AS 
> bigint)/1000,'-MM-dd HH:mm:ss')),
>  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'scan.startup.mode' = 'earliest-offset',
>  'format' = 'json'
> );
> 
> SELECT test[1].signalValue from KafkaTable;
> 
> 
> 
> 
> Exception in thread "main" scala.MatchError: ITEM($0, 1) (of class 
> org.apache.calcite.rex.RexCall)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
>   at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
>   at 
> org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:127)
>   at 
> org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:62)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
>   at 
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> 

flink-1.12.0 ddl????watermark error??????1.13.2????????

2021-09-25 文章 kcz
SQL1.12.0??watermark??
CREATE TABLE KafkaTable (
 test array