Hi tony,
   完整代码如下:从 Hive 取数据,执行 flatMap、aggregate 操作后再下沉到 MySQL。由于篇幅所限,中间的 UDF
function 定义过程没有完整贴出,您可以参考下,非常感谢您的帮助,麻烦啦。

代码:
        // Execution environment: a StreamTableEnvironment running in batch mode.
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        tEnv = StreamTableEnvironment.create(env, settings);

        // Hive source: the Hive dialect is only needed while querying the Hive tables below.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        String confSite = "src\\main\\resources";
        String version = "3.1.2";
        String defaultDatabase = "fund_analysis";

        // confSite is passed twice: once as the Hive conf dir and once as the Hadoop conf dir.
        HiveCatalog hiveCat = new HiveCatalog("hive", defaultDatabase, confSite, confSite, version);
        tEnv.registerCatalog("hive", hiveCat);
        tEnv.useCatalog("hive");

        // Query that reads the input rows from Hive.
        String biz_date = "20211130";
        String tblSource = String.format("select " +
                "coalesce(a.rate,0) as yldrate, " +
                "coalesce(c.rate,0) as riskless_yldrate, " +
                "a.ccy_type, " +
                "a.biz_date, " +
                "b.is_exch_dt, " +
                "a.pf_id " +
                "from " +
                "ts_pf_yldrate a " +
                "inner join td_gl_day b on b.dt = a.biz_date " +
                "inner join ts_pf_bm_yldrate c on c.biz_date = a.biz_date and c.pf_id = a.pf_id " +
                "where a.biz_date <= '%s'", biz_date);
        Table table = tEnv.sqlQuery(tblSource);

        // Register the flatMap (table) function.
        tEnv.createTemporarySystemFunction("RowFlatMap", SharpeRatioFlatMap.class);
        // Register the aggregate function.
        tEnv.createTemporarySystemFunction("SharpeRatioAgg", SharpeRatioAggregate.class);

        // Apply the flatMap function to every source row.
        Table tagTbl = table.flatMap(call("RowFlatMap", $("yldrate"),
                $("riskless_yldrate"), $("ccy_type"), $("biz_date"),
                $("is_exch_dt"), $("pf_id"), biz_date));

        // Switch to the default catalog and register the flatMap output as a temporary view.
        tEnv.useCatalog("default_catalog");
        tEnv.createTemporaryView("tagTable", tagTbl);

        // Compute the result with SharpeRatioAgg, grouped by dmo_index_code.
        Table result = tEnv.sqlQuery(String.format(
                "select '%s' as biz_date, " +
                "dmo_index_code, SharpeRatioAgg(yldrate, yldrate_riskless, dmo_index_code) as " +
                "index_value from tagTable group by dmo_index_code", biz_date));
        // result.execute().print();  // at this point `result` prints successfully

        // Sink: JDBC table backed by MySQL.
        String mysqlSink = "create table bulk_index_sink(" +
                "  biz_date string, " +
                "  dmo_index_code string, " +
                "  index_value string" +
                ") with (" +
                "   'connector' = 'jdbc', " +
                "   'username' = 'root', " +
                "   'password' = 'xxxxxxx', " +
                "   'driver' = 'com.mysql.cj.jdbc.Driver', " +
                "   'url' = 'jdbc:mysql://hadoop104:3306/test?useSSL=False', " +
                "   'table-name' = 'bulk_index_sink')";

        // FIX: this DDL is written in Flink SQL syntax (WITH (...) connector options), so it
        // must be parsed with the default dialect. With the Hive dialect still active the
        // statement goes through HiveParser, which is what raised the reported
        // SqlParserException. Switch back to the default dialect before executing it.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(mysqlSink);

        // NOTE(review): string-based select(...), insertInto(...) and tEnv.execute(...) are
        // legacy APIs; Table.executeInsert(...) looks like the newer equivalent — confirm
        // against the Flink version in use before changing, since it alters blocking behaviour.
        result.select("biz_date,dmo_index_code,index_value").insertInto("bulk_index_sink");
        tEnv.execute("mysql_sink_test");


xiao...@ysstech.com
 
发件人: Tony Wei
发送时间: 2022-02-25 14:13
收件人: user-zh
主题: Re: flink1.14 注册mysql connector报错
Hi xiaoyue,
 
請問你的測試代碼有其他額外的代碼或配置嗎?能否分享下完整的代碼和配置文件?
我嘗試在我的 IDE 上運行了下列代碼是能成功運行的。
 
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings Settings =
EnvironmentSettings.newInstance().inBatchMode().build();
        StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, Settings);
 
        String mysqlSink = "create table bulk_index_sink(" +
                "  biz_date string, " +
                "  dmo_index_code string, " +
                "  index_value string, " +
                "  primary key(dmo_index_code) not enforced) " +
                "  with (" +
                "   'connector' = 'jdbc', " +
                "   'username' = 'root', " +
                "   'password' = 'yss300377@ZT', " +
                "   'driver' = 'com.mysql.cj.jdbc.Driver', " +
                "   'url' =
'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
                "   'table-name' = 'bulk_index_sink')";
        tEnv.executeSql(mysqlSink).print();
//        tEnv.execute("mysql_sink_test");
    }
 
輸出的結果為:
+--------+
| result |
+--------+
|     OK |
+--------+
1 row in set
 
best regards,
 
xiaoyue <xiao...@ysstech.com> 於 2022年2月25日 週五 下午1:37寫道:
 
> flink1.14 注册mysql下沉Connector报错,检查多次未发现语法错误,求助!
>
> 代码:
>         env = StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings Settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
>         tEnv = StreamTableEnvironment.create(env, Settings);
>
>        String mysqlSink = "create table bulk_index_sink(" +
>                 "  biz_date string, " +
>                 "  dmo_index_code string, " +
>                 "  index_value string, " +
>                 "  primary key(dmo_index_code) not enforced) " +
>                 "  with (" +
>                 "   'connector' = 'jdbc', " +
>                 "   'username' = 'root', " +
>                 "   'password' = 'yss300377@ZT', " +
>                 "   'driver' = 'com.mysql.cj.jdbc.Driver', " +
>                 "   'url' = 'jdbc:mysql://
> 192.168.100.104:3306/test?useSSL=False', " +
>                 "   'table-name' = 'bulk_index_sink')";
>          tEnv.executeSql(mysqlSink);
>          tEnv.execute("mysql_sink_test");
>
> 报错:
>         org.apache.flink.table.api.SqlParserException: SQL parse failed.
> Encountered "not" at line 1, column 126.
> Was expecting one of:
>     "DISABLE" ...
>     "ENABLE" ...
>     "NORELY" ...
>     "NOVALIDATE" ...
>     "RELY" ...
>     "VALIDATE" ...
>     ")" ...
>     "," ...
>
>
> at
> org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
> at
> org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
> at
> com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
> "not" at line 1, column 126.
>
>
>
> xiao...@ysstech.com
>

回复