Rion,

A given JdbcSink can only write to one table, but if the number of tables
involved isn't unreasonable, you could use a separate sink for each table,
and use side outputs [1] from a process function to steer each record to
the appropriate sink.
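
For example, a rough, untested sketch of that wiring might look something
like the following. The UserAttribute and DeviceAttribute subtypes, the
table and column names, and the connection details are all made up here --
substitute whatever your actual Attribute hierarchy and schema look like:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions
import org.apache.flink.connector.jdbc.JdbcSink
import org.apache.flink.connector.jdbc.JdbcStatementBuilder
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.util.OutputTag

// OutputTags are declared as anonymous subclasses so Flink can capture the type.
val userTag = object : OutputTag<UserAttribute>("user-attributes") {}
val deviceTag = object : OutputTag<DeviceAttribute>("device-attributes") {}

// Steers each attribute to the side output matching its type.
class RouteAttributesFunction : ProcessFunction<Attribute, Attribute>() {
    override fun processElement(value: Attribute, ctx: Context, out: Collector<Attribute>) {
        when (value) {
            is UserAttribute -> ctx.output(userTag, value)
            is DeviceAttribute -> ctx.output(deviceTag, value)
            else -> out.collect(value) // anything unmatched stays on the main output
        }
    }
}

// Shared connection options for the per-table sinks.
val connectionOptions = JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:postgresql://localhost:5432/attributes")
    .withDriverName("org.postgresql.Driver")
    .withUsername("user")
    .withPassword("password")
    .build()

// `attributes` is the DataStream<Attribute> produced by IdentifyAttributesFunction.
val routed = attributes.process(RouteAttributesFunction())

// One JdbcSink per table; each side output feeds its own sink.
routed.getSideOutput(userTag).addSink(
    JdbcSink.sink(
        "INSERT INTO user_attributes (id, name) VALUES (?, ?) " +
            "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name",
        JdbcStatementBuilder<UserAttribute> { ps, attr ->
            ps.setString(1, attr.id)
            ps.setString(2, attr.name)
        },
        connectionOptions
    )
)

routed.getSideOutput(deviceTag).addSink(
    JdbcSink.sink(
        "INSERT INTO device_attributes (id, model) VALUES (?, ?) " +
            "ON CONFLICT (id) DO UPDATE SET model = EXCLUDED.model",
        JdbcStatementBuilder<DeviceAttribute> { ps, attr ->
            ps.setString(1, attr.id)
            ps.setString(2, attr.model)
        },
        connectionOptions
    )
)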

I suggest you avoid trying to implement your own sink.

In general, custom sinks need to implement their own checkpointing, though
there is a generic two phase commit sink you can use as a starting point
for implementing a transactional sink. FYI, the JDBC sink has been reworked
for 1.13 to include exactly-once guarantees based on the XA standard [2].
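
For completeness, a minimal (untested) sketch of that exactly-once variant,
reusing the hypothetical UserAttribute type, table, and routing from the
sketch above, would look roughly like this -- note that it needs Flink
1.13+, the flink-connector-jdbc dependency, and an XA-capable driver:

import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions
import org.apache.flink.connector.jdbc.JdbcExecutionOptions
import org.apache.flink.connector.jdbc.JdbcSink
import org.apache.flink.connector.jdbc.JdbcStatementBuilder
import org.postgresql.xa.PGXADataSource

routed.getSideOutput(userTag).addSink(
    JdbcSink.exactlyOnceSink(
        "INSERT INTO user_attributes (id, name) VALUES (?, ?)",
        JdbcStatementBuilder<UserAttribute> { ps, attr ->
            ps.setString(1, attr.id)
            ps.setString(2, attr.name)
        },
        JdbcExecutionOptions.builder().build(),
        JdbcExactlyOnceOptions.defaults(),
        // The XA data source is driver-specific; this one is Postgres.
        {
            PGXADataSource().apply {
                setUrl("jdbc:postgresql://localhost:5432/attributes")
                setUser("user")
                setPassword("password")
            }
        }
    )
)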

Regards,
David

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink

On Fri, Mar 5, 2021 at 7:34 PM Rion Williams <rionmons...@gmail.com> wrote:

> Hi all,
>
> I’ve been playing around with a proof-of-concept application with Flink to
> assist a colleague of mine. The application is fairly simple (take in a
> single input and identify various attributes about it) with the goal of
> outputting those to separate tables in Postgres:
>
> object AttributeIdentificationJob {
>     @JvmStatic
>     fun main(args: Array<String>) {
>         val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
>         stream
>             .addSource(ReadFromKafka())
>             .process(IdentifyAttributesFunction())
>             .addSink(DynamicJdbcHere())
>
>         // execute program
>         stream.execute("Attribute Identification")
>     }
> }
>
> Considering my attributes may be of varying types (all implementing an
> Attribute interface), I don't know if the existing JdbcSink functionality
> or some variant of it (i.e. one of the dynamic ones that I see listed)
> could handle this functionality. Essentially for a given "bundle" of
> records, I'd need to ensure that each respective type of attribute was
> upserted into its corresponding table within a Postgres database.
>
> Is that something that the connector can handle on its own? Or would I
> need to implement my own RichSinkFunction<Collection<Attribute>> that
> could handle opening a connection to Postgres and dynamically generating
> the appropriate UPSERT statements to handle sending the records? As a
> follow up to that, if I did need to write my own RichSinkFunction, would I
> need to implement my own checkpointing for resilience purposes or does that
> come along for the ride for RichSinkFunctions?
>
> Any insight or approaches would be welcome!
>
> Thanks,
>
> Rion
>
