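Hi Tony, here is the complete code. It reads data from Hive, runs flatMap and aggregate operations, and then sinks the result to MySQL. To keep this short, I have not pasted the full definitions of the UDF functions in the middle, but you can use this for reference. Thank you very much for your help, and sorry for the trouble.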
Code:

// Execution environment
env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings = EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = StreamTableEnvironment.create(env, Settings);

// Hive source
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
String confSite = "src\\main\\resources";
String version = "3.1.2";
String defaultDatabase = "fund_analysis";
HiveCatalog hiveCat = new HiveCatalog("hive", defaultDatabase, confSite, confSite, version);
tEnv.registerCatalog("hive", hiveCat);
tEnv.useCatalog("hive");

// Hive query SQL
String biz_date = "20211130";
String tblSource = String.format("select " +
    "coalesce(a.rate,0) as yldrate, " +
    "coalesce(c.rate,0) as riskless_yldrate, " +
    "a.ccy_type, " +
    "a.biz_date, " +
    "b.is_exch_dt, " +
    "a.pf_id " +
    "from " +
    "ts_pf_yldrate a " +
    "inner join td_gl_day b on b.dt = a.biz_date " +
    "inner join ts_pf_bm_yldrate c on c.biz_date = a.biz_date and c.pf_id = a.pf_id " +
    "where a.biz_date <= '%s'", biz_date);
Table table = tEnv.sqlQuery(tblSource);

// Register the flatMap function
tEnv.createTemporarySystemFunction("RowFlatMap", SharpeRatioFlatMap.class);
// Register the aggregate function
tEnv.createTemporarySystemFunction("SharpeRatioAgg", SharpeRatioAggregate.class);

// Run the flatMap operation
Table tagTbl = table.flatMap(call("RowFlatMap", $("yldrate"), $("riskless_yldrate"), $("ccy_type"), $("biz_date"),
    $("is_exch_dt"), $("pf_id"), biz_date));

// Switch catalog and register the table
tEnv.useCatalog("default_catalog");
tEnv.createTemporaryView("tagTable", tagTbl);

// Call SharpeRatioAgg to compute the result
Table result = tEnv.sqlQuery(String.format("select '%s' as biz_date, dmo_index_code, SharpeRatioAgg(yldrate, yldrate_riskless, dmo_index_code) as index_value from tagTable group by dmo_index_code", biz_date));
// result.execute().print();  (--> at this step, result prints successfully)

// Sink
String mysqlSink = "create table bulk_index_sink(" +
    " biz_date string, " +
    " dmo_index_code string, " +
    " index_value string" +
    ") with (" +
    " 'connector' = 'jdbc', " +
    " 'username' = 'root', " +
    " 'password' = 'xxxxxxx', " +
    " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
    " 'url' = 'jdbc:mysql://hadoop104:3306/test?useSSL=False', " +
    " 'table-name' = 'bulk_index_sink')";
tEnv.executeSql(mysqlSink);

result.select("biz_date,dmo_index_code,index_value").insertInto("bulk_index_sink");
tEnv.execute("mysql_sink_test");

xiao...@ysstech.com

From: Tony Wei
Sent: 2022-02-25 14:13
To: user-zh
Subject: Re: flink1.14 error when registering the MySQL connector

Hi xiaoyue,

Does your test code include any other code or configuration? Could you share the complete code and the configuration files?
I tried running the following code in my IDE, and it ran successfully.

public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings Settings = EnvironmentSettings.newInstance().inBatchMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, Settings);

    String mysqlSink = "create table bulk_index_sink(" +
        " biz_date string, " +
        " dmo_index_code string, " +
        " index_value string, " +
        " primary key(dmo_index_code) not enforced) " +
        " with (" +
        " 'connector' = 'jdbc', " +
        " 'username' = 'root', " +
        " 'password' = 'yss300377@ZT', " +
        " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
        " 'url' = 'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
        " 'table-name' = 'bulk_index_sink')";
    tEnv.executeSql(mysqlSink).print();
    // tEnv.execute("mysql_sink_test");
}

The output was:

+--------+
| result |
+--------+
|     OK |
+--------+
1 row in set

best regards,

On Fri, Feb 25, 2022 at 1:37 PM, xiaoyue <xiao...@ysstech.com> wrote:

> flink1.14: registering the MySQL sink connector fails. I have checked several times and found no syntax error. Please help!
>
> Code:
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings Settings = EnvironmentSettings.newInstance().inBatchMode().build();
> tEnv = StreamTableEnvironment.create(env, Settings);
>
> String mysqlSink = "create table bulk_index_sink(" +
>     " biz_date string, " +
>     " dmo_index_code string, " +
>     " index_value string, " +
>     " primary key(dmo_index_code) not enforced) " +
>     " with (" +
>     " 'connector' = 'jdbc', " +
>     " 'username' = 'root', " +
>     " 'password' = 'yss300377@ZT', " +
>     " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
>     " 'url' = 'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
>     " 'table-name' = 'bulk_index_sink')";
> tEnv.executeSql(mysqlSink);
> tEnv.execute("mysql_sink_test");
>
> Error:
> org.apache.flink.table.api.SqlParserException: SQL parse failed.
> Encountered "not" at line 1, column 126.
> Was expecting one of:
>     "DISABLE" ...
>     "ENABLE" ...
>     "NORELY" ...
>     "NOVALIDATE" ...
>     "RELY" ...
>     "VALIDATE" ...
>     ")" ...
>     "," ...
>
>     at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
>     at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
>     at com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>     at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>     at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
>     at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "not" at line 1, column 126.
>
> xiao...@ysstech.com
>
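
A note on reading the trace above: it passes through HiveParser.parse, and the complete code at the top of this thread calls setSqlDialect(SqlDialect.HIVE) before the sink DDL. One plausible reading, not confirmed anywhere in the thread, is that the "create table" statement was parsed under the Hive dialect, which would also explain why the parser expected keywords such as DISABLE or NOVALIDATE instead of "not enforced". The sketch below only checks a list of stack frames for that parser frame. The class and method names (ParserFrameCheck, parsedByHiveDialect) are hypothetical, and the frame strings are copied from the quoted trace.

```java
import java.util.Arrays;
import java.util.List;

public class ParserFrameCheck {
    // Class name copied from the stack trace quoted in this thread.
    static final String HIVE_PARSER =
        "org.apache.flink.table.planner.delegation.hive.HiveParser";

    /** Returns true if any stack frame line mentions Flink's HiveParser. */
    public static boolean parsedByHiveDialect(List<String> frames) {
        return frames.stream().anyMatch(frame -> frame.contains(HIVE_PARSER));
    }

    public static void main(String[] args) {
        // The first four frames of the reported SqlParserException.
        List<String> frames = Arrays.asList(
            "org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)",
            "org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)",
            "org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)",
            "org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)");
        System.out.println(parsedByHiveDialect(frames));
    }
}
```

If the check is positive, one thing worth trying (an assumption, untested here) is calling tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT) before executing the "create table" statement, and using SqlDialect.HIVE only around the Hive query.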