alexmreis opened a new issue, #22809:
URL: https://github.com/apache/beam/issues/22809

   ### What happened?
   
   Consider the trivial example pipeline below:
   
   ```python
   """
   Reproduce the KafkaIO + Unbounded source + streaming mode bug.
   """
   
   import logging
   import os
   
   import apache_beam as beam
   from apache_beam.io.external import kafka
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.transforms import window
   
   logging.getLogger().setLevel(logging.DEBUG)
   
   def kafka_consumer_config():
       """
       Returns config for the KafkaIO source.
       """
       return {
           "bootstrap.servers": os.getenv("KAFKA_BROKER_URL"),
           "auto.offset.reset": "latest",
           "security.protocol": "SASL_SSL",
           "sasl.mechanism": "PLAIN",
           "group.id": os.getenv("KAFKA_GROUP_ID"),
           "enable.auto.commit": "true",
           "sasl.jaas.config": (
               "org.apache.kafka.common.security.plain.PlainLoginModule required "
               f"username=\"{os.getenv('KAFKA_SASL_USERNAME')}\" "
               f"password=\"{os.getenv('KAFKA_SASL_PASSWORD')}\";"
           ),
       }
   
   with beam.Pipeline(options=PipelineOptions()) as pipeline:
       _ = (
           pipeline
           | "Read from kafka" >> kafka.ReadFromKafka(
               kafka_consumer_config(),
               [os.getenv("KAFKA_TOPIC")])
           | "Fixed window 5s" >> beam.WindowInto(window.FixedWindows(5))
           | "Group by key" >> beam.GroupByKey()
           | "Print" >> beam.Map(lambda t: logging.warning("%s - %s", t[0], t[1]))
       )
   ```
   
   I ran this pipeline in at least these three environments:
   - `DataflowRunner` (streaming mode)
   - `FlinkRunner` (streaming mode, run locally; not tested on a cluster)
   - `DirectRunner` (streaming mode)
   
   In all of them, the pipeline gets stuck on the `GroupByKey` PTransform. The trigger apparently never fires, though the logging I get makes it impossible to confirm that.
   
   When `max_num_records` is added to the `ReadFromKafka` step, effectively turning the source into a bounded collection, the pipeline works in both batch and streaming mode in all of the environments listed above.
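
   For reference, the bounded variant that does work looks roughly like this (a sketch reusing the `kafka_consumer_config()` helper and imports from the script above; `max_num_records=1000` is an arbitrary value chosen for illustration):

   ```python
   # Same pipeline as the reproduction above, with one extra argument to the
   # read step. Any bound turns the source into a bounded PCollection, and the
   # fixed windows then fire as expected.
   with beam.Pipeline(options=PipelineOptions()) as pipeline:
       _ = (
           pipeline
           | "Read from kafka" >> kafka.ReadFromKafka(
               kafka_consumer_config(),
               [os.getenv("KAFKA_TOPIC")],
               max_num_records=1000)
           | "Fixed window 5s" >> beam.WindowInto(window.FixedWindows(5))
           | "Group by key" >> beam.GroupByKey()
           | "Print" >> beam.Map(lambda t: logging.warning("%s - %s", t[0], t[1]))
       )
   ```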
   
   Data is timestamped in Kafka using processing time, although it is unclear from the documentation whether the KafkaIO adapter in Beam automatically assigns timestamps to the elements of the source PCollection it produces.
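
   For what it is worth, the external `ReadFromKafka` transform also exposes a `timestamp_policy` argument; whether and how it applies here is part of what is unclear to me. A minimal sketch, assuming the `"CreateTime"` value (taken from the Java KafkaIO naming):

   ```python
   # Sketch only: ask KafkaIO to use each record's create time as the event
   # timestamp instead of processing time. The "CreateTime" value is an
   # assumption on my part.
   kafka.ReadFromKafka(
       kafka_consumer_config(),
       [os.getenv("KAFKA_TOPIC")],
       timestamp_policy="CreateTime")
   ```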
   
   I have also tried timestamping the elements manually, using `with_metadata` and the `msg.timestamp` property it exposes, to no avail.
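
   Roughly the shape of that attempt, for reference (a sketch; that `msg.timestamp` is epoch milliseconds when reading with `with_metadata=True` is an assumption on my part):

   ```python
   from apache_beam.transforms import window

   def attach_event_timestamp(msg):
       # msg comes from ReadFromKafka(..., with_metadata=True); the timestamp
       # field is assumed here to be epoch milliseconds.
       return window.TimestampedValue((msg.key, msg.value), msg.timestamp / 1000.0)

   # Inserted between the read step and the windowing step:
   # | "Attach timestamps" >> beam.Map(attach_event_timestamp)
   ```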
   
   Looking at the Beam test suite, the `ReadFromKafka` PTransform appears to be tested only without windowing and without grouping. Should such a test perhaps be added?
   
   This affects every Python workload reading from Kafka, so it is rather surprising that no one else seems to have run into it yet.
   
   
   ### Issue Priority
   
   Priority: 3
   
   ### Issue Component
   
   Component: io-java-kafka

