Hi,

Reading messages from Pub/Sub and then writing them to hourly (or other
fixed-interval) files on GCS does not work for me on Cloud Dataflow. The job
only writes the files when I shut it down. Is this not yet supported in the
Python SDK, or am I doing something wrong?

Here is a snippet of my code:

import uuid

import apache_beam as beam
from apache_beam import window
from apache_beam.io import ReadFromPubSub
from apache_beam.io.avroio import WriteToAvro
from apache_beam.transforms.trigger import AccumulationMode, AfterWatermark

p = beam.Pipeline(options=pipelineoptions)

# Read the raw messages from Pub/Sub as bytes.
messages = (p | 'Read from topic: ' + topic >>
            ReadFromPubSub(topic=input_topic).with_input_types(bytes))

# Decode the Avro payloads and window them into fixed 60-second intervals,
# emitting each window once the watermark passes its end.
windowed_lines = (
    messages
    | 'decode' >> beam.ParDo(DecodeAvro(), parsed_schema)
    | beam.WindowInto(
        window.FixedWindows(60),
        trigger=AfterWatermark(),
        accumulation_mode=AccumulationMode.DISCARDING))

# Write each window out as sharded Avro files on GCS.
output = windowed_lines | 'write result' >> WriteToAvro(
    file_path_prefix='gs://BUCKET/streaming/tests/',
    shard_name_template=(topic.split('.')[0] + '_' + str(uuid.uuid4()) +
                         '_SSSS-of-NNNN'),
    schema=parsed_schema,
    file_name_suffix='.avro',
    num_shards=2)
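
The 60-second window above is just for a quick test; for the hourly files I
mentioned I would swap in a one-hour window, along these lines (everything
else unchanged):

    | beam.WindowInto(
        window.FixedWindows(60 * 60),  # 3600 s = one output file per hour
        trigger=AfterWatermark(),
        accumulation_mode=AccumulationMode.DISCARDING)

Either way the behaviour is the same: no files show up until I stop the job.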


Thank you for your help,
Marc