Hello Beam community,

I am getting the following error running a Beam pipeline on Flink.

RuntimeError: Pipeline BeamApp failed in state FAILED:
java.lang.RuntimeException: No data sinks have been created yet. A program
needs at least one sink that consumes data. Examples are writing the data
set or printing it.

Here is my pipeline, which I believe ends with a sink (WriteToParquet).
What am I missing?

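import apache_beam as beam
import pyarrow
from apache_beam.io.parquetio import WriteToParquet

# ParseSDF, standardize and calculate_fps are defined elsewhere (not shown);
# `options` holds the Flink runner pipeline options.
with beam.Pipeline(options=options) as p:
    (p
     | 'Read SDF' >> ParseSDF('s3://some-path.sdf')
     | 'Sample' >> beam.combiners.Sample.FixedSizeGlobally(1000)
     # FixedSizeGlobally emits a single list; flatten it back into elements
     | 'Flatten' >> beam.FlatMap(lambda x: x)
     | 'Standardize' >> beam.Map(standardize)
     | 'Make FPs' >> beam.Map(calculate_fps)
     | 'Make Dict' >> beam.Map(lambda x: {'fp': x})
     | 'Write Parquet' >> WriteToParquet(
           's3://some-path',
           pyarrow.schema([('fp', pyarrow.list_(pyarrow.int64(), 2048))])))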
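For reference, here is a plain-Python sketch of the element shapes flowing through the tail of the pipeline. It is not Beam code and does not reproduce the Flink error. It only illustrates that Sample.FixedSizeGlobally(1000) emits one element (a list of up to 1000 items), and that the FlatMap(lambda x: x) step turns it back into individual records before they reach WriteToParquet. The standardize and calculate_fps functions below are hypothetical stand-ins, and simulate_tail is an illustrative helper, not part of Beam.

```python
import random

FP_LENGTH = 2048  # matches pyarrow.list_(pyarrow.int64(), 2048) in the schema


def standardize(mol):
    """Hypothetical stand-in for the real standardize()."""
    return mol


def calculate_fps(mol):
    """Hypothetical stand-in: returns a 2048-entry list of 0/1 bits."""
    rng = random.Random(mol)
    return [rng.randint(0, 1) for _ in range(FP_LENGTH)]


def simulate_tail(records, sample_size=1000, seed=0):
    """Mimic the Sample -> Flatten -> Map steps on an in-memory list."""
    # 'Sample': FixedSizeGlobally yields ONE element, a list of up to sample_size items
    k = min(sample_size, len(records))
    sampled = [random.Random(seed).sample(records, k)]
    # 'Flatten': FlatMap(lambda x: x) turns that single list back into elements
    flattened = [item for batch in sampled for item in batch]
    # 'Standardize', 'Make FPs', 'Make Dict': one {'fp': [...]} row per element
    return [{'fp': calculate_fps(standardize(m))} for m in flattened]


rows = simulate_tail(list(range(5000)))
print(len(rows), len(rows[0]['fp']))
```

Each row has the single `fp` column of length 2048 that the Parquet schema expects, so the data reaching the sink has the intended shape.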


Thanks,


Trevor
