gomerudo opened a new issue, #31400:
URL: https://github.com/apache/beam/issues/31400

   ### What happened?
   
   
   When running a streaming job (with DirectRunner locally and with DataflowRunner on GCP) that uses the `apache_beam.io.kafka.ReadFromKafka` connector without `max_num_records`, the job never processes any records. Instead it gets stuck in an infinite loop of creating consumers that subscribe and get assigned a partition and offset, but never consume anything. We are forcing `auto.offset.reset = earliest`.
   
   We verified that when `max_num_records` is set, the job runs and processes the data correctly, both locally and on Dataflow. This leads us to conclude that this is not a GCP issue but a Beam one.
   
   We can see the infinite loop in the logs, and Lenses never reports any active members of the consumer group:
   
   
![image](https://github.com/apache/beam/assets/5495942/aeb6c311-0d53-410d-8c12-db0b24114a44)
   
![image](https://github.com/apache/beam/assets/5495942/46a0f513-f48c-448c-a264-7e100b463b65)
   
   We have tried the default Kafka configuration as well as custom ones; I'm sharing the latest one here:
   
   ```
   pipeline
   | "ReadFromStream" >> apache_beam.io.kafka.ReadFromKafka(
       consumer_config={  # Also tested with a single broker
           "bootstrap.servers": "kafka-1782273228-1-1908664276.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-4.prod.walmart.com:9092,kafka-1782274279-1-1908664354.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-5.prod.walmart.com:9092,kafka-1782274320-1-1908664432.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-6.prod.walmart.com:9092",
           "auto.offset.reset": "earliest",
           "fetch.max.bytes": "52428800",
           "fetch.min.bytes": "1",
           "fetch.max.wait.ms": "1000",
           "max.poll.interval.ms": "20000",
           "max.poll.records": "10",
           "request.timeout.ms": "30000",
           "session.timeout.ms": "45000",
           "timeout.ms": "10000",
           "group.id": "test-group-id",
           "heartbeat.interval.ms": "200",
           "reconnect.backoff.ms": "100",
           "reconnect.backoff.max.ms": "10000",
       },
       topics=["some-topic-i-cannot-share"],
       with_metadata=True,
       # max_num_records=1000,  # For testing only
   )
   ```
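   
   For reference, here is a minimal, self-contained sketch of how we launch the pipeline locally. The broker address and the `print` sink below are placeholders standing in for the real values above; the relevant part is `streaming=True` with no `max_num_records`:
   
   ```
   import apache_beam
   from apache_beam.io.kafka import ReadFromKafka
   from apache_beam.options.pipeline_options import PipelineOptions
   
   # DirectRunner locally; on GCP we pass DataflowRunner options instead.
   options = PipelineOptions(streaming=True)
   
   with apache_beam.Pipeline(options=options) as pipeline:
       (
           pipeline
           | "ReadFromStream" >> ReadFromKafka(
               consumer_config={
                   "bootstrap.servers": "localhost:9092",  # placeholder
                   "auto.offset.reset": "earliest",
                   "group.id": "test-group-id",
               },
               topics=["some-topic-i-cannot-share"],
               with_metadata=True,
               # With this set, the same pipeline processes records:
               # max_num_records=1000,
           )
           # Placeholder sink, only used to confirm records arrive.
           | "DebugPrint" >> apache_beam.Map(print)
       )
   ```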
   
   This does not seem to be a problem with our Kafka topic, since custom Python clients (using kafka-python) run successfully with the exact same Kafka configuration.
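   
   As a point of comparison, a plain kafka-python consumer along these lines (broker and topic below are placeholders for the real values) reads messages immediately with the same settings:
   
   ```
   from kafka import KafkaConsumer
   
   # Placeholder broker/topic; the real values match the Beam config above.
   consumer = KafkaConsumer(
       "some-topic-i-cannot-share",
       bootstrap_servers="localhost:9092",
       auto_offset_reset="earliest",
       group_id="test-group-id",
   )
   
   # Messages arrive immediately, unlike with ReadFromKafka.
   for message in consumer:
       print(message.partition, message.offset, message.value)
   ```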
   
   Beam SDK language: Python
   Beam SDK version: 2.52.0
   
   Any feedback is greatly appreciated.
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

