还是不行, SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "time FROM" at line 1, column 44. Was expecting one of: "CURSOR" ... "EXISTS" ... "NOT" ... "ROW" ... "(" ... "+" ... "-" ... <UNSIGNED_INTEGER_LITERAL> ... <DECIMAL_NUMERIC_LITERAL> ... <APPROX_NUMERIC_LITERAL> ... <BINARY_STRING_LITERAL> ... <PREFIXED_STRING_LITERAL> ... <QUOTED_STRING> ... <UNICODE_STRING_LITERAL> ... "TRUE" ... "FALSE" ... "UNKNOWN" ... "NULL" ... <LBRACE_D> ... <LBRACE_T> ... <LBRACE_TS> ... "DATE" ... "TIME" <QUOTED_STRING> ... "TIMESTAMP" ... "INTERVAL" ... "?" ... "CAST" ... "EXTRACT" ... "POSITION" ... "CONVERT" ... "TRANSLATE" ... "OVERLAY" ... "FLOOR" ... "CEIL" ... "CEILING" ... "SUBSTRING" ... "TRIM" ... "CLASSIFIER" ... "MATCH_NUMBER" ... "RUNNING" ... "PREV" ... "NEXT" ... "JSON_EXISTS" ... "JSON_VALUE" ... "JSON_QUERY" ... "JSON_OBJECT" ... "JSON_OBJECTAGG" ... "JSON_ARRAY" ... "JSON_ARRAYAGG" ... <LBRACE_FN> ... "MULTISET" ... "ARRAY" ... "MAP" ... "PERIOD" ... "SPECIFIC" ... <IDENTIFIER> ... <QUOTED_IDENTIFIER> ... <BACK_QUOTED_IDENTIFIER> ... <BRACKET_QUOTED_IDENTIFIER> ... <UNICODE_QUOTED_IDENTIFIER> ... "ABS" ... "AVG" ... "CARDINALITY" ... "CHAR_LENGTH" ... "CHARACTER_LENGTH" ... "COALESCE" ... "COLLECT" ... "COVAR_POP" ... "COVAR_SAMP" ... "CUME_DIST" ... "COUNT" ... "CURRENT_DATE" ... "CURRENT_TIME" ... "CURRENT_TIMESTAMP" ... "DENSE_RANK" ... "ELEMENT" ... "EXP" ... "FIRST_VALUE" ... "FUSION" ... "GROUPING" ... "HOUR" ... "LAG" ... "LEAD" ... "LEFT" ... "LAST_VALUE" ... "LN" ... "LOCALTIME" ... "LOCALTIMESTAMP" ... "LOWER" ... "MAX" ... "MIN" ... "MINUTE" ... "MOD" ... "MONTH" ... "NTH_VALUE" ... "NTILE" ... "NULLIF" ... "OCTET_LENGTH" ... "PERCENT_RANK" ... "POWER" ... "RANK" ... "REGR_COUNT" ... "REGR_SXX" ... "REGR_SYY" ... "RIGHT" ... "ROW_NUMBER" ... "SECOND" ... "SQRT" ... "STDDEV_POP" ... "STDDEV_SAMP" ... "SUM" ... "UPPER" ... "TRUNCATE" ... "USER" ... "VAR_POP" ... "VAR_SAMP" ... "YEAR" ... "CURRENT_CATALOG" ... "CURRENT_DEFAULT_TRANSFORM_GROUP" ... "CURRENT_PATH" ... "CURRENT_ROLE" ... "CURRENT_SCHEMA" ... "CURRENT_USER" ... "SESSION_USER" ... "SYSTEM_USER" ... "NEW" ... "CASE" ... "CURRENT" ... at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50) at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) at org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63) at org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
query: streamTableEnv.sqlUpdate( """ | |CREATE TABLE user_behavior ( | uid VARCHAR, | phoneType VARCHAR, | clickCount INT, | proctime AS PROCTIME(), | `time` TIMESTAMP(3) |) WITH ( | 'connector.type' = 'kafka', | 'connector.version' = 'universal', | 'connector.topic' = 'user_behavior', | 'connector.startup-mode' = 'earliest-offset', | 'connector.properties.0.key' = 'zookeeper.connect', | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181', | 'connector.properties.1.key' = 'bootstrap.servers', | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092', | 'update-mode' = 'append', | 'format.type' = 'json', | 'format.derive-schema' = 'true' |) |""".stripMargin) streamTableEnv.sqlUpdate( """ | |insert into user_cnt |SELECT | cast(b.`time` as string), u.age |FROM | user_behavior AS b | JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u | ON b.uid = u.uid | |""".stripMargin) 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME() 放在select 后面也不行。 在 2020-06-12 15:29:49,"Benchao Li" <libenc...@apache.org> 写道: >你写反了,是proctime AS PROCTIME()。 >计算列跟普通query里面的AS是反着的。 > >Zhou Zach <wander...@163.com> 于2020年6月12日周五 下午2:24写道: > >> flink 1.10.0: >> 在create table中,加PROCTIME() AS proctime字段报错 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-06-12 14:08:11,"Benchao Li" <libenc...@apache.org> 写道: >> >Hi, >> > >> >Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。 >> >可以参考下[1] >> > >> >[1] >> > >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html >> > >> >Zhou Zach <wander...@163.com> 于2020年6月12日周五 下午1:33写道: >> > >> >> SLF4J: Class path contains multiple SLF4J bindings. >> >> >> >> SLF4J: Found binding in >> >> >> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] >> >> >> >> SLF4J: Found binding in >> >> >> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] >> >> >> >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >> >> explanation. >> >> >> >> SLF4J: Actual binding is of type >> >> [org.apache.logging.slf4j.Log4jLoggerFactory] >> >> >> >> ERROR StatusLogger No log4j2 configuration file found. Using default >> >> configuration: logging only errors to the console. >> >> >> >> Exception in thread "main" org.apache.flink.table.api.TableException: >> >> Cannot generate a valid execution plan for the given query: >> >> >> >> >> >> >> >> >> >> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], >> >> fields=[time, sum_age]) >> >> >> >> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age]) >> >> >> >> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner]) >> >> >> >> :- FlinkLogicalCalc(select=[uid, time]) >> >> >> >> : +- FlinkLogicalTableSourceScan(table=[[default_catalog, >> >> default_database, user_behavior, source: [KafkaTableSource(uid, >> phoneType, >> >> clickCount, time)]]], fields=[uid, phoneType, clickCount, time]) >> >> >> >> +- FlinkLogicalSnapshot(period=[$cor0.time]) >> >> >> >> +- FlinkLogicalCalc(select=[uid, age]) >> >> >> >> +- FlinkLogicalTableSourceScan(table=[[default_catalog, >> >> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, >> >> age, created_time)]]], fields=[uid, sex, age, created_time]) >> >> >> >> >> >> >> >> >> >> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left >> >> table's proctime field, doesn't support 'PROCTIME()' >> >> >> >> Please check the documentation for the set of currently supported SQL >> >> features. >> >> >> >> at >> >> >> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78) >> >> >> >> at >> >> >> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) >> >> >> >> at >> >> >> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) >> >> >> >> at >> >> >> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >> >> >> >> at >> >> >> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >> >> >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:891) >> >> >> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >> >> >> >> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >> >> >> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> >> >> >> at >> >> >> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) >> >> >> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) >> >> >> >> at >> >> >> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) >> >> >> >> at >> >> >> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) >> >> >> >> at >> >> >> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) >> >> >> >> at >> >> >> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) >> >> >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) >> >> >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) >> >> >> >> at >> >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) >> >> >> >> at >> >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) >> >> >> >> at >> >> >> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90) >> >> >> >> at >> >> >> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) >> >> >> >> Caused by: org.apache.flink.table.api.TableException: Temporal table >> join >> >> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime >> >> field, doesn't support 'PROCTIME()' >> >> >> >> at >> >> >> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67) >> >> >> >> at >> >> >> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147) >> >> >> >> at >> >> >> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) >> >> >> >> at >> >> >> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) >> >> >> >> at >> >> >> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) >> >> >> >> at >> >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529) >> >> >> >> at >> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324) >> >> >> >> at >> >> >> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) >> >> >> >> ... 20 more >> >> >> >> >> >> >> >> >> >> query: >> >> >> >> >> >> val streamExecutionEnv = >> StreamExecutionEnvironment.getExecutionEnvironment >> >> val blinkEnvSettings = >> >> >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >> >> val streamTableEnv = >> >> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings) >> >> >> >> streamTableEnv.sqlUpdate( >> >> """ >> >> | >> >> |CREATE TABLE user_behavior ( >> >> | uid VARCHAR, >> >> | phoneType VARCHAR, >> >> | clickCount INT, >> >> | `time` TIMESTAMP(3) >> >> |) WITH ( >> >> | 'connector.type' = 'kafka', >> >> | 'connector.version' = 'universal', >> >> | 'connector.topic' = 'user_behavior', >> >> | 'connector.startup-mode' = 'earliest-offset', >> >> | 'connector.properties.0.key' = 'zookeeper.connect', >> >> | 'connector.properties.0.value' = >> 'cdh1:2181,cdh2:2181,cdh3:2181', >> >> | 'connector.properties.1.key' = 'bootstrap.servers', >> >> | 'connector.properties.1.value' = >> 'cdh1:9092,cdh2:9092,cdh3:9092', >> >> | 'update-mode' = 'append', >> >> | 'format.type' = 'json', >> >> | 'format.derive-schema' = 'true' >> >> |) >> >> |""".stripMargin) >> >> streamTableEnv.sqlUpdate( >> >> """ >> >> | >> >> |CREATE TABLE user_cnt ( >> >> | `time` VARCHAR, >> >> | sum_age INT >> >> |) WITH ( >> >> | 'connector.type' = 'jdbc', >> >> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard', >> >> | 'connector.table' = 'user_cnt', >> >> | 'connector.username' = 'root', >> >> | 'connector.password' = '123456', >> >> | 'connector.write.flush.max-rows' = '1' >> >> |) >> >> |""".stripMargin) >> >> val userTableSource = new MysqlAsyncLookupTableSource( >> >> Array("uid", "sex", "age", "created_time"), >> >> Array(), >> >> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING)) >> >> streamTableEnv.registerTableSource("users", userTableSource) >> >> streamTableEnv.sqlUpdate( >> >> """ >> >> | >> >> |insert into user_cnt >> >> |SELECT >> >> | cast(b.`time` as string), u.age >> >> |FROM >> >> | user_behavior AS b >> >> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u >> >> | ON b.uid = u.uid >> >> | >> >> |""".stripMargin) >> >> streamTableEnv.execute("Temporal table join") >>