Beam doesn't use Flink's sink API. As I recall from a long time ago, we
attached a no-op sink to each PCollection to avoid this error. +Kyle
Weaver <[email protected]> might know how this applies to Python on
Flink.
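
The idea, roughly, looks like the sketch below against Flink's legacy
DataSet API. It's only an illustration from memory, not the runner's
actual translation code, and the class and job names here are made up:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class NoopSinkSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Some DataSet produced by earlier transforms but never written anywhere.
        DataSet<String> data = env.fromElements("a", "b", "c");

        // Without any sink, env.execute() fails with
        // "No data sinks have been created yet. A program needs at least
        // one sink that consumes data."
        // A discarding (no-op) sink satisfies that check while throwing
        // every record away.
        data.output(new DiscardingOutputFormat<>());

        env.execute("noop-sink-sketch");
    }
}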

Kenn

On Wed, Jun 9, 2021 at 4:41 PM Trevor Kramer <[email protected]>
wrote:

> Hello Beam community,
>
> I am getting the following error running a Beam pipeline on Flink.
>
> RuntimeError: Pipeline BeamApp failed in state FAILED:
> java.lang.RuntimeException: No data sinks have been created yet. A program
> needs at least one sink that consumes data. Examples are writing the data
> set or printing it.
>
> Here is my pipeline, which I believe has a sink at the end of it. What am
> I missing?
>
> with beam.Pipeline(options=options) as p:
>     (p
>      | 'Read SDF' >> ParseSDF('s3://some-path.sdf')
>      | 'Sample' >> beam.combiners.Sample.FixedSizeGlobally(1000)
>      | 'Flatten' >> beam.FlatMap(lambda x: x)
>      | 'Standardize' >> beam.Map(standardize)
>      | 'Make FPs' >> beam.Map(calculate_fps)
>      | 'Make Dict' >> beam.Map(lambda x: {'fp': x})
>      | 'Write Parquet' >> WriteToParquet('s3://some-path', pyarrow.schema(
>                 [('fp', pyarrow.list_(pyarrow.int64(), 2048))]
>             ))
>      )
>
>
> Thanks,
>
>
> Trevor
