senegalo edited a comment on pull request #12056: URL: https://github.com/apache/flink/pull/12056#issuecomment-629626611
@aljoscha & @austince So I pushed some new changes, and let me explain my endeavors of the last 3 hours!

### My Goal

* Combine body / correlation ID parsing in one go
* Conform with the changes made by #12093
* Have a 1-to-N relation between an AMQP delivery and the parsed records that are passed to the collector

### Why I failed to implement the suggestion

I did exactly what you described above:

* I passed the collector to the `RMQDeserializationSchema` interface.
* When `processMessage` is called, it extracts the record(s) and the correlation ID, then calls the collector's newly implemented method `collect(OUT records, String correlationID)`.
* The collector then stashes the correlation ID in a private field so that I can use it in the `synchronized` block.

The problem is that (at least to my understanding) once you call `collect` on the collector, the data has already left the source operator. This is a problem when `autoAck` is false, because we need to decide whether to add the record(s) to the collector based on whether we've seen this ID before. Since we were doing both in one go, that was impossible without a lot of hacking around in the code.

### Alternative solution pushed

* `RMQDeserializationSchema` deserializes both the record(s) and the correlation ID in one go, then returns an instance of `RMQDeserializedMessage`, which is just a wrapper class around those values.
* `RMQSource` calls its `parseMessage` method, which decides how to deserialize the message (either the old way or with the `RMQDeserializationSchema`) and in either case returns an instance of `RMQDeserializedMessage`.
* If `autoAck` is false, the `synchronized` block can easily access the correlation ID from the `RMQDeserializedMessage` via the `getCorrelationID` method.
* The collector collects the record(s) from `RMQDeserializedMessage#getMessages`, which returns a `List<T>`.
* Finally, `RMQCollector` now has a `collect(List<OUT> records)` method, which just iterates over the records produced by the single AMQP delivery and calls the normal `collect(OUT record)`.

Hope that solution makes sense.
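To make the ordering in the alternative solution concrete, here is a minimal, dependency-free Java sketch, assuming a UTF-8 body with newline-separated records. `RmqDedupSketch`, `DeserializedMessage`, `ListCollector`, `parse` and `emitIfUnseen` are hypothetical names that only mirror the roles of `RMQDeserializedMessage`, `RMQCollector` and `RMQSource#parseMessage`; they are not the actual Flink API. What it tries to show: the correlation ID is checked inside the `synchronized` block *before* any record reaches the collector, so one delivery can fan out to N records without a redelivered duplicate ever leaving the source.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical, dependency-free sketch; none of these types are the real Flink classes.
final class RmqDedupSketch {

    /** Wrapper holding all records parsed from one delivery plus its correlation ID. */
    public static final class DeserializedMessage<T> {
        private final List<T> messages;
        private final String correlationId;

        public DeserializedMessage(List<T> messages, String correlationId) {
            this.messages = Collections.unmodifiableList(new ArrayList<>(messages));
            this.correlationId = correlationId;
        }

        public List<T> getMessages() {
            return messages;
        }

        public String getCorrelationId() {
            return correlationId;
        }
    }

    /** Collector with a list overload that fans one delivery out into N records. */
    public static final class ListCollector<T> {
        private final Consumer<T> downstream;

        public ListCollector(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        public void collect(T record) {
            downstream.accept(record);
        }

        public void collect(List<T> records) {
            for (T record : records) {
                collect(record); // reuse the single-record path for each parsed record
            }
        }
    }

    /** Assumed one-shot parse: newline-separated UTF-8 records plus the correlation ID. */
    public static DeserializedMessage<String> parse(byte[] body, String correlationId) {
        String text = new String(body, StandardCharsets.UTF_8);
        List<String> records = new ArrayList<>();
        for (String line : text.split("\n")) {
            if (!line.isEmpty()) {
                records.add(line);
            }
        }
        return new DeserializedMessage<>(records, correlationId);
    }

    /**
     * Dedup step for the autoAck == false case: the ID is checked before anything is
     * collected, so a redelivered message is dropped instead of emitted twice.
     * Returns true if the delivery's records were emitted.
     */
    public static <T> boolean emitIfUnseen(
            DeserializedMessage<T> message,
            Set<String> seenCorrelationIds,
            ListCollector<T> collector,
            Object checkpointLock) {
        synchronized (checkpointLock) {
            String id = Objects.requireNonNull(
                    message.getCorrelationId(), "correlation ID required when deduplicating");
            if (!seenCorrelationIds.add(id)) {
                return false; // already seen: skip without emitting anything
            }
            collector.collect(message.getMessages());
            return true;
        }
    }
}
```

A plain `Object` as `checkpointLock` and a `HashSet` as the seen-ID set are enough to try it out; in the real source the lock and the set of processed IDs would presumably be tied to the source's checkpointing, which this sketch does not model.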