Hey.

As far as I can see, you're not overriding functions like open,
setRuntimeContext, snapshotState and initializeState. Those calls need to
be forwarded to the inner sink function; otherwise the inner JDBC sink is
never opened or initialized.
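
In case it helps, here is a rough sketch of that kind of forwarding wrapper. It assumes Flink 1.13-style APIs (open(Configuration), and CheckpointListener from org.apache.flink.api.common.state). ForwardingSink is a made-up name, and I haven't run this against your job, so treat it as a starting point rather than a finished implementation:

```kotlin
import org.apache.flink.api.common.functions.RichFunction
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.CheckpointListener
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical wrapper: forwards every lifecycle call to the wrapped sink,
// but only when the wrapped sink actually implements the matching interface.
class ForwardingSink<T>(private val inner: SinkFunction<T>) :
    RichSinkFunction<T>(), CheckpointedFunction, CheckpointListener {

    override fun setRuntimeContext(context: RuntimeContext) {
        super.setRuntimeContext(context)
        (inner as? RichFunction)?.setRuntimeContext(context)
    }

    override fun open(parameters: Configuration) {
        super.open(parameters)
        // Without this call the inner sink never sets up its connection.
        (inner as? RichFunction)?.open(parameters)
    }

    override fun invoke(value: T, context: SinkFunction.Context) {
        // The try/catch with the dead-letter logic would go around this call.
        inner.invoke(value, context)
    }

    override fun initializeState(context: FunctionInitializationContext) {
        (inner as? CheckpointedFunction)?.initializeState(context)
    }

    override fun snapshotState(context: FunctionSnapshotContext) {
        // Buffering sinks typically flush pending records here.
        (inner as? CheckpointedFunction)?.snapshotState(context)
    }

    override fun notifyCheckpointComplete(checkpointId: Long) {
        (inner as? CheckpointListener)?.notifyCheckpointComplete(checkpointId)
    }

    override fun close() {
        (inner as? RichFunction)?.close()
        super.close()
    }
}
```

You would then pass something like ForwardingSink(PostgresSink.fromEntityType(users.typeInfo, parameters)) to addSink. Keep in mind that with a batch size above 1 a failure may only surface on a later invoke or in snapshotState, not on the record that caused it, which is why the batch size needs to be 1 if you want to catch the exact record.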

On Mon, 2 Aug 2021 at 19:31, Rion Williams <rionmons...@gmail.com> wrote:
>
> Hi again Maciek (and all),
>
> I just recently returned to start investigating this approach, however I 
> can't seem to get the underlying invocation to work as I would normally 
> expect. I'll try to share a bit more of what I currently have; perhaps I'm 
> just missing something minor that someone may be able to spot.
>
> To reiterate - what I'm attempting to do is take a stream of events flowing 
> through, extract specific types of entities from these events into 
> multiple side-outputs, and pass these side-outputs to a sink that will 
> write them via JDBC using logic specific to that entity. What I am aiming to 
> achieve is being able to capture a single record that may be problematic 
> (avoiding a poison pill) and throw it onto a dead-letter queue (Kafka). I 
> understand this would mean limiting batch sizes to a single record, however 
> I'm assuming that the connections themselves could be pooled to avoid 
> opening up a new connection per call. If this isn't the case, is there a way 
> to handle that (or would I need to implement my own sink)?
>
> ```
> val users = Tags.users
>         parsedChangelogs
>             .getSideOutput(users)
>             .addSink(PostgresSink.fromEntityType(users.typeInfo, parameters))
>             .uid("sync-${users.id}-to-postgres")
>             .name("sync-${users.id}-to-postgres")
>
> val addresses = Tags.addresses
>         parsedChangelogs
>             .getSideOutput(addresses)
>             .addSink(PostgresSink.fromEntityType(addresses.typeInfo, parameters))
>             .uid("sync-${addresses.id}-to-postgres")
>             .name("sync-${addresses.id}-to-postgres")
> ```
>
> And the dynamic sink (that would associate a given entity to the necessary 
> calls made to the database) looks a bit like this:
>
> ```
> fun <T: Any> fromEntityType(typeInfo: TypeInformation<T>, parameters: ParameterTool): SinkFunction<T> {
>         val metadata = getQueryMetadataFromType(typeInfo)
>
>         return JdbcSink
>             .sink(
>                 metadata.query,
>                 metadata.statement,
>                 getJdbcExecutionOptions(parameters),
>                 JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>                     .withDriverName("org.postgresql.Driver")
>                     .withUrl(buildConnectionString(parameters))
>                     .build(),
>             )
>     }
> ```
>
> I've tried several approaches; a naive wrapper that I attempted looked 
> something like this:
>
> ```
> class DlqWrapper<T>(private val sink: SinkFunction<T>, val parameters: ParameterTool): SinkFunction<T> {
>     private val logger = LoggerFactory.getLogger(DlqWrapper::class.java)
>     private val dlqSink: SinkFunction<String> = ...
>
>     override fun invoke(value: T, context: SinkFunction.Context) {
>         try {
>             sink.invoke(value, context)
>         }
>         catch (ex: Exception) {
>             logger.error("Encountered sink exception. Sending message to dead letter queue. Value: $value. Exception: ${ex.message}")
>             val payload = Gson().toJsonTree(value).asJsonObject
>             payload.addProperty("exception", ex.message)
>
>             dlqSink.invoke("$payload", context)
>         }
>     }
> }
> ```
>
> After doing this, it doesn't look like the invoke calls actually attempt 
> to perform the JDBC calls that insert the records into the database. I'm 
> not entirely sure whether this is related specifically to how the JdbcSink 
> is wrapped (via the GenericJdbcSink, etc.).
>
> I had seen several posts involving the use of an 
> InvocationHandler/Proxy, etc. but I'm not sure if that should be necessary 
> for handling this type of functionality. Any ideas/thoughts/examples would be 
> greatly appreciated.
>
> Thanks,
>
> Rion
>
> On 2021/07/14 15:47:18, Maciej Bryński <mac...@brynski.pl> wrote:
> > This is the idea.
> > Of course you need to wrap more functions like: open, close,
> > notifyCheckpointComplete, snapshotState, initializeState and
> > setRuntimeContext.
> >
> > The problem is that if you want to catch a problematic record you need
> > to set the batch size to 1, which gives very bad performance.
> >
> > Regards,
> > Maciek
> >
> > On Wed, 14 Jul 2021 at 17:31, Rion Williams <rionmons...@gmail.com> wrote:
> > >
> > > Hi Maciej,
> > >
> > > Thanks for the quick response. I wasn't aware of the idea of using a 
> > > SinkWrapper, but I'm not quite certain that it would suit this specific 
> > > use case (as a SinkFunction / RichSinkFunction doesn't appear to support 
> > > side-outputs). Essentially, what I'd hope to accomplish would be to pick 
> > > up when a bad record could not be written to the sink and then offload 
> > > that via a side-output somewhere else.
> > >
> > > Something like this, which is a very, very naive idea:
> > >
> > > class PostgresSinkWrapper<T>(private val sink: SinkFunction<T>): RichSinkFunction<T>() {
> > >     private val logger = LoggerFactory.getLogger(PostgresSinkWrapper::class.java)
> > >
> > >     override fun invoke(value: T, context: SinkFunction.Context) {
> > >         try {
> > >             sink.invoke(value, context)
> > >         }
> > >         catch (exception: Exception){
> > >             logger.error("Encountered a bad record, offloading to dead-letter-queue")
> > >             // Offload bad record to DLQ
> > >         }
> > >     }
> > > }
> > >
> > > But I think that's basically the gist of it. I'm just not sure how I 
> > > could go about doing this aside from perhaps writing a custom process 
> > > function that wraps another sink function (or just completely rewriting 
> > > my own JdbcSink?)
> > >
> > > Thanks,
> > >
> > > Rion
> > >
> > >
> > >
> > >
> > >
> > > On Wed, Jul 14, 2021 at 9:56 AM Maciej Bryński <mac...@brynski.pl> wrote:
> > >>
> > >> Hi Rion,
> > >> We have implemented such a solution with Sink Wrapper.
> > >>
> > >>
> > >> Regards,
> > >> Maciek
> > >>
> > >> On Wed, 14 Jul 2021 at 16:21, Rion Williams <rionmons...@gmail.com> wrote:
> > >> >
> > >> > Hi all,
> > >> >
> > >> > Recently I've been encountering an issue where some external 
> > >> > dependency or process causes writes within my JDBCSink to fail (e.g. 
> > >> > something is being inserted with an explicit constraint that never 
> > >> > made its way there). I'm trying to see if there's a pattern or 
> > >> > recommendation for handling this similar to a dead-letter queue.
> > >> >
> > >> > Basically - if I experience a given number of failures (> max retry 
> > >> > attempts) when writing to my JDBC destination, I'd like to take the 
> > >> > record that was attempted and throw it into a Kafka topic or some 
> > >> > other destination so that it can be evaluated at a later time.
> > >> >
> > >> > Are there any well-defined patterns or recommended approaches around 
> > >> > this?
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Rion
> > >>
> > >>
> > >>
> > >> --
> > >> Maciek Bryński
> >
> >
> >
> > --
> > Maciek Bryński
> >
