I need to bootstrap a keyed process function.

So, I was hoping to use the Table SQL API because I thought it could
parallelize the work more efficiently via partitioning.
I need to boot strap keyed state for a keyed process function, with
Flnk 1.12.1, thus I think I am required to use the DataSet API.

Is my only option JdbcInputFormat?

ExecutionEnvironment batchEnv =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnv =
BatchTableEnvironment.create(batchEnv);
batchTableEnv.executeSql("
CREATE TABLE my_table (
....
) WITH (
   'connector.type' = 'jdbc',
   'connector.url' = '?',
   'connector.username' = '?',
   'connector.password' = '?',
   'connector.table' = 'my_table'
)");

Table table = batchTableEnv.sqlQuery("SELECT name, step FROM my_table");
DataSet<Row> rowDataSet = batchTableEnv.toDataSet(table, Row.class);
rowDataSet.print();

This ends up throwing this exception:

org.apache.flink.table.api.TableException: Only BatchTableSource and
InputFormatTableSource are supported in BatchTableEnvironment.
at
org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116)
at
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580)
at
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555)
at
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537)
at
org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101)

On Thu, Jun 17, 2021 at 12:51 AM Timo Walther <twal...@apache.org> wrote:

> Hi Marco,
>
> which operations do you want to execute in the bootstrap pipeline?
>
> Maybe you don't need to use SQL and old planner. At least this would
> simplify the friction by going through another API layer.
>
> The JDBC connector can be directly be used in DataSet API as well.
>
> Regards,
> Timo
>
>
>
> On 17.06.21 07:33, Marco Villalobos wrote:
> > Thank you very much!
> >
> > I tried using Flink's SQL JDBC connector, and ran into issues.
> > According to the flink documentation, only the old planner is compatible
> > with the DataSet API.
> >
> > When I connect to the table:
> >
> > CREATE TABLE my_table (
> > ....
> > ) WITH (
> >     'connector.type' = 'jdbc',
> >     'connector.url' = '?',
> >     'connector.username' = '?',
> >     'connector.password' = '?',
> >     'connector.table' = 'my_table'
> > )
> >
> > It creates a JdbcTableSource, but only BatchTableSource and
> > InputFormatTableSource are supported in BatchTableEnvironment.
> >
> > By the way, it was very challenging to figure out how to create that
> > connection string, because its a different format than what is in the
> > documentation. I had to comb through JdbcTableSourceSinkFactory to
> > figure out how to connect.
> >
> > Is it even possible to use the DataSet API with the Table SQL api in
> > Flink 1.12.1?
> >
> >
> > On Wed, Jun 16, 2021 at 4:55 AM Robert Metzger <rmetz...@apache.org
> > <mailto:rmetz...@apache.org>> wrote:
> >
> >     Hi Marco,
> >
> >     The DataSet API will not run out of memory, as it spills to disk if
> >     the data doesn't fit anymore.
> >     Load is distributed by partitioning data.
> >
> >     Giving you advice depends a bit on the use-case. I would explore two
> >     major options:
> >     a) reading the data from postgres using Flink's SQL JDBC connector
> >     [1]. 200 GB is not much data. A 1gb network link needs ~30 minutes
> >     to transfer that (125 megabytes / second)
> >     b) Using the DataSet API and state processor API. I would first try
> >     to see how much effort it is to read the data using the DataSet API
> >     (could be less convenient than the Flink SQL JDBC connector).
> >
> >     [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/
> >     <
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/
> >
> >
> >
> >     On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos
> >     <mvillalo...@kineteque.com <mailto:mvillalo...@kineteque.com>>
> wrote:
> >
> >         I must bootstrap state from postgres (approximately 200 GB of
> >         data) and I notice that the state processor API requires the
> >         DataSet API in order to bootstrap state for the Stream API.
> >
> >         I wish there was a way to use the SQL API and use a partitioned
> >         scan, but I don't know if that is even possible with the DataSet
> >         API.
> >
> >         I never used the DataSet API, and I am unsure how it manages
> >         memory, or distributes load, when handling large state.
> >
> >         Would it run out of memory if I map data from a JDBCInputFormat
> >         into a large DataSet and then use that to bootstrap state for my
> >         stream job?
> >
> >         Any advice on how I should proceed with this would be greatly
> >         appreciated.
> >
> >         Thank you.
> >
>
>

Reply via email to