Re: flink table over 窗口报错
好的 谢谢答疑 我这边尝试把时间字段设置成processTime的时候就好使了 抽空是看看源码分析下 在 2021年8月4日 21:01,Shengkai Fang 写道: 我发现换成流模式下,这个问题就解了。批的模式下,应该不需要定义watermark。这里看样子是有一个 bug 存在的,建议去社区建一个jira。 yanyunpeng 于2021年8月4日周三 下午5:42写道: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > at > org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111) > at > org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160) > at java.util.Optional.orElseGet(Optional.java:267) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94) > at > org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) > at > org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94) > at > org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) > at > org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88) > at > java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89) > at > org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235) > at java.util.function.Function.lambda$andThen$1(Function.java:88) > at > org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198) > at > org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194) > at > org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183) > at > org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956) > at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44) > > > > > Flink 版本1.13.0 > > > 在 2021年8月4日 17:37,Shengkai Fang 写道: > > > 能发一下具体的异常栈吗?是哪个版本? yanyunpeng 于2021年8月4日周三 > 下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > > .orderBy($("f_time")) > .preceding("unbounded_range") > > .following(CURRENT_RANGE) > .as("w")) > .select($("f_value"), > $("f_h"), > > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > > $("f_value").varPop().over($("w")), > > $("f_value").stddevPop().over($("w"))); > 也是一样的 > Exception in thread > "main" org.apache.flink.table.api.ValidationException: > Ordering must be > defined on a time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng< > tsreape...@gmail.com> 写道: > > > Hi! order by 的字段是 f_time_bak,但是 watermark > 的字段是 f_time,这两个不一致。 yanyunpeng < > yanyunp...@rockontrol.com> > 于2021年8月4日周三 下午2:30写道: > 代码如下: > > EnvironmentSettings bbSettings = > > > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > > > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " > > f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value > > DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, > > " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS > > f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = > >
Re: flink table over 窗口报错
我发现换成流模式下,这个问题就解了。批的模式下,应该不需要定义watermark。这里看样子是有一个 bug 存在的,建议去社区建一个jira。 yanyunpeng 于2021年8月4日周三 下午5:42写道: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > at > org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111) > at > org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160) > at java.util.Optional.orElseGet(Optional.java:267) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94) > at > org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) > at > org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94) > at > org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) > at > org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88) > at > java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89) > at > org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235) > at java.util.function.Function.lambda$andThen$1(Function.java:88) > at > org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198) > at > org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194) > at > org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183) > at > org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956) > at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44) > > > > > Flink 版本1.13.0 > > > 在 2021年8月4日 17:37,Shengkai Fang 写道: > > > 能发一下具体的异常栈吗?是哪个版本? yanyunpeng 于2021年8月4日周三 > 下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > > .orderBy($("f_time")) > .preceding("unbounded_range") > > .following(CURRENT_RANGE) > .as("w")) > .select($("f_value"), > $("f_h"), > > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > > $("f_value").varPop().over($("w")), > > $("f_value").stddevPop().over($("w"))); > 也是一样的 > Exception in thread > "main" org.apache.flink.table.api.ValidationException: > Ordering must be > defined on a time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng< > tsreape...@gmail.com> 写道: > > > Hi! order by 的字段是 f_time_bak,但是 watermark > 的字段是 f_time,这两个不一致。 yanyunpeng < > yanyunp...@rockontrol.com> > 于2021年8月4日周三 下午2:30写道: > 代码如下: > > EnvironmentSettings bbSettings = > > > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > > > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " > > f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value > > DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, > > " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS > > f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = > > 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = > > '123',\n" + > " 'password' = '123'\n" + > ")"); > > >
Re: flink table over 窗口报错
Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute. at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111) at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160) at java.util.Optional.orElseGet(Optional.java:267) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89) at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235) at java.util.function.Function.lambda$andThen$1(Function.java:88) at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198) at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194) at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183) at org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956) at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44) Flink 版本1.13.0 在 2021年8月4日 17:37,Shengkai Fang 写道: 能发一下具体的异常栈吗?是哪个版本? yanyunpeng 于2021年8月4日周三 下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time")) > .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > .select($("f_value"), > $("f_h"), > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > $("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > 也是一样的 > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng 写道: > > > Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng < > yanyunp...@rockontrol.com> 于2021年8月4日周三 下午2:30写道: > 代码如下: > > EnvironmentSettings bbSettings = > > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " > f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value > DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, > " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS > f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = > 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = > '123',\n" + > " 'password' = '123'\n" + > ")"); > > tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); > > Table table = tableEnv > .from("t_yyp_test") > > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > > .orderBy($("f_time_bak")) > .preceding("unbounded_range") > > .following(CURRENT_RANGE) > .as("w")) >
Re: flink table over 窗口报错
能发一下具体的异常栈吗?是哪个版本? yanyunpeng 于2021年8月4日周三 下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time")) > .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > .select($("f_value"), > $("f_h"), > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > $("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > 也是一样的 > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng 写道: > > > Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng < > yanyunp...@rockontrol.com> 于2021年8月4日周三 下午2:30写道: > 代码如下: > > EnvironmentSettings bbSettings = > > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " > f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value > DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, > " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS > f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = > 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = > '123',\n" + > " 'password' = '123'\n" + > ")"); > > tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); > > Table table = tableEnv > .from("t_yyp_test") > > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > > .orderBy($("f_time_bak")) > .preceding("unbounded_range") > > .following(CURRENT_RANGE) > .as("w")) > .select($("f_h"), > $("f_l"), > > $("f_j"), > $("f_value").avg().over($("w")), > > $("f_value").varPop().over($("w")), > > $("f_value").stddevPop().over($("w"))); > > > 已经定义了eventTime > 使用eventTIme或者别的时间字段排序都报错 > > > Exception in thread "main" > org.apache.flink.table.api.ValidationException: > Ordering must be defined > on a time attribute. > > > 请问这是什么原因
Re: flink table over 窗口报错
Table table = tableEnv .from("t_yyp_test") .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) .orderBy($("f_time")) .preceding("unbounded_range") .following(CURRENT_RANGE) .as("w")) .select($("f_value"), $("f_h"), $("f_l"), $("f_j"), $("f_value").avg().over($("w")), $("f_value").varPop().over($("w")), $("f_value").stddevPop().over($("w"))); 也是一样的 Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute. 在 2021年8月4日 14:34,Caizhi Weng 写道: Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng 于2021年8月4日周三 下午2:30写道: > 代码如下: > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = '123',\n" + > " 'password' = '123'\n" + > ")"); > tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); > Table table = tableEnv > .from("t_yyp_test") > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time_bak")) > .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > .select($("f_h"), > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > $("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > > > 已经定义了eventTime 使用eventTIme或者别的时间字段排序都报错 > > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > > > 请问这是什么原因
Re: flink table over 窗口报错
Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng 于2021年8月4日周三 下午2:30写道: > 代码如下: > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = '123',\n" + > " 'password' = '123'\n" + > ")"); > tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); > Table table = tableEnv > .from("t_yyp_test") > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time_bak")) > .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > .select($("f_h"), > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > $("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > > > 已经定义了eventTime 使用eventTIme或者别的时间字段排序都报错 > > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > > > 请问这是什么原因
flink table over 窗口报错
代码如下: EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(bbSettings); tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + " f_id INT,\n" + " f_h STRING,\n" + " f_l STRING,\n" + " f_j STRING,\n" + " f_value DOUBLE,\n" + " f_time TIMESTAMP(3)\n, " + " f_time_bak TIMESTAMP(3)\n, " + " PRIMARY KEY (f_id) NOT ENFORCED,\n" + " WATERMARK FOR f_time AS f_time \n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://***',\n" + " 'table-name' = '123',\n" + " 'username' = '123',\n" + " 'password' = '123'\n" + ")"); tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); Table table = tableEnv .from("t_yyp_test") .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) .orderBy($("f_time_bak")) .preceding("unbounded_range") .following(CURRENT_RANGE) .as("w")) .select($("f_h"), $("f_l"), $("f_j"), $("f_value").avg().over($("w")), $("f_value").varPop().over($("w")), $("f_value").stddevPop().over($("w"))); 已经定义了eventTime 使用eventTIme或者别的时间字段排序都报错 Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute. 请问这是什么原因