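I got it working last night, though I still haven't tracked down the cause. The fix was to replace the createTemporaryTable method with the older, deprecated registerTableSource method; nothing else needs to change.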

pengchengl...@163.com

From: PCL
Sent: 2020-05-12 23:39
To: user-zh
Subject: Re: Re: flink10 error when reading from Kafka

Thanks for the reply!
The strange thing is that running sqlQuery on its own works fine:

/*Table tb1 = tableEnv.sqlQuery("select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND)"
        + " from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND) ");
tb1.printSchema();*/

With that block uncommented, the printed schema is:

root
 |-- EXPR$0: DOUBLE
 |-- EXPR$1: TIMESTAMP(3)

On 2020-05-12 22:36:17, "忝忝向��" <153488...@qq.com> wrote:
>Isn't the TIMESTAMP(3) time format wrong?
>
>
>------------------ Original message ------------------
>From: "PCL"<pengchengl...@163.com>;
>Sent: Tuesday, May 12, 2020, 9:43 PM
>To: "user-zh"<user-zh@flink.apache.org>;
>
>Subject: flink10 error when reading from Kafka
>
>
>Hi all,
>  Has anyone run into this problem? Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
>I get this error with both event time and processing time, and the flink and blink planners report much the same error.
>Versions:
><flink.version>1.10.0</flink.version>
><scala.version>2.11</scala.version>
>Code:
>// Get the execution environment
>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings settings =
>        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>//EnvironmentSettings settings =
>//        EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
>// Create a tableEnvironment
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>
>Schema schema = new Schema()
>        //.field("id", "VARCHAR").from("id")
>        .field("id", "STRING")
>        //.field("name", "VARCHAR")
>        .field("amount", "DOUBLE")
>        .field("proctime", Types.SQL_TIMESTAMP).proctime()
>        //.field("rowtime", Types.SQL_TIMESTAMP)
>        //.rowtime(
>        //        new Rowtime()
>        //                .timestampsFromField("eventtime")
>        //                .watermarksPeriodicBounded(2000))
>        ;
>
>// "0.8", "0.9", "0.10", "0.11", and "universal"
>tableEnv.connect(new Kafka().version("universal")
>                .topic("source0511")
>                .property("zookeeper.connect", "172.16.44.28:7758")
>                .property("bootstrap.servers", "172.16.44.28:9096")
>                .property("group.id", "source0511-group")
>                .startFromEarliest()
>        )
>        .withFormat(new Csv())
>        .withSchema(schema)
>        .inAppendMode()
>        .createTemporaryTable("sourceTable");
>
>tableEnv.connect(
>        new Kafka()
>                .version("universal")
>                // "0.8", "0.9", "0.10", "0.11", and "universal"
>                .topic("sink0511")
>                .property("acks", "all")
>                .property("retries", "0")
>                .property("batch.size", "16384")
>                .property("linger.ms", "10")
>                .property("zookeeper.connect", "172.16.44.28:7758")
>                .property("bootstrap.servers", "172.16.44.28:9096")
>                .sinkPartitionerFixed())
>        .inAppendMode()
>        .withFormat(new Json())
>        .withSchema(
>                new Schema().field("totalamount", "DOUBLE")
>                        //.field("total", "INT")
>                        .field("time", Types.SQL_TIMESTAMP)
>        )
>        .createTemporaryTable("sinkTable");
>
>tableEnv.sqlUpdate("insert into sinkTable"
>        + " select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND) "
>        + "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)");
>//SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
>//  FROM user_actions
>//  GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
>env.execute("test");