Please use

On Thu, Jul 25, 2024 at 2:48 AM, Dhirendra Singh wrote:

> Hello Beam Devs,
> Thank you for your help.
> I have been trying to connect to a Kafka-enabled Azure Event Hub
> from an Apache Beam pipeline written in Python and running on Dataflow.
> Apache Beam version: 2.56.0
> Python version: 3.10.13 (main, Feb  1 2024, 05:36:44) [GCC 12.2.0]
> The code is as shown below:
> import argparse
> import logging
> import sys
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions
> from import ReadFromKafka
> from apache_beam.transforms import window
> from apache_beam.transforms import trigger
> from apache_beam.transforms.trigger import AfterProcessingTime, Repeatedly, AccumulationMode
> class KafkaToGCSOptions(PipelineOptions):
>     @classmethod
>     def _add_argparse_args(cls, parser):
>         #parser.add_argument('--bootstrapServer', required=True,
> help='Kafka bootstrap servers')
>         parser.add_argument('--inputTopic', required=True, help='Input
> Kafka topic')
>         parser.add_argument('--output', required=True, help='Output
> GCS bucket path')
>         parser.add_argument('--output2', required=True, help='Output
> GCS bucket path2')
>         #parser.add_argument('--authenticationString', required=True,
> help='SASL JAAS config for Kafka authentication')
> def run(argv=None):
>     #options = PipelineOptions(argv)
>     #pipeline_options = options.view_as(KafkaToGCSOptions)
>     parser = argparse.ArgumentParser()
>     known_args, pipeline_args = parser.parse_known_args(argv)
>     pipeline_options = PipelineOptions(pipeline_args,
> save_main_session=True)
>     options = pipeline_options.view_as(KafkaToGCSOptions)
>     options.view_as(StandardOptions).streaming = True
>     kafka_config = {
>         'bootstrap.servers': '',
>         'security.protocol': 'SASL_SSL',
>         'sasl.mechanism': 'PLAIN',
>         'sasl.jaas.config':
> ' required
> username="$ConnectionString"
> password="Endpoint=sb://
>         'auto.offset.reset': 'earliest'
>     }
>     wordsList = ["1", "2", "3", "4"]
>     with beam.Pipeline(options=options) as pipeline:
>         msg_kv_bytes  = (
>             pipeline
>             | 'Read from Kafka' >> ReadFromKafka(
>                 topics=[options.inputTopic],
>                 consumer_config=kafka_config,
>                 # Only for Direct Runner and when testing
>                 #max_num_records=10,
>                 with_metadata=True
>             )
>             # | 'Fixed window 5s' >> beam.WindowInto(
>             #     window.FixedWindows(5),
>             #     trigger=Repeatedly(AfterProcessingTime(5)),
>             #     accumulation_mode=AccumulationMode.DISCARDING
>             # )
>         )
>         (msg_kv_bytes | 'Print results' >> beam.Map(lambda x:
> (x)))
>         (msg_kv_bytes | 'Write to GCS' >>
>         (msg_kv_bytes | 'Write to GCS2' >>
>     pipeline_result =
>     pipeline_result.wait_until_finish()
> if __name__ == '__main__':
>     print("Streaming Pub/Sub messages to BigQuery...")
>     print(f"Python version: {sys.version}")
>     print(f"Apache Beam version: {beam.__version__}")
>     print("running to BigQuery...")
>     logging.getLogger().setLevel(logging.INFO)
>     run()
> ########################################################################
> Every time I try to run this with the Dataflow runner, I get the
> error attached to this email:
> ValueError: GroupByKey cannot be applied to an unbounded PCollection
> with global windowing and a default trigger
> I tried troubleshooting on my end and looked at various links:
> Is this issue still open, or am I doing something wrong?
> Appreciate your help.
> --
> Warm Regards
> Dhiren

Reply via email to