Please use beam.io.fileio.WriteToFiles

On Thu, Jul 25, 2024 at 2:48 AM Dhirendra Singh <dheeru.k.si...@gmail.com>
wrote:

> Hello Beam Devs,
>
> Thank you for your help.
> I have been trying to connect to the Kafka Enabled Azure event hub
> using Dataflow beam code using Python.
> Apache Beam version: 2.56.0
> Python version: 3.10.13 (main, Feb  1 2024, 05:36:44) [GCC 12.2.0]
>
>
> The code is as show below:
>
> import argparse
> import logging
> import sys
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions,
> SetupOptions, StandardOptions
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam.transforms import window
> from apache_beam.transforms import trigger
> from apache_beam.transforms.trigger import AfterProcessingTime,
> Repeatedly, AccumulationMode
>
> class KafkaToGCSOptions(PipelineOptions):
>     @classmethod
>     def _add_argparse_args(cls, parser):
>         #parser.add_argument('--bootstrapServer', required=True,
> help='Kafka bootstrap servers')
>         parser.add_argument('--inputTopic', required=True, help='Input
> Kafka topic')
>         parser.add_argument('--output', required=True, help='Output
> GCS bucket path')
>         parser.add_argument('--output2', required=True, help='Output
> GCS bucket path2')
>         #parser.add_argument('--authenticationString', required=True,
> help='SASL JAAS config for Kafka authentication')
>
> def run(argv=None):
>     #options = PipelineOptions(argv)
>     #pipeline_options = options.view_as(KafkaToGCSOptions)
>     parser = argparse.ArgumentParser()
>     known_args, pipeline_args = parser.parse_known_args(argv)
>
>     pipeline_options = PipelineOptions(pipeline_args,
> save_main_session=True)
>     options = pipeline_options.view_as(KafkaToGCSOptions)
>     options.view_as(StandardOptions).streaming = True
>
>     kafka_config = {
>         'bootstrap.servers': 'custom.servicebus.windows.net:9093',
>         'security.protocol': 'SASL_SSL',
>         'sasl.mechanism': 'PLAIN',
>         'sasl.jaas.config':
> 'org.apache.kafka.common.security.plain.PlainLoginModule required
> username="$ConnectionString"
> password="Endpoint=sb://
> custom.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=";',
>         'auto.offset.reset': 'earliest'
>     }
>     wordsList = ["1", "2", "3", "4"]
>     with beam.Pipeline(options=options) as pipeline:
>         msg_kv_bytes  = (
>             pipeline
>             | 'Read from Kafka' >> ReadFromKafka(
>                 topics=[options.inputTopic],
>                 consumer_config=kafka_config,
>                 # Only for Direct Runner and when testing
>                 #max_num_records=10,
>                 with_metadata=True
>             )
>             # | 'Fixed window 5s' >> beam.WindowInto(
>             #     window.FixedWindows(5),
>             #     trigger=Repeatedly(AfterProcessingTime(5)),
>             #     accumulation_mode=AccumulationMode.DISCARDING
>             # )
>         )
>         (msg_kv_bytes | 'Print results' >> beam.Map(lambda x: logging.info
> (x)))
>         (msg_kv_bytes | 'Write to GCS' >>
> beam.io.textio.WriteToText(options.output))
>         (msg_kv_bytes | 'Write to GCS2' >>
> beam.io.textio.WriteToText(options.output2))
>
>
>     pipeline_result = pipeline.run()
>     pipeline_result.wait_until_finish()
>
>
>
> if __name__ == '__main__':
>     print("Streaming Pub/Sub messages to BigQuery...")
>     print(f"Python version: {sys.version}")
>     print(f"Apache Beam version: {beam.__version__}")
>     print("running to BigQuery...")
>     logging.getLogger().setLevel(logging.INFO)
>     run()
>
> ########################################################################
>
> Every time I try to run this using Dataflow runner  I am getting the
> error attached in the email
>
> ValueError: GroupByKey cannot be applied to an unbounded PCollection
> with global windowing and a default trigger
>
> I tried troubleshooting from my end, looked at various links:
>
> https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
> https://github.com/apache/beam/issues/25598
> https://beam.apache.org/releases/pydoc/current/apache_beam.io.kafka.html
>
> Is this issue still pending or am I doing something wrong?
>
> Appreciate your help.
>
> --
> Warm Regards
> Dhiren
>

Reply via email to