Hi Austin,

Many thanks, we were indeed using the API incorrectly. In local tests we can
now see the data being populated in Postgres.

Have a nice day!

Regards
Sambaran

On Tue, Apr 20, 2021 at 8:11 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi Sambaran,
>
> I'm not sure if this is the best approach, though I don't know your full
> use case/implementation.
>
> What kind of error do you get when trying to map into a PreparedStatement?
> I assume you tried something like this?
>
> SingleOutputStreamOperator<Row> stream =
> env.fromElements(Row.of("YourProcedureA"), Row.of("YourProcedureB"));
>
> stream.addSink(JdbcSink.sink(
>    "EXEC ?",
>    (preparedStatement, row) -> {
>       // extend `preparedStatement` with row info (JDBC parameter indices start at 1)
>       preparedStatement.setString(1, (String) row.getField(0));
>    },
>    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>       .withUrl("jdbc:derby:memory:ebookshop")
>       .withDriverName("org.apache.derby.jdbc.EmbeddedDriver")
>       .build()));
>
> Best,
> Austin
>
> On Tue, Apr 20, 2021 at 12:42 PM Sambaran <sambaran2...@gmail.com> wrote:
>
>> Hi Austin,
>>
>> We are using this for jdbc interfacing to populate postgres tables based
>> on the data coming from an event source.
>>
>> We tried the approach described in the docs
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html
>> but did not find a suitable way to map a SingleOutputStreamOperator<Row>.
>> Can you please let me know if this is the right approach and, if so, how we
>> map the SingleOutputStreamOperator<Row> to the PreparedStatement in
>> JdbcStatementBuilder?
>>
>> Thanks for your help!
>>
>> Regards
>> Sambaran
>>
>> On Tue, Apr 20, 2021 at 6:30 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Great, thanks for the clarification! I'm checking with others now. Are
>>> you using other parts of the Table/SQL APIs, or just this for JDBC
>>> interfacing?
>>>
>>> Best,
>>> Austin
>>>
>>> On Tue, Apr 20, 2021 at 12:20 PM Sambaran <sambaran2...@gmail.com>
>>> wrote:
>>>
>>>> Hi Austin,
>>>>
>>>> Thanks for replying. It is exactly as you described. Is there a way to
>>>> execute the procedure with 1.11 or a later version, given that
>>>> JDBCAppendTableSink is no longer available in those versions?
>>>>
>>>> Regards
>>>> Sambaran
>>>>
>>>> On Tue, Apr 20, 2021 at 6:11 PM Austin Cawley-Edwards <
>>>> austin.caw...@gmail.com> wrote:
>>>>
>>>>> Hey Sambaran,
>>>>>
>>>>> I'm not too familiar with the 1.7 JDBCAppendTableSink, but to make
>>>>> sure I understand what your current solution looks like, it's something
>>>>> like the following, where you're triggering a procedure on each element of
>>>>> a stream?
>>>>>
>>>>>       JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
>>>>>             .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
>>>>>             .setDBUrl("jdbc:derby:memory:ebookshop")
>>>>>             .setQuery("EXEC YourProcedure")
>>>>>             .build();
>>>>>
>>>>>       SingleOutputStreamOperator<Row> stream =
>>>>>             env.fromElements(Row.of("a"), Row.of("b"));
>>>>>       sink.emitDataStream(stream);
>>>>>
>>>>> Or something else?
>>>>>
>>>>> Best,
>>>>> Austin
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 20, 2021 at 11:10 AM Sambaran <sambaran2...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am currently using JDBCAppendTableSink (Flink 1.7) to execute database
>>>>>> stored procedures from Flink, populating external tables from a
>>>>>> SingleOutputStreamOperator. We are now trying to upgrade to Flink 1.11 or
>>>>>> later and found that JDBCAppendTableSink has been removed. While looking
>>>>>> for an alternative, I could not find a suitable approach for calling a
>>>>>> database stored procedure. Is there an alternative way to do this?
>>>>>>
>>>>>> Regards
>>>>>> Sambaran
>>>>>>
>>>>>
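
The row-to-statement mapping discussed in this thread can be tried out without
Flink or a database. The following is only a sketch under stated assumptions:
StatementBinder is a hypothetical stand-in with the same shape as the
(preparedStatement, row) lambda passed to JdbcSink.sink above, the statement
text "CALL your_procedure(?)" and the procedure name are placeholders, and the
PreparedStatement is a recording proxy rather than a real connection. It mainly
shows that the first "?" is bound with index 1, not 0.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProcedureBindingSketch {

    /** Hypothetical stand-in shaped like the statement-builder lambda in the thread. */
    @FunctionalInterface
    public interface StatementBinder<T> {
        void accept(PreparedStatement statement, T record) throws SQLException;
    }

    // Assumed statement text; "your_procedure" is a placeholder name.
    public static final String SQL = "CALL your_procedure(?)";

    // Binds the first field of each record to the first "?" (JDBC indices are 1-based).
    public static final StatementBinder<Object[]> BINDER =
            (statement, row) -> statement.setString(1, (String) row[0]);

    /** Creates a fake PreparedStatement that only records setString calls. */
    static PreparedStatement recordingStatement(List<String> log) {
        return (PreparedStatement) Proxy.newProxyInstance(
                ProcedureBindingSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, callArgs) -> {
                    if (method.getName().equals("setString")) {
                        // callArgs[0] is the parameter index, callArgs[1] the bound value.
                        log.add(callArgs[0] + "=" + callArgs[1]);
                    }
                    return null; // only void setters are invoked in this sketch
                });
    }

    /** Applies BINDER to every row and returns the recorded "index=value" bindings. */
    public static List<String> bindAll(List<Object[]> rows) {
        List<String> log = new ArrayList<>();
        PreparedStatement statement = recordingStatement(log);
        try {
            for (Object[] row : rows) {
                BINDER.accept(statement, row);
            }
        } catch (SQLException e) {
            throw new IllegalStateException("binding failed", e);
        }
        return log;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(new Object[] {"a"}, new Object[] {"b"});
        System.out.println(bindAll(rows)); // prints [1=a, 1=b]
    }
}
```

In an actual job the same lambda body would go in the second argument of
JdbcSink.sink, with row.getField(0) in place of row[0]. Whether CALL, EXEC, or
a SELECT on a function is the right statement depends on the database and on
how the routine was defined.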
