Hi all, I’ve been playing around with a proof-of-concept application with Flink to assist a colleague of mine. The application is fairly simple (take in a single input and identify various attributes about it) with the goal of outputting those to separate tables in Postgres:

    object AttributeIdentificationJob {
        @JvmStatic
        fun main(args: Array<String>) {
            val stream = StreamExecutionEnvironment.getExecutionEnvironment()

            stream
                .addSource(ReadFromKafka())
                .process(IdentifyAttributesFunction())
                .addSink(DynamicJdbcHere())

            // execute program
            stream.execute("Attribute Identification")
        }
    }

Considering my attributes may be of varying types (all implementing an Attribute interface), I don't know if the existing JdbcSink functionality or some variant of it (e.g. one of the dynamic ones that I see listed) could handle this. Essentially, for a given "bundle" of records, I'd need to ensure that each respective type of attribute was upserted into its corresponding table within a Postgres database.

Is that something that the connector can handle on its own? Or would I need to implement my own RichSinkFunction<Collection<Attribute>> that could handle opening a connection to Postgres and dynamically generating the appropriate UPSERT statements to handle sending the records?

As a follow-up to that, if I did need to write my own RichSinkFunction, would I need to implement my own checkpointing for resilience purposes, or does that come along for the ride for RichSinkFunctions?

Any insight or approaches would be welcome!

Thanks,

Rion
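
P.S. To make the "dynamically generating the appropriate UPSERT statements" part more concrete, here is a minimal sketch of what I have in mind. It is plain Kotlin with no Flink or JDBC dependency. The `table`, `keyColumns`, and `columns()` members, the two example attribute classes, and the table names are all hypothetical placeholders for illustration, not my actual Attribute interface.

```kotlin
// Hypothetical stand-in for the Attribute interface mentioned above.
interface Attribute {
    val table: String                 // target Postgres table (assumed)
    val keyColumns: List<String>      // columns forming the conflict target (assumed)
    fun columns(): Map<String, Any?>  // column name -> value, in a stable order
}

// Two made-up attribute types that map to different tables.
data class ColorAttribute(val itemId: Long, val color: String) : Attribute {
    override val table = "color_attributes"
    override val keyColumns = listOf("item_id")
    override fun columns() = linkedMapOf<String, Any?>("item_id" to itemId, "color" to color)
}

data class SizeAttribute(val itemId: Long, val size: Int) : Attribute {
    override val table = "size_attributes"
    override val keyColumns = listOf("item_id")
    override fun columns() = linkedMapOf<String, Any?>("item_id" to itemId, "size" to size)
}

// Builds a parameterized Postgres upsert (INSERT ... ON CONFLICT ...).
// Identifiers are interpolated directly, so they must come from trusted code.
fun buildUpsert(table: String, columns: List<String>, keyColumns: List<String>): String {
    val placeholders = columns.joinToString(", ") { "?" }
    val updates = columns
        .filterNot { it in keyColumns }
        .joinToString(", ") { "$it = EXCLUDED.$it" }
    // If every column is part of the key there is nothing left to update.
    val conflictAction = if (updates.isEmpty()) "DO NOTHING" else "DO UPDATE SET $updates"
    return "INSERT INTO $table (${columns.joinToString(", ")}) VALUES ($placeholders) " +
        "ON CONFLICT (${keyColumns.joinToString(", ")}) $conflictAction"
}

// Groups a bundle by generated statement: SQL text -> list of parameter rows.
fun planUpserts(bundle: Collection<Attribute>): Map<String, List<List<Any?>>> =
    bundle
        .groupBy { a -> buildUpsert(a.table, a.columns().keys.toList(), a.keyColumns) }
        .mapValues { (_, attrs) -> attrs.map { it.columns().values.toList() } }

fun main() {
    val bundle = listOf(
        ColorAttribute(1L, "red"),
        SizeAttribute(1L, 42),
        ColorAttribute(2L, "blue"),
    )
    // Print each generated statement followed by the rows that would be bound to it.
    for ((sql, rows) in planUpserts(bundle)) {
        println(sql)
        rows.forEach { println("  params: $it") }
    }
}
```

Inside a custom sink I would presumably prepare each generated statement once and bind the rows through a JDBC PreparedStatement batch, but whether that is necessary at all is exactly what I am unsure about.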