Hi,

I've have this simple pipeline to test the kafka IO. It reads from kafka
and log it.
```
(
pipeline
| ReadFromKafka(
                    consumer_config={
                        "bootstrap.servers": bootstrap_servers,
                        "security.protocol": "SASL_SSL",
                        "sasl.mechanism": "SCRAM-SHA-512",
                        "sasl.jaas.config":
f'org.apache.kafka.common.security.scram.ScramLoginModule required
username="{sasl_username}" password="{sasl_password}";',
                    },
                    topics=["test_topic"],
                    with_metadata=False,
                    expansion_service=default_io_expansion_service(
                        append_args=[
                            '--defaultEnvironmentType=PROCESS',

"--defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/boot\"}",
                        ]
                    ),
| "logging" >> beam.Map(lambda args: logging.info(args))
```
However, there's no log line for the event in task manager, and I also
found that there are multiple log lines saying:
```
Resetting offset for partition test_topic-1 to offset 8706.
```
the reset offset keeps the same all the time, which looks like the kafka IO
never actually progress? But I also unable to find any other error log as
well. Wondering if anyone can help me with this issue ? Thanks


Sincerely,
Lydian Lee

Reply via email to