Re: flink table over window error

2021-08-04 Post by yanyunpeng
OK, thanks for the explanation. On my side it works once I set the time field to processing time. I'll dig into the source code and analyze it when I get a chance.
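For reference, a minimal sketch (an assumption, not the poster's actual code) of what that processing-time variant could look like: the WATERMARK clause is dropped, a hypothetical computed column f_proc AS PROCTIME() is added, the over window orders on it, and streaming mode is used as suggested in the reply below.

// Minimal sketch (assumption): replace the event-time attribute with a
// processing-time attribute and order the over window on it.
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.UNBOUNDED_RANGE;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ProcTimeOverSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" +
                "  f_id INT,\n" +
                "  f_h STRING,\n" +
                "  f_l STRING,\n" +
                "  f_j STRING,\n" +
                "  f_value DOUBLE,\n" +
                "  f_time TIMESTAMP(3),\n" +
                "  f_proc AS PROCTIME(),\n" +    // processing-time attribute (hypothetical column name)
                "  PRIMARY KEY (f_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://***',\n" +
                "  'table-name' = '123',\n" +
                "  'username' = '123',\n" +
                "  'password' = '123'\n" +
                ")");

        // Order the over window on the processing-time attribute instead of f_time.
        Table table = tableEnv
                .from("t_yyp_test")
                .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
                        .orderBy($("f_proc"))
                        .preceding(UNBOUNDED_RANGE)
                        .as("w"))
                .select($("f_value"),
                        $("f_value").avg().over($("w")),
                        $("f_value").varPop().over($("w")),
                        $("f_value").stddevPop().over($("w")));
    }
}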


Re: flink table over window error

2021-08-04 Post by Shengkai Fang
I found that switching to streaming mode makes this problem go away. In batch mode, defining a watermark shouldn't be necessary, so this looks like a bug; I'd suggest opening a JIRA ticket with the community.
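A minimal sketch of that switch, assuming everything else from the code further down the thread stays the same; only the EnvironmentSettings builder call changes from inBatchMode() to inStreamingMode().

// Sketch: switch the TableEnvironment from batch to streaming mode;
// the CREATE TABLE t_yyp_test DDL (with the f_time watermark) and the
// over window query stay exactly as in the messages below.
EnvironmentSettings bbSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(bbSettings);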


Re: flink table over window error

2021-08-04 Post by yanyunpeng
Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute.
    at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
    at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160)
    at java.util.Optional.orElseGet(Optional.java:267)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
    at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
    at java.util.function.Function.lambda$andThen$1(Function.java:88)
    at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183)
    at org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956)
    at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44)




Flink version 1.13.0



Re: flink table over window error

2021-08-04 Post by Shengkai Fang
Could you post the full exception stack trace? Which Flink version are you on?



Re: flink table over window error

2021-08-04 Post by yanyunpeng
Table table = tableEnv
 .from("t_yyp_test")
 .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
 .orderBy($("f_time"))
 .preceding("unbounded_range")
 .following(CURRENT_RANGE)
 .as("w"))
 .select($("f_value"),
 $("f_h"),
 $("f_l"),
 $("f_j"),
 $("f_value").avg().over($("w")),
 $("f_value").varPop().over($("w")),
 $("f_value").stddevPop().over($("w")));
It's the same error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Ordering must be defined on a time attribute.


Re: flink table over window error

2021-08-04 Post by Caizhi Weng
Hi!

The order by field is f_time_bak, but the watermark field is f_time; the two don't match.
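For illustration, a minimal sketch of that alignment against the table from the original post: order the over window on f_time, the column the WATERMARK is declared on (the follow-up message above tries exactly this change).

// Sketch: the over window's orderBy uses f_time, matching "WATERMARK FOR f_time AS f_time"
// in the DDL from the original post; everything else stays unchanged.
Table table = tableEnv
        .from("t_yyp_test")
        .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
                .orderBy($("f_time"))          // same column as the watermark declaration
                .preceding(UNBOUNDED_RANGE)
                .as("w"))
        .select($("f_h"),
                $("f_l"),
                $("f_j"),
                $("f_value").avg().over($("w")));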



flink table over window error

2021-08-04 Post by yanyunpeng
The code is as follows:
EnvironmentSettings bbSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(bbSettings);
tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" +
"  f_id INT,\n" +
"  f_h STRING,\n" +
"  f_l STRING,\n" +
"  f_j STRING,\n" +
"  f_value DOUBLE,\n" +
"  f_time TIMESTAMP(3)\n, " +
"  f_time_bak TIMESTAMP(3)\n, " +
"  PRIMARY KEY (f_id) NOT ENFORCED,\n" +
"  WATERMARK FOR f_time AS f_time \n" +
") WITH (\n" +
"   'connector' = 'jdbc',\n" +
"   'url' = 'jdbc:mysql://***',\n" +
"   'table-name' = '123',\n" +
"   'username' = '123',\n" +
"   'password' = '123'\n" +
")");
tableEnv.registerFunction("GaussianFunction", new GaussianFunction());
Table table = tableEnv
.from("t_yyp_test")
.window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
.orderBy($("f_time_bak"))
.preceding("unbounded_range")
.following(CURRENT_RANGE)
.as("w"))
.select($("f_h"),
$("f_l"),
$("f_j"),
$("f_value").avg().over($("w")),
$("f_value").varPop().over($("w")),
$("f_value").stddevPop().over($("w")));


The event time is already defined; ordering by the event-time field or any other time field raises the same error.


Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Ordering must be defined on a time attribute.  


What could be the cause of this?