Hi Vignesh,

I'm adding Aljoscha to the thread; he might have an idea how to solve this with the existing Flink APIs. (The closest idea I had was the N-ary stream operator, but I guess that doesn't support backpressuring individual upstream operators -- side inputs would be needed for that?)

The only somewhat feasible idea I came up with, which only makes sense if you don't need any exactly-once guarantees, is implementing your own Kafka connector (or forking the existing Kafka connector in Flink, in which case you could also get exactly-once). In this custom Kafka connector you could, conceptually, have two Kafka consumers, each feeding messages into its own bounded queue. A third thread continuously drains the queues, always taking messages from the priority queue first.

Best,
Robert

On Thu, Oct 29, 2020 at 1:29 PM Vignesh Ramesh <vicki.ram...@gmail.com> wrote:

> Hi,
>
> I have a Flink pipeline which reads from a Kafka topic, does a map
> operation (builds an Elasticsearch model) and sinks it to Elasticsearch.
>
> *Pipeline-1:*
>
> Flink-Kafka-Connector-Consumer(topic1) (parallelism 8) -> Map (parallelism
> 8) -> Flink-Es-connector-Sink(es1) (parallelism 8)
>
> Now I want some messages to be prioritized (processed quickly, not
> necessarily in any order). I am okay with creating a new topic and placing
> the priority messages in it, or with using partition-based buckets (e.g.
> https://github.com/riferrei/bucket-priority-pattern; I don't think that is
> possible with the Flink Kafka connector, since partition assignment happens
> inside FlinkKafkaConsumerBase).
>
> *I tried the below solution:*
>
> I created another topic (topic2, in which I placed the priority messages)
> and with it a new Flink pipeline.
>
> *Pipeline-2:*
>
> Flink-Kafka-Connector-Consumer(topic2) (parallelism 8) -> Map (parallelism
> 8) -> Flink-Es-connector-Sink(es1) (parallelism 8)
>
> But the problem is, I want to consume topic2 as soon as possible. I can
> accept a delay/slowness in topic1 because of that. If there is no message in
> topic2, then topic1 should be given more priority. But in the above case
> both pipelines are processed equally. Increasing the parallelism of
> pipeline-2 to a big number doesn't help: when there is no message in
> topic2, topic1 is still very slow (the parallelism of pipeline-2 is
> wasted).
>
> How can I achieve this using the Flink Kafka connector? Is it possible to
> achieve it in any other way?
>
>
> Regards,
>
> Vignesh
>
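
A minimal sketch of the two-queue idea from the reply, in case it helps to make it concrete. It only shows the draining policy: two bounded queues and a `next` call that always prefers the priority queue. The names `PriorityDrainer`, `offerHigh`, `offerLow`, `next` and `drainAvailable` are hypothetical and not part of Flink or the Kafka client; the two Kafka consumer threads, offset handling and checkpointing are left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not a Flink or Kafka API): two bounded queues, priority first. */
final class PriorityDrainer<T> {
    private final BlockingQueue<T> high; // would be fed by the consumer of the priority topic
    private final BlockingQueue<T> low;  // would be fed by the consumer of the regular topic

    PriorityDrainer(int capacity) {
        this.high = new ArrayBlockingQueue<>(capacity);
        this.low = new ArrayBlockingQueue<>(capacity);
    }

    // put() blocks while the queue is full, so a feeding thread stops
    // fetching more records until the drainer catches up.
    void offerHigh(T record) throws InterruptedException { high.put(record); }
    void offerLow(T record) throws InterruptedException { low.put(record); }

    /**
     * Returns the next record, always preferring the priority queue.
     * Returns null if both queues stayed empty for roughly timeoutMillis.
     */
    T next(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            T record = high.poll();
            if (record != null) return record;
            record = low.poll();
            if (record != null) return record;
            if (System.nanoTime() - deadline >= 0) return null;
            // Both queues are empty: wait briefly on the priority queue so that
            // a newly arrived priority record is picked up first.
            record = high.poll(1, TimeUnit.MILLISECONDS);
            if (record != null) return record;
        }
    }

    /** Drains everything currently queued, priority records first. */
    List<T> drainAvailable() throws InterruptedException {
        List<T> out = new ArrayList<>();
        T record;
        while ((record = next(0)) != null) out.add(record);
        return out;
    }
}
```

In a custom connector, the thread that emits records downstream would call `next` in a loop, while each of the two consumer threads calls `offerHigh` or `offerLow` after polling its topic. The regular topic is only served when the priority queue is empty, which matches the "topic1 gets the capacity when topic2 is idle" requirement.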