Hi xiaoyue, 看起來是這行造成的 tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); 你可能需要在執行下沉操作前將 SqlDialect 更換回 SqlDialect.DEFAULT。
best regards, xiaoyue <xiao...@ysstech.com> 於 2022年2月25日 週五 下午2:36寫道: > Hi tony, > 完整代码,是从hive取数据,执行flatmap, aggregate操作后再下沉到mysql。 由于篇幅, 中间的udf > function定义过程不完整贴出了,您可以参考下,非常感谢您的帮助,麻烦啦。 > > 代码: > # 执行环境 > env = StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings Settings = > EnvironmentSettings.newInstance().inBatchMode().build(); > tEnv = StreamTableEnvironment.create(env, Settings); > > # hive源 > tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > > String confSite = "src\\main\\resources"; > > String version = "3.1.2"; > > String defaultDatabase = "fund_analysis"; > > HiveCatalog hiveCat = new HiveCatalog("hive", defaultDatabase, > confSite, confSite, version); > > tEnv.registerCatalog("hive", hiveCat); > > tEnv.useCatalog("hive"); > # hive 取数SQL > String biz_date = "20211130"; > String tblSource = String.format("select " + > "coalesce(a.rate,0) as yldrate, " + > "coalesce(c.rate,0) as riskless_yldrate, " + > "a.ccy_type, " + > "a.biz_date, " + > "b.is_exch_dt, " + > "a.pf_id " + > "from " + > "ts_pf_yldrate a " + > "inner join td_gl_day b on b.dt = a.biz_date " + > "inner join ts_pf_bm_yldrate c on c.biz_date = a.biz_date > and c.pf_id = a.pf_id " + > "where a.biz_date <= '%s'", biz_date); > Table table = tEnv.sqlQuery(tblSource); > > // 注册flatmap函数 > tEnv.createTemporarySystemFunction("RowFlatMap", > SharpeRatioFlatMap.class); > // 注册聚合函数 > tEnv.createTemporarySystemFunction("SharpeRatioAgg", > SharpeRatioAggregate.class); > > // 执行flatmap操作 > Table tagTbl = table.flatMap(call("RowFlatMap",$("yldrate"), > $("riskless_yldrate"),$("ccy_type"),$("biz_date"), > $("is_exch_dt"),$("pf_id"), biz_date)); > > // 切换catalog,并注册表 > tEnv.useCatalog("default_catalog"); > tEnv.createTemporaryView("tagTable",tagTbl); > > // 调用函数SharpeRatioAgg 计算结果 > Table result = tEnv.sqlQuery(String.format("select '%s' as > biz_date, dmo_index_code, SharpeRatioAgg(yldrate, yldrate_riskless, > dmo_index_code) as index_value from tagTable group by dmo_index_code", > biz_date)); > // result.execute().print(); (--> 该步 result 可成功打印) > > // 下沉操作 > String mysqlSink = "create table bulk_index_sink(" + > " biz_date string, " + > " dmo_index_code string, " + > " index_value string" + > ") with (" + > " 'connector' = 'jdbc', " + > " 'username' = 'root', " + > " 'password' = 'xxxxxxx', " + > " 'driver' = 'com.mysql.cj.jdbc.Driver', " + > " 'url' = > 'jdbc:mysql://hadoop104:3306/test?useSSL=False', " + > " 'table-name' = 'bulk_index_sink')"; > tEnv.executeSql(mysqlSink); > > > result.select("biz_date,dmo_index_code,index_value").insertInto("bulk_index_sink"); > tEnv.execute("mysql_sink_test"); > > > xiao...@ysstech.com > > 发件人: Tony Wei > 发送时间: 2022-02-25 14:13 > 收件人: user-zh > 主题: Re: flink1.14 注册mysql connector报错 > Hi xiaoyue, > > 請問你的測試代碼有其他額外的代碼或配置嗎?能否分享下完整的代碼和配置文件? > 我嘗試在我的 IDE 上運行了下列代碼是能成功運行的。 > > public static void main(String[] args) { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings Settings = > EnvironmentSettings.newInstance().inBatchMode().build(); > StreamTableEnvironment tEnv = > StreamTableEnvironment.create(env, Settings); > > String mysqlSink = "create table bulk_index_sink(" + > " biz_date string, " + > " dmo_index_code string, " + > " index_value string, " + > " primary key(dmo_index_code) not enforced) " + > " with (" + > " 'connector' = 'jdbc', " + > " 'username' = 'root', " + > " 'password' = 'yss300377@ZT', " + > " 'driver' = 'com.mysql.cj.jdbc.Driver', " + > " 'url' = > 'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " + > " 'table-name' = 'bulk_index_sink')"; > tEnv.executeSql(mysqlSink).print(); > // tEnv.execute("mysql_sink_test"); > } > > 輸出的結果為: > +--------+ > | result | > +--------+ > | OK | > +--------+ > 1 row in set > > best regards, > > xiaoyue <xiao...@ysstech.com> 於 2022年2月25日 週五 下午1:37寫道: > > > flink1.14 注册mysql下车Connector报错,检查多次未发现语法错误,求助! > > > > 代码: > > env = StreamExecutionEnvironment.getExecutionEnvironment(); > > EnvironmentSettings Settings = > > EnvironmentSettings.newInstance().inBatchMode().build(); > > tEnv = StreamTableEnvironment.create(env, Settings); > > > > String mysqlSink = "create table bulk_index_sink(" + > > " biz_date string, " + > > " dmo_index_code string, " + > > " index_value string, " + > > " primary key(dmo_index_code) not enforced) " + > > " with (" + > > " 'connector' = 'jdbc', " + > > " 'username' = 'root', " + > > " 'password' = 'yss300377@ZT', " + > > " 'driver' = 'com.mysql.cj.jdbc.Driver', " + > > " 'url' = 'jdbc:mysql:// > > 192.168.100.104:3306/test?useSSL=False', " + > > " 'table-name' = 'bulk_index_sink')"; > > tEnv.executeSql(mysqlSink); > > tEnv.execute("mysql_sink_test"); > > > > 报错: > > org.apache.flink.table.api.SqlParserException: SQL parse failed. > > Encountered "not" at line 1, column 126. > > Was expecting one of: > > "DISABLE" ... > > "ENABLE" ... > > "NORELY" ... > > "NOVALIDATE" ... > > "RELY" ... > > "VALIDATE" ... > > ")" ... > > "," ... > > > > > > at > > > org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56) > > at > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98) > > at > > > org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195) > > at > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736) > > at > > > com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > > > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > > at > > > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > > at > > > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > > at > > > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > > at > > > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > > at > > > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > > at > > > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > > at > > > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) > > at > > > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > > at > > > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220) > > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53) > > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered > > "not" at line 1, column 126. > > > > > > > > xiao...@ysstech.com > > >