I need to bootstrap keyed state for a keyed process function with Flink 1.12.1, so I think I am required to use the DataSet API. I was hoping to use the Table SQL API because I thought it could parallelize the work more efficiently via partitioning.
Is my only option JdbcInputFormat?

    ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
    batchTableEnv.executeSql(
        "CREATE TABLE my_table (" +
        "  ...." +
        ") WITH (" +
        "  'connector.type' = 'jdbc'," +
        "  'connector.url' = '?'," +
        "  'connector.username' = '?'," +
        "  'connector.password' = '?'," +
        "  'connector.table' = 'my_table'" +
        ")");
    Table table = batchTableEnv.sqlQuery("SELECT name, step FROM my_table");
    DataSet<Row> rowDataSet = batchTableEnv.toDataSet(table, Row.class);
    rowDataSet.print();

This ends up throwing this exception:

    org.apache.flink.table.api.TableException: Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment.
        at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116)
        at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580)
        at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555)
        at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537)
        at org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101)

On Thu, Jun 17, 2021 at 12:51 AM Timo Walther <twal...@apache.org> wrote:

> Hi Marco,
>
> Which operations do you want to execute in the bootstrap pipeline?
>
> Maybe you don't need to use SQL and the old planner. At least that
> would avoid the friction of going through another API layer.
>
> The JDBC connector can be used directly in the DataSet API as well.
>
> Regards,
> Timo
>
> On 17.06.21 07:33, Marco Villalobos wrote:
> > Thank you very much!
> >
> > I tried using Flink's SQL JDBC connector and ran into issues.
> > According to the Flink documentation, only the old planner is
> > compatible with the DataSet API.
> >
> > When I connect to the table:
> >
> >     CREATE TABLE my_table (
> >       ....
> >     ) WITH (
> >       'connector.type' = 'jdbc',
> >       'connector.url' = '?',
> >       'connector.username' = '?',
> >       'connector.password' = '?',
> >       'connector.table' = 'my_table'
> >     )
> >
> > it creates a JdbcTableSource, but only BatchTableSource and
> > InputFormatTableSource are supported in BatchTableEnvironment.
> >
> > By the way, it was very challenging to figure out how to create that
> > connection string, because it's a different format than what is in
> > the documentation. I had to comb through JdbcTableSourceSinkFactory
> > to figure out how to connect.
> >
> > Is it even possible to use the DataSet API with the Table SQL API in
> > Flink 1.12.1?
> >
> > On Wed, Jun 16, 2021 at 4:55 AM Robert Metzger <rmetz...@apache.org> wrote:
> >
> > > Hi Marco,
> > >
> > > The DataSet API will not run out of memory, as it spills to disk
> > > if the data doesn't fit anymore. Load is distributed by
> > > partitioning the data.
> > >
> > > Good advice depends a bit on the use case. I would explore two
> > > major options:
> > >
> > > a) Reading the data from Postgres using Flink's SQL JDBC connector
> > > [1]. 200 GB is not much data. A 1 Gbit network link needs ~30
> > > minutes to transfer that (at 125 megabytes / second).
> > >
> > > b) Using the DataSet API and the State Processor API. I would
> > > first try to see how much effort it is to read the data using the
> > > DataSet API (it could be less convenient than the Flink SQL JDBC
> > > connector).
> > >
> > > [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/
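For reference, a rough sketch of what Timo's suggestion (reading from JDBC directly with the DataSet API, with no Table/SQL layer or old planner involved) might look like on Flink 1.12. The driver, URL, credentials, column types, and id range bounds are assumptions for illustration; the parameters provider fills the BETWEEN placeholders with one range per input split, which gives the partitioned, parallel scan asked about above:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.connector.jdbc.JdbcInputFormat;
    import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
    import org.apache.flink.types.Row;

    public class JdbcDataSetRead {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Each pair of "?" placeholders is filled from the parameters
            // provider; every id range becomes one input split, so the
            // table is scanned in parallel.
            JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
                    .setDrivername("org.postgresql.Driver")            // assumed driver
                    .setDBUrl("jdbc:postgresql://localhost:5432/mydb") // assumed URL
                    .setUsername("user")
                    .setPassword("password")
                    .setQuery("SELECT name, step FROM my_table WHERE id BETWEEN ? AND ?")
                    .setRowTypeInfo(new RowTypeInfo(
                            BasicTypeInfo.STRING_TYPE_INFO,            // name (assumed VARCHAR)
                            BasicTypeInfo.LONG_TYPE_INFO))             // step (assumed BIGINT)
                    .setParametersProvider(
                            new JdbcNumericBetweenParametersProvider(1L, 1_000_000L) // assumed id bounds
                                    .ofBatchSize(10_000L))
                    .finish();

            DataSet<Row> rows = env.createInput(inputFormat);
            rows.print(); // print() triggers execution in the DataSet API
        }
    }

The resulting DataSet<Row> can then feed the State Processor API directly.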
> > > On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:
> > >
> > > > I must bootstrap state from Postgres (approximately 200 GB of
> > > > data), and I notice that the State Processor API requires the
> > > > DataSet API in order to bootstrap state for the Stream API.
> > > >
> > > > I wish there was a way to use the SQL API with a partitioned
> > > > scan, but I don't know if that is even possible with the
> > > > DataSet API.
> > > >
> > > > I have never used the DataSet API, and I am unsure how it
> > > > manages memory or distributes load when handling large state.
> > > >
> > > > Would it run out of memory if I map data from a JdbcInputFormat
> > > > into a large DataSet and then use that to bootstrap state for
> > > > my stream job?
> > > >
> > > > Any advice on how I should proceed with this would be greatly
> > > > appreciated.
> > > >
> > > > Thank you.
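To close the loop on the bootstrapping question itself, here is a minimal sketch of writing keyed state with the Flink 1.12 State Processor API from such a DataSet. The operator uid, key selector, state descriptor, sample rows, and savepoint path are assumptions for illustration; the uid and the state descriptor must match those used by the keyed process function in the streaming job that later restores the savepoint:

    import java.util.Arrays;

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.BootstrapTransformation;
    import org.apache.flink.state.api.OperatorTransformation;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
    import org.apache.flink.types.Row;

    public class BootstrapKeyedState {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for the JDBC read above: rows of (name, step).
            DataSet<Row> rows = env.fromCollection(
                    Arrays.asList(Row.of("tag-a", 1L), Row.of("tag-b", 7L)),
                    new RowTypeInfo(Types.STRING, Types.LONG));

            BootstrapTransformation<Row> transformation = OperatorTransformation
                    .bootstrapWith(rows)
                    .keyBy(new KeySelector<Row, String>() {
                        @Override
                        public String getKey(Row row) {
                            return (String) row.getField(0); // key on "name"
                        }
                    })
                    .transform(new StepBootstrapFunction());

            // MemoryStateBackend matches the documentation examples; for
            // very large state a filesystem or RocksDB backend may be more
            // appropriate. 128 is the assumed max parallelism of the
            // streaming job that will restore this savepoint.
            Savepoint
                    .create(new MemoryStateBackend(), 128)
                    .withOperator("keyed-process-uid", transformation)
                    .write("file:///tmp/bootstrap-savepoint");

            env.execute("bootstrap keyed state");
        }

        // Writes each row's "step" into the same ValueState that the keyed
        // process function will read after restoring the savepoint.
        static class StepBootstrapFunction extends KeyedStateBootstrapFunction<String, Row> {

            private transient ValueState<Long> stepState;

            @Override
            public void open(Configuration parameters) {
                stepState = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("step", Types.LONG));
            }

            @Override
            public void processElement(Row row, Context ctx) throws Exception {
                stepState.update((Long) row.getField(1));
            }
        }
    }

The streaming job would then start from the written savepoint (for example with flink run -s file:///tmp/bootstrap-savepoint), and the keyed process function with uid "keyed-process-uid" finds its ValueState pre-populated.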