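I figured it out by testing: the rowtime argument needs to be the last argument, i.e. $(timeField).rowtime() goes at the end of the field list.

But this error message really is far too obscure.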
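
For anyone who hits the same error, here is a minimal sketch of the field ordering that worked in my test. It assumes the Flink 1.11-style Table API bridge (flink-table-api-java-bridge) is on the classpath. The class name RowtimeLastExample and the helpers viewFields and registerView are hypothetical names made up for illustration; "cpu" is the other field from the job quoted below.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.expressions.Expression;

import static org.apache.flink.table.api.Expressions.$;

// Hypothetical helper class, for illustration only.
public class RowtimeLastExample {

    // Builds the field list with the rowtime attribute declared last.
    // Putting it last is the ordering that worked in the test described above;
    // it is an empirical observation, not documented behavior.
    public static Expression[] viewFields(String timeField) {
        return new Expression[] {
            $("cpu"),
            $(timeField).rowtime()
        };
    }

    // Registers the stream as a temporary view using that field order.
    public static <T> void registerView(
            StreamTableEnvironment tableEnv,
            String tableName,
            DataStream<T> source,
            String timeField) {
        tableEnv.createTemporaryView(tableName, source, viewFields(timeField));
    }
}
```

In the job below, this would replace the createTemporaryView call, e.g. RowtimeLastExample.registerView(tableEnv, table_name, source, timeField).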





On 2020-08-30 14:54:15, "RS" <tinyshr...@163.com> wrote:
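>Hi, I have a question.
>
>
>An exception is thrown when the job starts, but I can't work out the cause from the error message. Could anyone help take a look?
>Here I first create a DataStreamSource, then register it as a view with event time configured, and afterwards process the data with SQL DDL: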
>        DataStreamSource<PerfEvent> source = env.addSource(consumer);
>        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>        tableEnv.createTemporaryView(table_name, source, $(timeField).rowtime(), $("cpu"));
>        tableEnv.from(table_name).window(
>                Tumble.over(lit(1).minutes())
>                        .on($(timeField))
>                        .as(table_name + "Window")
>        );
>        // sql1: CREATE TABLE t_out (`ts` TIMESTAMP(3), `count` BIGINT) WITH ('connector' = 'print')
>        tableEnv.executeSql(sql1);  // no error
>        // sql2: INSERT INTO t_out SELECT TUMBLE_START(`ts`, INTERVAL '1' MINUTE), COUNT(1) as `count` FROM t1 GROUP BY TUMBLE(`ts`, INTERVAL '1' MINUTE)
>        tableEnv.executeSql(sql2);  // throws the exception
>
>
>Exception stack trace:
>org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Index: 1, Size: 1
>at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
>at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
>at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>at java.util.ArrayList.get(ArrayList.java:433)
>at java.util.Collections$UnmodifiableList.get(Collections.java:1311)
>at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:682)
>at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:665)
>at org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:561)
>at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1(ExprCodeGenerator.scala:184)
>at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(ExprCodeGenerator.scala:158)
>at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
>at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
>at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242)
>at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateConverterResultExpression(ExprCodeGenerator.scala:158)
>at org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:103)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:126)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:55)
>at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlan(StreamExecDataStreamScan.scala:55)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:76)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
>at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:117)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
>at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
>at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
>at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:67)
>at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>at scala.collection.Iterator.foreach(Iterator.scala:937)
>at scala.collection.Iterator.foreach$(Iterator.scala:937)
>at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
>at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
>at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>...
