Hi yelun,

Currently, there is no direct way to dynamically load data and join against
it in Flink SQL. As a workaround, you can implement your logic in a UDTF. In
the UDTF, you can load the data into a cache and update it according to
your requirements.
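
A minimal sketch of the cache part, in plain Java so it can be tried without a Flink dependency. The class name `RefreshingDimCache`, the `Supplier`-based loader and the injectable clock are all hypothetical names for illustration, not Flink APIs. The loader stands in for whatever runs your MySQL query.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical helper: keeps the dimension rows in memory and reloads them
 * once they are older than the configured interval.
 */
class RefreshingDimCache {
    private final Supplier<Map<String, String[]>> loader; // assumption: wraps the JDBC query
    private final long refreshIntervalMs;
    private final LongSupplier clock;                      // injectable so the logic can be tested
    private Map<String, String[]> rows = new HashMap<>();
    private long lastLoadMs;
    private boolean loaded = false;

    RefreshingDimCache(Supplier<Map<String, String[]>> loader,
                       long refreshIntervalMs,
                       LongSupplier clock) {
        this.loader = loader;
        this.refreshIntervalMs = refreshIntervalMs;
        this.clock = clock;
    }

    /** Returns the cached row for the key, or null if there is no match. */
    String[] lookup(String name) {
        long now = clock.getAsLong();
        // Load on first use, then reload whenever the data is older than the interval.
        if (!loaded || now - lastLoadMs >= refreshIntervalMs) {
            rows = loader.get();
            lastLoadMs = now;
            loaded = true;
        }
        return rows.get(name);
    }
}
```

In a Flink job, one way to use this would be to hold such a cache inside a class extending `TableFunction<Row>`, create it in `open()` with a one-hour interval and `System::currentTimeMillis` as the clock, and call `collect(...)` from `eval(String name)` when `lookup` returns a row. After registering the function under a name such as `dim` (an assumed name), the query could join with `LEFT JOIN LATERAL TABLE(dim(tableA.name)) AS T(age, address) ON TRUE`.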

Best, Hequn

On Wed, Nov 14, 2018 at 10:34 AM yelun <986463...@qq.com> wrote:

> Hi,
>
> I want to use Flink SQL to left join a static dimension table from MySQL,
> so I converted the MySQL table into a data stream and joined it with a
> data stream that has been converted to a Flink table. However, I found that
> the real-time stream data is not joined correctly with the MySQL data at
> the beginning, while later stream data is joined correctly. Is there a good
> way, using the Table API, to make the real-time stream join with MySQL data
> that has already been loaded, and to support dynamically reloading the MySQL
> data into memory once every hour? Thanks a lot.
>
> The following is some example code:
>
> public static JDBCInputFormatBuilder inputBuilder =
> JDBCInputFormat.buildJDBCInputFormat()
> .setDrivername(DRIVER_CLASS)
> .setDBUrl(DB_URL)
> .setUsername(USER_NAME)
> .setPassword(USER_PASS)
> .setQuery(SELECT_ALL_PERSONS)
> .setRowTypeInfo(ROW_TYPE_INFO);
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment();
> StreamTableEnvironment  tEnv = TableEnvironment.getTableEnvironment(env);
>
> DataStream<Order> orderA = env.addSource(new OrderFunction());
> tEnv.registerDataStream("tableA", orderA, "name, product, amount");
>
> DataStream<Row> mysql_table = env.createInput(inputBuilder.finish());
> String[] dim_table_fileds = {"id","name","age","address"};
>
> tEnv.registerDataStream("tableB",mysql_table);
> Table result = tEnv.sqlQuery(
>     "SELECT tableA.name, tableA.amount, tableB.age, tableB.address "
>     + "FROM tableB JOIN tableA ON tableA.name = tableB.name");
> tEnv.toRetractStream(result, ROW_TYPE_INFO_OUT).print();
> env.execute();
>
> Thanks a lot.
>
