Hi all,

I’ve been playing around with a proof-of-concept Flink application to
assist a colleague of mine. The application is fairly simple (take in a
single input and identify various attributes about it), with the goal of
writing those attributes out to separate tables in Postgres:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object AttributeIdentificationJob {
    @JvmStatic
    fun main(args: Array<String>) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()

        // ReadFromKafka, IdentifyAttributesFunction, and DynamicJdbcHere are
        // placeholders for the source, the attribute-identification step, and
        // the sink I'm asking about below.
        env
            .addSource(ReadFromKafka())
            .process(IdentifyAttributesFunction())
            .addSink(DynamicJdbcHere())

        // execute program
        env.execute("Attribute Identification")
    }
}

Since my attributes can be of varying types (all implementing an
Attribute interface), I don't know whether the existing JdbcSink, or some
variant of it (e.g. one of the dynamic ones I see listed), can handle this.
Essentially, for a given "bundle" of records, I'd need each type of
attribute to be upserted into its corresponding table in a Postgres
database, roughly as sketched below.
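To make that concrete, here's a rough sketch of the per-type fan-out I'd
otherwise end up writing by hand, with one JdbcSink per attribute type.
Everything specific here is hypothetical: ColorAttribute, the
color_attributes table, and the connection details are made up, and I'm
assuming Attribute is a simple marker-style interface.

import org.apache.flink.connector.jdbc.JdbcConnectionOptions
import org.apache.flink.connector.jdbc.JdbcExecutionOptions
import org.apache.flink.connector.jdbc.JdbcSink
import org.apache.flink.connector.jdbc.JdbcStatementBuilder
import org.apache.flink.streaming.api.datastream.DataStream

// Hypothetical attribute subtype; in reality there would be one per table.
data class ColorAttribute(val recordId: String, val color: String) : Attribute

// One JdbcSink per attribute type, attached to a filtered copy of the stream.
fun attachColorSink(attributes: DataStream<Attribute>) {
    attributes
        .filter { it is ColorAttribute }
        .map { it as ColorAttribute }
        .returns(ColorAttribute::class.java) // help Flink's type extraction for the lambda
        .addSink(
            JdbcSink.sink(
                // Upsert keyed on record_id (table and columns are made up)
                "INSERT INTO color_attributes (record_id, color) VALUES (?, ?) " +
                    "ON CONFLICT (record_id) DO UPDATE SET color = EXCLUDED.color",
                JdbcStatementBuilder<ColorAttribute> { stmt, attr ->
                    stmt.setString(1, attr.recordId)
                    stmt.setString(2, attr.color)
                },
                JdbcExecutionOptions.builder().withBatchSize(100).build(),
                JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:postgresql://localhost:5432/attributes")
                    .withDriverName("org.postgresql.Driver")
                    .build()
            )
        )
}

Multiplying that boilerplate across every attribute type is what makes me
wonder whether a dynamic variant of the connector could do it for me.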

Is that something the connector can handle on its own? Or would I need to
implement my own RichSinkFunction<Collection<Attribute>> that opens a
connection to Postgres and dynamically generates the appropriate UPSERT
statements for the records? As a follow-up, if I did need to write my own
RichSinkFunction, would I need to implement my own checkpointing for
resilience purposes, or does that come along for the ride with
RichSinkFunctions?
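For reference, this is roughly the shape of the hand-rolled sink I'm
picturing (ColorAttribute is the same hypothetical type from the sketch
above, and the connection details are again made up); whether something
like this is safe across failures without extra work is exactly what I'm
unsure about:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import java.sql.Connection
import java.sql.DriverManager

class DynamicAttributeSink : RichSinkFunction<Collection<Attribute>>() {

    private lateinit var connection: Connection

    override fun open(parameters: Configuration) {
        // One connection per parallel sink instance (URL/credentials made up)
        connection = DriverManager.getConnection(
            "jdbc:postgresql://localhost:5432/attributes", "user", "password"
        )
    }

    override fun invoke(value: Collection<Attribute>, context: SinkFunction.Context) {
        // Group the bundle by concrete type so each group is upserted into
        // its own table. Open question: do I need CheckpointedFunction here
        // to flush/retry safely, or is that handled for me?
        value.groupBy { it::class }.values.forEach { group ->
            when (group.first()) {
                is ColorAttribute -> upsertColors(group.filterIsInstance<ColorAttribute>())
                // ... one branch (or a lookup table) per Attribute subtype
                else -> error("No table mapped for ${group.first()::class}")
            }
        }
    }

    private fun upsertColors(colors: List<ColorAttribute>) {
        connection.prepareStatement(
            "INSERT INTO color_attributes (record_id, color) VALUES (?, ?) " +
                "ON CONFLICT (record_id) DO UPDATE SET color = EXCLUDED.color"
        ).use { stmt ->
            colors.forEach {
                stmt.setString(1, it.recordId)
                stmt.setString(2, it.color)
                stmt.addBatch()
            }
            stmt.executeBatch()
        }
    }

    override fun close() {
        if (::connection.isInitialized) connection.close()
    }
}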

Any insight or approaches would be welcome!

Thanks,

Rion
