Hi Sambaran, I'm not sure if this is the best approach, though I don't know your full use case/ implementation.
What kind of error do you get when trying to map into a PreparedStatement? I assume you tried something like this? SingleOutputStreamOperator<Row> stream = env.fromElements(Row.of("YourProcedureA"), Row.of("YourProcedureB")); stream.addSink(JdbcSink.sink( "EXEC ?", (preparedStatement, row) -> { // extend `preparedStatement` with row info preparedStatement.setString(0, (String) row.getField(0)); }, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:derby:memory:ebookshop") .withDriverName("org.apache.derby.jdbc.EmbeddedDriver") .build())); Best, Austin On Tue, Apr 20, 2021 at 12:42 PM Sambaran <sambaran2...@gmail.com> wrote: > Hi Austin, > > We are using this for jdbc interfacing to populate postgres tables based > on the data coming from an event source. > > We tried with the approach mentioned in the doc > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html > but did not find a suitable way to map SingleOutputStreamOperator<Row> . > Can you please let me know if this is the right approach and if yes, how do > we map the SingleOutputStreamOperator<Row> to the preparedstatement in > JdbcStatementBuilder? > > Thanks for your help! > > Regards > Sambaran > > On Tue, Apr 20, 2021 at 6:30 PM Austin Cawley-Edwards < > austin.caw...@gmail.com> wrote: > >> Great, thanks for the clarification! I'm checking with others now. Are >> you using other parts of the Table/SQL APIs, or just this for JDBC >> interfacing? >> >> Best, >> Austin >> >> On Tue, Apr 20, 2021 at 12:20 PM Sambaran <sambaran2...@gmail.com> wrote: >> >>> Hi Austin, >>> >>> Thanks for replying. This is exactly as you mentioned here. Do we have a >>> way to execute the procedure with 1.11 or upper version as >>> JDBCAppendTableSink is no longer available with these? >>> >>> Regards >>> Sambaran >>> >>> On Tue, Apr 20, 2021 at 6:11 PM Austin Cawley-Edwards < >>> austin.caw...@gmail.com> wrote: >>> >>>> Hey Sambaran, >>>> >>>> I'm not too familiar with the 1.7 JDBCAppendTableSink, but to make sure >>>> I understand what you're current solution looks like, it's something like >>>> the following, where you're triggering a procedure on each element of a >>>> stream? >>>> >>>> JDBCAppendTableSink sink = JDBCAppendTableSink.builder() >>>> .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") >>>> .setDBUrl("jdbc:derby:memory:ebookshop") >>>> .setQuery("EXEC YourProcedure") >>>> .build(); >>>> >>>> SingleOutputStreamOperator<Row> stream = >>>> env.fromElements(Row.of("a"), Row.of("b")); >>>> sink.emitDataStream(stream); >>>> >>>> Or something else? >>>> >>>> Best, >>>> Austin >>>> >>>> >>>> >>>> >>>> On Tue, Apr 20, 2021 at 11:10 AM Sambaran <sambaran2...@gmail.com> >>>> wrote: >>>> >>>>> Hi, >>>>> >>>>> I am currently using JDBCAppendTableSink to execute database stored >>>>> procedures from flink to populate data to external tables using >>>>> SingleOutputStreamOperator (version 1.7). Now we are trying to update to >>>>> Flink 1.11/ later and found JDBCAppendTableSink has been removed, >>>>> currently >>>>> when looking for an alternative I could not find any suitable approach >>>>> which would call database stored procedure. Is there any alternative >>>>> approach to resolve this? >>>>> >>>>> Regards >>>>> Sambaran >>>>> >>>>