senegalo edited a comment on pull request #12056:
URL: https://github.com/apache/flink/pull/12056#issuecomment-629626611


   @aljoscha & @austince 
   I pushed some new changes. Let me explain what I tried over the last 3 
hours.
   
   ### My Goal
   * Combine body and correlation ID parsing in one go.
   * Conform with the changes made by #12093.
   * Have a 1-to-N relation between an AMQP Delivery and the parsed records 
that are passed to the collector.
   
   ### Why I failed to implement the suggestion
   
   I did exactly what you described above: 
   * I passed the collector to the `RMQDeserializationSchema` interface.
   * When `processMessage` is called, it extracts the record(s) and the 
correlation ID, then calls the collector's newly implemented method 
`collect(OUT records, String correlationID)`.
   * The collector then stashes the correlation ID in a private variable for 
me to use in the `synchronized` block. 
   
   The problem is that, at least to my understanding, once you call `collect` 
on the collector the data has already left the source operator. 
   That is a problem when `autoAck` is false, because we need to decide 
whether to add the record(s) to the collector based on whether we have seen 
this ID before.
   Since we were emitting and extracting the ID in one go, that check was 
impossible without a lot of hacking around in the code.
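
To make the ordering issue concrete, here is a minimal, self-contained sketch. It does not use the Flink classes: `StashingCollector` and `run` are hypothetical stand-ins, and a plain `Set` of seen IDs is only an assumption about how the duplicate check works. It simulates the same delivery arriving twice.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class CollectThenCheckSketch {

    // Hypothetical collector: emits immediately and stashes the correlation ID.
    static final class StashingCollector {
        final List<String> emitted = new ArrayList<>();
        String lastCorrelationId;

        void collect(String record, String correlationId) {
            emitted.add(record);               // the record is downstream from here on
            lastCorrelationId = correlationId; // stashed for the later check
        }
    }

    // Simulates the same message being delivered twice and returns what was emitted.
    static List<String> run() {
        StashingCollector collector = new StashingCollector();
        Set<String> seenIds = new HashSet<>();
        for (int delivery = 0; delivery < 2; delivery++) {
            // Step 1: the schema parses the delivery and calls collect itself.
            collector.collect("payload", "id-1");
            // Step 2: the source checks the stashed ID, but only after the emit.
            if (!seenIds.add(collector.lastCorrelationId)) {
                // Duplicate detected too late: the record cannot be taken back.
            }
        }
        return collector.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [payload, payload]
    }
}
```

The duplicate is detected on the second pass, but by then the record has already been emitted, which is why the check has to run before `collect`.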
   
   ### Alternative solution pushed
   
   * `RMQDeserializationSchema` deserializes both the record(s) and the 
correlation ID in one go, then returns an instance of `RMQDeserializedMessage`, 
which is just a wrapper class around those values.
   * The `RMQSource` calls its `parseMessage` method, which decides how to 
deserialize the message (either the old way or using the 
`RMQDeserializationSchema`) and in either case returns an instance of 
`RMQDeserializedMessage`. 
   * If `autoAck` is false, the `synchronized` block can easily access the 
correlation ID from the `RMQDeserializedMessage` using the `getCorrelationID` 
method.
   * The collector collects the record(s) from 
`RMQDeserializedMessage#getMessages`, which returns a `List<T>`.
   * Finally, the `RMQCollector` now has a `collect(List<OUT> records)` method 
where I just iterate over the records produced by the single AMQP delivery and 
call the normal `collect(OUT record)`.
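
A rough sketch of that flow, under simplifying assumptions: `DeserializedMessage`, `ListCollector`, and `handleDelivery` below are hypothetical stand-ins rather than the classes in this PR, the duplicate check is modeled as a plain `Set` of seen IDs, and the `synchronized` block is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ParseThenCollectSketch {

    // Simplified stand-in for the wrapper: the records plus the correlation ID.
    static final class DeserializedMessage<T> {
        private final List<T> messages;
        private final String correlationID;

        DeserializedMessage(List<T> messages, String correlationID) {
            this.messages = Collections.unmodifiableList(new ArrayList<>(messages));
            this.correlationID = correlationID;
        }

        List<T> getMessages() { return messages; }
        String getCorrelationID() { return correlationID; }
    }

    // Simplified stand-in for the collector with the extra list overload.
    static final class ListCollector<T> {
        final List<T> emitted = new ArrayList<>();

        void collect(T record) { emitted.add(record); }

        void collect(List<T> records) {
            for (T record : records) {
                collect(record); // reuse the single-record path
            }
        }
    }

    // Checks the correlation ID first and emits only if the delivery is new.
    static <T> boolean handleDelivery(DeserializedMessage<T> parsed, boolean autoAck,
                                      Set<String> seenIds, ListCollector<T> out) {
        if (!autoAck && !seenIds.add(parsed.getCorrelationID())) {
            return false; // already seen: nothing has been emitted yet
        }
        out.collect(parsed.getMessages());
        return true;
    }

    public static void main(String[] args) {
        Set<String> seenIds = new HashSet<>();
        ListCollector<String> out = new ListCollector<>();
        DeserializedMessage<String> delivery =
                new DeserializedMessage<>(Arrays.asList("a", "b"), "id-1");
        handleDelivery(delivery, false, seenIds, out); // emits a and b
        handleDelivery(delivery, false, seenIds, out); // redelivery, skipped
        System.out.println(out.emitted); // prints [a, b]
    }
}
```

Because parsing returns a value instead of emitting, the source stays in control of whether the records of a delivery reach the collector at all.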
   
   Hope that solution makes sense.



