Hi Wouter, Dataflow fuses your operations into several stages during execution time. The downstream stage can have inputs only when upstream finishes at least one bundle(a small portion of work that Dataflow gives to the worker). In your case, it seems like your _ConsumeKafkaTopic runs forever and never stops. Thus the operations in the same stage of _ConsumeKafkaTopic can see the output from _ConsumeKafkaTopic immediately but the downstream stages can never have the input there.
One workaround I can come up is to change you _ConsumeKafkaTopic to a stateful dofn. You can give _ConsumeKafkaTopic a variable to identify how many records _ConsumeKafkaTopic outputs once. Let's say the number is 1000. When your output messages exceed 1000, you can save the current consumer position into ValueState and set a now() + 5s processing time timer. In the timer callback, you can output another 1000 records again and set next timer. Does it sound work for you? On Thu, Apr 8, 2021 at 3:47 AM Wouter Zorgdrager <[email protected]> wrote: > Dear Beam community, > > I'm currently trying to set up a Beam pipeline using the PythonSDK. I need > to read from an unbounded Kafka source. My flow looks like this: > [image: flow.png] > It reads event from a Kafka topic, using a (stateless) router the events > get to different PTransforms and the final results get written to a Kafka > topic again. I use side outputs to 'route' and beam.Flatten() to merge > PCollections again. > > In my first attempt, I used the built-in Kafka IO which uses an > ExternalTransform [1]. However, no data was being consumed from my Kafka > topic. I think this was also discussed in the mailing list before [2] and > has to do with this issue [3]. > > In my second attempt, I used this Kafka connector from an external source > [4]. This connector worked initially, where I did not fully implement the > flow as described above. When using this connector, I see messages get > 'stuck' in the dataflow and not being fully processed until the end. I.e. > they get processed until a certain DoFn in the pipeline, but not any > further. Oddly enough, when I get rid of the beam.Flatten() (and therefore > I can't merge collections and just use a linear pipeline) it does work. > Moreover, when a substitute my kafka consumer with a simple beam.Create() > the full pipeline works as well. > > I think the problem is that the Kafka connector I'm using is not a > Splittable DoFn and just blocks completely while consuming [5]. However, > I'm confused that this does work for a linear pipeline (without > flattening). > > To give some more context; I'm planning to deploy this using a > FlinkRunner. Moreover, I have a 'hard' dependency on using Python and > Kafka. In other words, I can't move to another IO framework or programming > language. > > I hope you can help me out, provide some suggestions or workarounds. > > Thanks in advance! > Regards, > Wouter > > [1] - > https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.kafka.html > [2] - > https://lists.apache.org/x/thread.html/r9c74a8a7efa4b14f2f1d5f77ce8f12128cf1071861c1627f00702415@%3Cuser.beam.apache.org%3E > > [3] - https://issues.apache.org/jira/browse/BEAM-11998 > [4] - https://github.com/mohaseeb/beam-nuggets > [5] - > https://github.com/mohaseeb/beam-nuggets/blob/39d2493b161ebbcbff9f4857115d527f6fefda77/beam_nuggets/io/kafkaio.py#L76 > >
