Please use beam.io.fileio.WriteToFiles.
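The ValueError is raised because beam.io.textio.WriteToText performs a GroupByKey internally, which is not allowed on an unbounded PCollection that still has the default global windowing and trigger. Windowing the stream first (similar to the WindowInto block you have commented out) and then writing with beam.io.fileio.WriteToFiles lets the write happen per window. A rough, untested sketch; the Create source, the 60-second window size, and the GCS path are placeholders for your ReadFromKafka step and output bucket:

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.transforms import window

with beam.Pipeline() as pipeline:
    (pipeline
     # Stand-in for your 'Read from Kafka' step.
     | 'Read' >> beam.Create(['record-1', 'record-2'])
     # Non-global windows so the grouping inside the sink is legal on a stream.
     | 'Window' >> beam.WindowInto(window.FixedWindows(60))
     # The default sink writes string elements, one per line.
     | 'To text' >> beam.Map(str)
     # Placeholder output path; replace with your --output value.
     | 'Write' >> fileio.WriteToFiles(path='gs://your-bucket/output/'))

WriteToFiles also accepts shards and file_naming arguments if you need to control the number and naming of output files.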
On Thu, Jul 25, 2024 at 2:48 AM Dhirendra Singh <dheeru.k.si...@gmail.com> wrote:
> Hello Beam Devs,
>
> Thank you for your help.
> I have been trying to connect to a Kafka-enabled Azure Event Hub from
> Dataflow using Beam Python code.
> Apache Beam version: 2.56.0
> Python version: 3.10.13 (main, Feb 1 2024, 05:36:44) [GCC 12.2.0]
>
> The code is as shown below:
>
> import argparse
> import logging
> import sys
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam.transforms import window
> from apache_beam.transforms import trigger
> from apache_beam.transforms.trigger import AfterProcessingTime, Repeatedly, AccumulationMode
>
>
> class KafkaToGCSOptions(PipelineOptions):
>     @classmethod
>     def _add_argparse_args(cls, parser):
>         #parser.add_argument('--bootstrapServer', required=True, help='Kafka bootstrap servers')
>         parser.add_argument('--inputTopic', required=True, help='Input Kafka topic')
>         parser.add_argument('--output', required=True, help='Output GCS bucket path')
>         parser.add_argument('--output2', required=True, help='Output GCS bucket path2')
>         #parser.add_argument('--authenticationString', required=True, help='SASL JAAS config for Kafka authentication')
>
>
> def run(argv=None):
>     #options = PipelineOptions(argv)
>     #pipeline_options = options.view_as(KafkaToGCSOptions)
>     parser = argparse.ArgumentParser()
>     known_args, pipeline_args = parser.parse_known_args(argv)
>
>     pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)
>     options = pipeline_options.view_as(KafkaToGCSOptions)
>     options.view_as(StandardOptions).streaming = True
>
>     kafka_config = {
>         'bootstrap.servers': 'custom.servicebus.windows.net:9093',
>         'security.protocol': 'SASL_SSL',
>         'sasl.mechanism': 'PLAIN',
>         'sasl.jaas.config': 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://custom.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=";',
>         'auto.offset.reset': 'earliest'
>     }
>     wordsList = ["1", "2", "3", "4"]
>     with beam.Pipeline(options=options) as pipeline:
>         msg_kv_bytes = (
>             pipeline
>             | 'Read from Kafka' >> ReadFromKafka(
>                 topics=[options.inputTopic],
>                 consumer_config=kafka_config,
>                 # Only for Direct Runner and when testing
>                 #max_num_records=10,
>                 with_metadata=True
>             )
>             # | 'Fixed window 5s' >> beam.WindowInto(
>             #     window.FixedWindows(5),
>             #     trigger=Repeatedly(AfterProcessingTime(5)),
>             #     accumulation_mode=AccumulationMode.DISCARDING
>             # )
>         )
>         (msg_kv_bytes | 'Print results' >> beam.Map(lambda x: logging.info(x)))
>         (msg_kv_bytes | 'Write to GCS' >> beam.io.textio.WriteToText(options.output))
>         (msg_kv_bytes | 'Write to GCS2' >> beam.io.textio.WriteToText(options.output2))
>
>         pipeline_result = pipeline.run()
>         pipeline_result.wait_until_finish()
>
>
> if __name__ == '__main__':
>     print("Streaming Pub/Sub messages to BigQuery...")
>     print(f"Python version: {sys.version}")
>     print(f"Apache Beam version: {beam.__version__}")
>     print("running to BigQuery...")
>     logging.getLogger().setLevel(logging.INFO)
>     run()
>
> ########################################################################
>
> Every time I try to run this with the Dataflow runner, I get the error
> below:
>
> ValueError: GroupByKey cannot be applied to an unbounded PCollection
> with global windowing and a default trigger
>
> I tried troubleshooting from my end and looked at various links:
>
> https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
> https://github.com/apache/beam/issues/25598
> https://beam.apache.org/releases/pydoc/current/apache_beam.io.kafka.html
>
> Is this issue still pending, or am I doing something wrong?
>
> Appreciate your help.
>
> --
> Warm Regards
> Dhiren