Hello,
We are currently trying to implement a JDBC sink in Stateful Functions,
as documented here:
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/io-module/flink-connectors.html
However, when starting the application, we run into this error:
--------------------------------------------------------------------
java.lang.IllegalStateException: objects can not be reused with JDBC sink function
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
    at org.apache.flink.connector.jdbc.JdbcSink.lambda$sink$97f3ed45$1(JdbcSink.java:67) ~[?:?]
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131) ~[?:?]
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113) ~[?:?]
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:50) ~[?:?]
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]
2021-02-04 13:59:49,121 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 31284d56d1e2112b0f20099ee448a6a9_0.
2021-02-04 13:59:49,122 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 8 tasks should be restarted to recover the failed task 31284d56d1e2112b0f20099ee448a6a9_0.
-------------------------------------------------------------------
We tested the same sink in a regular Flink application under similar
circumstances (Protobuf objects etc.) and it works just fine. As a
workaround, we tried setting the parameter "pipeline.object-reuse" to
true in the flink-conf.yaml of the Stateful Functions application, but
that had no effect on the above error message. We are using Stateful
Functions version 2.2.2.
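
As far as we can tell from the trace, the exception is raised by a
Preconditions.checkState call inside JdbcSink, and the message suggests
the sink refuses to open while object reuse is enabled. Below is a
minimal, Flink-free sketch of that kind of guard. It assumes the check
simply tests the object-reuse flag. The class ObjectReuseGuardSketch and
the method tryOpen are hypothetical names for illustration only, not
Flink API.

```java
public class ObjectReuseGuardSketch {

    // Mirrors the behaviour of a checkState-style precondition:
    // throw IllegalStateException with the given message if the condition is false.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Hypothetical stand-in for the sink's open step (assumption: the guard
    // only looks at whether object reuse is enabled).
    // Returns "opened" on success, otherwise the exception message.
    public static String tryOpen(boolean objectReuseEnabled) {
        try {
            checkState(!objectReuseEnabled,
                "objects can not be reused with JDBC sink function");
            return "opened";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("object reuse enabled:  " + tryOpen(true));
        System.out.println("object reuse disabled: " + tryOpen(false));
    }
}
```

If this reading is right, setting "pipeline.object-reuse" to true would
keep the check failing rather than fix it. The flag would have to end up
false at runtime for the sink to open.
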
Has anyone else seen this problem before?
Relevant Application Code:
-------- MyMessageSink.java -----
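import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class MyMessageSink {

    public static final EgressIdentifier<MyMessage> SINK_ID =
        new EgressIdentifier<>("mynamespace", "MyMessageSink", MyMessage.class);

    public EgressSpec<MyMessage> getEgressSpec() {
        JdbcConnectionOptions jdbcConnectionOptions =
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName("org.postgresql.Driver")
                .withUrl("jdbc:postgresql://localhost:5432/mydb?user=foo&password=bar")
                .build();

        // Binds the two message fields to the placeholders of the INSERT statement.
        JdbcStatementBuilder<MyMessage> jdbcStatementBuilder =
            (statementTemplate, myMessage) -> {
                statementTemplate.setString(1, myMessage.getFirstField());
                statementTemplate.setString(2, myMessage.getSecondField());
            };

        // The SQL string is split with "+" because a Java string literal cannot span lines.
        SinkFunction<MyMessage> sinkFunction = JdbcSink.sink(
            "INSERT INTO my_table (first_field, second_field) "
                + "VALUES( ?, ? ) ON CONFLICT (first_field, second_field) DO NOTHING;",
            jdbcStatementBuilder,
            jdbcConnectionOptions
        );

        return new SinkFunctionSpec<>(SINK_ID, sinkFunction);
    }
}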
---------------------------------------
----------- Module.java ---------------
...
MyMessageSink myMessageSink = new MyMessageSink();
binder.bindEgress(myMessageSink.getEgressSpec());
...
----------------------------------------------
Best regards,
Jan