Hello,

We are currently trying to implement a JDBC sink in Stateful Functions, as documented here: https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/io-module/flink-connectors.html

However, when starting the application we are running into this error:

--------------------------------------------------------------------

java.lang.IllegalStateException: objects can not be reused with JDBC sink function
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
        at org.apache.flink.connector.jdbc.JdbcSink.lambda$sink$97f3ed45$1(JdbcSink.java:67) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:50) ~[?:?]
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
        at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]
2021-02-04 13:59:49,121 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 31284d56d1e2112b0f20099ee448a6a9_0.
2021-02-04 13:59:49,122 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 8 tasks should be restarted to recover the failed task 31284d56d1e2112b0f20099ee448a6a9_0.

-------------------------------------------------------------------

We tested the same sink in a regular Flink application under similar circumstances (Protobuf objects etc.) and it works just fine. As a possible solution, we tried setting the parameter "pipeline.object-reuse" to true in the flink-conf.yaml of the Stateful Functions application, but that had no effect on the above error message. The Stateful Functions version is 2.2.2.
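
For context, the wording of the exception suggests that the check fires when object reuse is enabled, not when it is disabled. Below is a minimal plain-Java sketch of why a batching sink might guard against object reuse, assuming the sink buffers references to incoming records until it flushes. Record, BatchBuffer and checkNoObjectReuse are hypothetical names made up for illustration; they are not Flink classes, and this is not the actual connector code.

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseSketch {

    /** Hypothetical mutable record standing in for a message type. */
    static final class Record {
        String value;
        Record(String value) { this.value = value; }
        Record copy() { return new Record(value); }
    }

    /** Hypothetical batching buffer: keeps records until flush() is called. */
    static final class BatchBuffer {
        private final List<Record> pending = new ArrayList<>();
        private final boolean copyOnAdd;

        BatchBuffer(boolean copyOnAdd) { this.copyOnAdd = copyOnAdd; }

        // Stores either the caller's instance or a defensive copy of it.
        void add(Record r) { pending.add(copyOnAdd ? r.copy() : r); }

        // Returns the buffered values in insertion order and empties the buffer.
        List<String> flush() {
            List<String> out = new ArrayList<>();
            for (Record r : pending) {
                out.add(r.value);
            }
            pending.clear();
            return out;
        }
    }

    /** Assumed shape of the guard: refuse to start when object reuse is on. */
    static void checkNoObjectReuse(boolean objectReuseEnabled) {
        if (objectReuseEnabled) {
            throw new IllegalStateException("objects can not be reused with JDBC sink function");
        }
    }

    /** Simulates an upstream that reuses one mutable instance for every element. */
    static List<String> runWithReusedInstance(BatchBuffer buffer) {
        Record reused = new Record("");
        for (String v : new String[] {"a", "b", "c"}) {
            reused.value = v;
            buffer.add(reused);
        }
        return buffer.flush();
    }

    public static void main(String[] args) {
        // Buffering references: every entry points at the same, last-written instance.
        System.out.println(runWithReusedInstance(new BatchBuffer(false))); // [c, c, c]
        // Buffering copies: each entry keeps the value it had when it was added.
        System.out.println(runWithReusedInstance(new BatchBuffer(true)));  // [a, b, c]
        try {
            checkNoObjectReuse(true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the real check behaves like this sketch, setting "pipeline.object-reuse" to true would not be expected to make the error go away.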


Has anyone else seen this problem before?


Relevant Application Code:

-------- MyMessageSink.java -----

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class MyMessageSink {
    public static final EgressIdentifier<MyMessage> SINK_ID =
            new EgressIdentifier<>("mynamespace", "MyMessageSink", MyMessage.class);

    public EgressSpec<MyMessage> getEgressSpec() {
        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName("org.postgresql.Driver")
                .withUrl("jdbc:postgresql://localhost:5432/mydb?user=foo&password=bar")
                .build();
        JdbcStatementBuilder<MyMessage> jdbcStatementBuilder = (statementTemplate, myMessage) -> {
            statementTemplate.setString(1, myMessage.getFirstField());
            statementTemplate.setString(2, myMessage.getSecondField());
        };
        SinkFunction<MyMessage> sinkFunction = JdbcSink.sink(
                "INSERT INTO my_table (first_field, second_field) VALUES( ?, ? ) ON CONFLICT (first_field, second_field) DO NOTHING;",
                jdbcStatementBuilder,
                jdbcConnectionOptions
        );
        return new SinkFunctionSpec<>(
                SINK_ID,
                sinkFunction
        );
    }

}

---------------------------------------


----------- Module.java ---------------

...

MyMessageSink myMessageSink = new MyMessageSink();
binder.bindEgress(myMessageSink.getEgressSpec());

...

----------------------------------------------


Best regards,

Jan
