Hi Clay,

could you share the source code of
com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this
sink uses a lambda which is not serializable. Maybe it holds a reference to
some non-serializable class as part of its closure.
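
In Scala this usually happens when the lambda references a member (or an
implicit) of the enclosing class or trait, which pulls the whole enclosing
instance into the closure. As a minimal sketch of the usual fix (assuming
SignableTableSchema is a plain case class and setOptionalString is an
implicit from your own codebase), moving the statement builder into a
standalone top-level object avoids capturing the enclosing instance:

import java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.JdbcStatementBuilder

// A top-level object has no enclosing instance, so serializing it does not
// drag in whatever class the sink-building code happens to live in.
object SignableStatementBuilder extends JdbcStatementBuilder[SignableTableSchema] {
  override def accept(ps: PreparedStatement, rd: SignableTableSchema): Unit = {
    ps.setString(1, rd.data_processing_id)
    ps.setTimestamp(2, rd.crc)
    // ... remaining setters exactly as in your current lambda
  }
}

You would then pass SignableStatementBuilder to JdbcSink.sink in place of the
inline lambda.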

Cheers,
Till

On Tue, Feb 16, 2021 at 8:58 PM Clay Teeter <clay.tee...@maalka.com> wrote:

> Thanks Till, the tickets and links were immensely useful.  With that I was
> able to make progress and even get things to compile.  However, when I run
> things, a serialization exception is thrown (see below).
>
>> .addSink(JdbcSink.sink[SignableTableSchema](
>>   addIntervalToInsertStatement(insertStatement, interval),
>>   (ps: PreparedStatement, rd: SignableTableSchema) => {
>>     ps.setString(1, rd.data_processing_id)
>>     ps.setTimestamp(2, rd.crc)
>>     ps.setString(3, rd.command)
>>     ps.setString(4, rd.result)
>>     ps.setOptionalString(5, rd.message)
>>     ps.setString(6, rd.arguments)
>>     ps.setOptionalString(7, rd.validatorUUID)
>>   },
>>   getJdbcExecutionOptions,
>>   getJdbcOptions(interval, insertStatement) // <-- This is line 376
>> ))
>>
>>  Where I set the executionOptions to write in a batched way:
>
> def getJdbcExecutionOptions: JdbcExecutionOptions = {
>   JdbcExecutionOptions.builder()
>     .withBatchIntervalMs(1000)
>     .withBatchSize(1000)
>     .withMaxRetries(10)
>     .build
> }
>
>
> Any suggestions?
>
> [info]   org.apache.flink.api.common.InvalidProgramException: The
>> implementation of the AbstractJdbcOutputFormat is not serializable. The
>> object probably contains or references non serializable fields.
>> [info]   at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>> [info]   at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>> [info]   at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>> [info]   at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>> [info]   at
>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
>> [info]   at
>> org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
>> [info]   at
>> org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
>> [info]   at
>> com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376)
>> [info]   at
>> com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262)
>> [info]   at
>> com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250)
>> [info]   ...
>> [info]   Cause: java.io.NotSerializableException: Non-serializable lambda
>> [info]   at
>> com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x0000000809678c40.writeObject(Unknown
>> Source)
>> [info]   at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> [info]   at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> [info]   at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> [info]   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> [info]   at
>> java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
>> [info]   at
>> java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
>> [info]   at
>> java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
>> [info]   at
>> java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
>> [info]   at
>> java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
>>
>
>
> On Tue, Feb 16, 2021 at 6:11 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Clay,
>>
>> I am not a Table API expert but let me try to answer your question:
>>
>> With FLINK-17748 [1] the community removed the registerTableSink method in
>> favour of the connect API. The connect API has since been deprecated [2]
>> because it was not well maintained. The recommended way to specify sinks is
>> now to use Flink's DDL [3]. Unfortunately, I couldn't find an easy example
>> of how to use the DDL. Maybe Timo or Jark can point you towards a good
>> guide on how to register your JDBC table sink.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17748
>> [2] https://issues.apache.org/jira/browse/FLINK-18416
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html
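>>
>> Off the top of my head, a rough (untested) sketch of the DDL route might
>> look like the following (the connector options are taken from the JDBC
>> connector docs, and the table name, column types, and URL are guesses
>> based on your statement builder, so treat them all as assumptions):
>>
>> tableEnv.executeSql(
>>   """CREATE TABLE maalka_jdbc_output_table (
>>     |  data_processing_id STRING,
>>     |  crc                TIMESTAMP(3),
>>     |  command            STRING,
>>     |  result             STRING,
>>     |  message            STRING,
>>     |  arguments          STRING,
>>     |  validatorUUID      STRING
>>     |) WITH (
>>     |  'connector' = 'jdbc',
>>     |  'url' = 'jdbc:postgresql://localhost:5432/maalka',
>>     |  'table-name' = 'maalka_jdbc_output_table'
>>     |)""".stripMargin)
>>
>> Once registered, you should be able to write to it with
>> resultTable.executeInsert("maalka_jdbc_output_table").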
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 16, 2021 at 4:42 PM Clay Teeter <clay.tee...@maalka.com>
>> wrote:
>>
>>> Hey all.  Hopefully this is an easy question.  I'm porting my JDBC
>>> Postgres sink from 1.10 to 1.12.
>>>
>>> I'm using:
>>> * StreamTableEnvironment
>>> * JdbcUpsertTableSink
>>>
>>> What I'm having difficulty with is how to register the sink with the
>>> streaming table environment.
>>>
>>> In 1.10:
>>>
>>>     tableEnv.registerTableSink(
>>>>       s"${interval}_maalka_jdbc_output_table",
>>>>       jdbcTableSink)
>>>
>>>
>>> This method doesn't exist in 1.12; what is the equivalent?
>>>
>>> Thanks!
>>> Clay
>>>
>>>
