Beam doesn't use Flink's sink API. I recall from a long time ago that we attached a no-op sink to each PCollection to avoid this error. +Kyle Weaver <[email protected]> might know how this applies to Python on Flink.
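Roughly, a user-level version of that workaround might look like the sketch below: give each leaf PCollection a do-nothing consumer so the Flink batch translation sees a sink. This is only an illustration of the idea; the transform label and lambda here are made up, and the actual fix Kenn describes lived inside the Flink runner's translation, not in user code.

    import apache_beam as beam

    with beam.Pipeline() as p:
        pcoll = p | beam.Create([1, 2, 3])
        # Hypothetical sketch: terminate the branch with a do-nothing Map
        # so the collection has a terminal consumer. The runner-internal
        # no-op sink mentioned above is not user-visible like this.
        _ = pcoll | 'Noop sink' >> beam.Map(lambda element: None)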
Kenn

On Wed, Jun 9, 2021 at 4:41 PM Trevor Kramer <[email protected]> wrote:

> Hello Beam community,
>
> I am getting the following error running a Beam pipeline on Flink.
>
> RuntimeError: Pipeline BeamApp failed in state FAILED:
> java.lang.RuntimeException: No data sinks have been created yet. A program
> needs at least one sink that consumes data. Examples are writing the data
> set or printing it.
>
> Here is my pipeline which I believe has a sink at the end of it. What am
> I missing?
>
> with beam.Pipeline(options=options) as p:
>     (p
>      | 'Read SDF' >> ParseSDF('s3://some-path.sdf')
>      | 'Sample' >> beam.combiners.Sample.FixedSizeGlobally(1000)
>      | 'Flatten' >> beam.FlatMap(lambda x: x)
>      | 'Standardize' >> beam.Map(standardize)
>      | 'Make FPs' >> beam.Map(calculate_fps)
>      | 'Make Dict' >> beam.Map(lambda x: {'fp': x})
>      | 'Write Parquet' >> WriteToParquet('s3://some-path', pyarrow.schema(
>            [('fp', pyarrow.list_(pyarrow.int64(), 2048))]
>        ))
>     )
>
> Thanks,
>
> Trevor
