Hi Clay,

could you share the source code of com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this sink uses a lambda which is not serializable. Maybe it holds a reference to some non-serializable class as part of its closure.
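To illustrate what "holds a reference as part of its closure" can look like, here is a minimal sketch that does not use Flink at all. `RowFormatter`, `EnclosingSink`, `DetachedFormatter` and `SerializationCheck` are hypothetical names made up for this example; they are not taken from MaalkaPostgresSink or from the Flink API. The sketch only shows the general mechanism, which may or may not be what happens in the actual sink.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a serializable functional interface, such as the
// statement builder handed to a JDBC sink. This is NOT the Flink type.
trait RowFormatter extends Serializable {
  def format(value: String): String
}

// Hypothetical enclosing class that is not Serializable, standing in for
// whatever the real sink's closure might reference.
class EnclosingSink {
  val prefix: String = "row:"

  // Reads the member `prefix`, so the anonymous class keeps a reference to
  // the enclosing EnclosingSink and drags it into serialization.
  def capturingFormatter: RowFormatter = new RowFormatter {
    override def format(value: String): String = prefix + value
  }

  // Copies the needed value into a local and hands it to a top-level class,
  // so only the String travels with the formatter.
  def detachedFormatter: RowFormatter = {
    val localPrefix = prefix
    new DetachedFormatter(localPrefix)
  }
}

// Top-level class: no outer reference, only a serializable String field.
class DetachedFormatter(prefix: String) extends RowFormatter {
  override def format(value: String): String = prefix + value
}

object SerializationCheck {
  // Returns true if Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Running `SerializationCheck.isSerializable` on both formatters shows the difference: the capturing one fails because of the enclosing instance, the detached one passes. If the real sink turns out to have a similar shape, moving the statement builder into a standalone class (or an explicit `new JdbcStatementBuilder[...] { ... }` that touches no outer members) might be worth trying.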
Cheers,
Till

On Tue, Feb 16, 2021 at 8:58 PM Clay Teeter <clay.tee...@maalka.com> wrote:

> Thanks Till, the tickets and links were immensely useful. With that I was
> able to make progress and even get things to compile. However, when I run
> things a serialization exception is thrown (see below).
>
>> .addSink(JdbcSink.sink[SignableTableSchema](
>>   addIntervalToInsertStatement(insertStatement, interval),
>>   (ps: PreparedStatement, rd: SignableTableSchema) => {
>>     ps.setString(1, rd.data_processing_id)
>>     ps.setTimestamp(2, rd.crc)
>>     ps.setString(3, rd.command)
>>     ps.setString(4, rd.result)
>>     ps.setOptionalString(5, rd.message)
>>     ps.setString(6, rd.arguments)
>>     ps.setOptionalString(7, rd.validatorUUID)
>>   },
>>   getJdbcExecutionOptions,
>>   getJdbcOptions(interval, insertStatement) // <-- This is line 376
>> ))
>
> Where I set the executionOptions to behave in a batched way:
>
> def getJdbcExecutionOptions: JdbcExecutionOptions = {
>   JdbcExecutionOptions.builder()
>     .withBatchIntervalMs(1000)
>     .withBatchSize(1000)
>     .withMaxRetries(10)
>     .build
> }
>
> Any suggestions?
>
>> [info] org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractJdbcOutputFormat is not serializable. The object probably contains or references non serializable fields.
>> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
>> [info]   at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376)
>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262)
>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250)
>> [info]   ...
>> [info] Cause: java.io.NotSerializableException: Non-serializable lambda
>> [info]   at com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x0000000809678c40.writeObject(Unknown Source)
>> [info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> [info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> [info]   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> [info]   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> [info]   at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
>> [info]   at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
>> [info]   at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
>> [info]   at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
>> [info]   at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
>
> On Tue, Feb 16, 2021 at 6:11 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Clay,
>>
>> I am not a Table API expert but let me try to answer your question:
>>
>> With FLINK-17748 [1] the community removed the registerTableSink in
>> favour of the connect API. The connect API has been deprecated [2] because
>> it was not well maintained. Now the recommended way for specifying sinks is
>> to use Flink's DDL [3]. Unfortunately, I couldn't find an easy example on
>> how to use the DDL. Maybe Timo or Jark can point you towards a good guide
>> on how to register your jdbc table sink.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17748
>> [2] https://issues.apache.org/jira/browse/FLINK-18416
>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 16, 2021 at 4:42 PM Clay Teeter <clay.tee...@maalka.com> wrote:
>>
>>> Hey all. Hopefully this is an easy question. I'm porting my JDBC
>>> postgres sink from 1.10 to 1.12.
>>>
>>> I'm using:
>>> * StreamTableEnvironment
>>> * JdbcUpsertTableSink
>>>
>>> What I'm having difficulty with is how to register the sink with the
>>> streaming table environment.
>>>
>>> In 1.10:
>>>
>>>> tableEnv.registerTableSink(
>>>>   s"${interval}_maalka_jdbc_output_table",
>>>>   jdbcTableSink)
>>>
>>> This method doesn't exist in 1.12. What is the equivalent?
>>>
>>> Thanks!
>>> Clay
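
For the original question about registering the sink via DDL, a rough sketch of what the statement could look like with the JDBC SQL connector follows. The column names and types are guessed from the fields used in the statement builder quoted above, and `JdbcSinkDdl`, `createTable` and their parameters are hypothetical helpers invented for this example, so please treat the schema as an assumption rather than the real table definition. The `sink.*` options mirror the batch interval, batch size and retry count from getJdbcExecutionOptions.

```scala
// Hypothetical helper that only builds the DDL text; it has no Flink dependency.
object JdbcSinkDdl {
  // tableName: name to register in the table environment (assumption)
  // jdbcUrl:   JDBC connection URL of the Postgres database (assumption)
  // dbTable:   name of the target table inside the database (assumption)
  def createTable(tableName: String, jdbcUrl: String, dbTable: String): String =
    s"""CREATE TABLE `$tableName` (
       |  `data_processing_id` STRING,
       |  `crc` TIMESTAMP(3),
       |  `command` STRING,
       |  `result` STRING,
       |  `message` STRING,
       |  `arguments` STRING,
       |  `validator_uuid` STRING
       |) WITH (
       |  'connector' = 'jdbc',
       |  'url' = '$jdbcUrl',
       |  'table-name' = '$dbTable',
       |  'sink.buffer-flush.max-rows' = '1000',
       |  'sink.buffer-flush.interval' = '1s',
       |  'sink.max-retries' = '10'
       |)""".stripMargin
}
```

The resulting string would be passed to `tableEnv.executeSql(...)`, after which the table can be used as the target of an `INSERT INTO`. As far as I know, the JDBC connector writes in upsert mode only when the DDL declares a primary key (`PRIMARY KEY (...) NOT ENFORCED`) and in append mode otherwise, so a replacement for JdbcUpsertTableSink would likely need such a clause added for the actual key columns.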