Hello Trevor,

Kenn is correct that Beam creates no-op Flink sinks. So if a sink isn't being created, it's possibly a bug in Beam.

Is this a batch or streaming pipeline? Which Flink version are you using?

Kyle

On Thu, Jun 10, 2021 at 3:19 PM Kenneth Knowles <[email protected]> wrote:

> Beam doesn't use Flink's sink API. I recall from a very long time ago that
> we attached a noop sink to each PCollection to avoid this error. +Kyle
> Weaver <[email protected]> might know something about how this applies
> to Python on Flink.
>
> Kenn
>
> On Wed, Jun 9, 2021 at 4:41 PM Trevor Kramer <[email protected]>
> wrote:
>
>> Hello Beam community,
>>
>> I am getting the following error running a Beam pipeline on Flink.
>>
>> RuntimeError: Pipeline BeamApp failed in state FAILED:
>> java.lang.RuntimeException: No data sinks have been created yet. A program
>> needs at least one sink that consumes data. Examples are writing the data
>> set or printing it.
>>
>> Here is my pipeline which I believe has a sink at the end of it. What am
>> I missing?
>>
>> with beam.Pipeline(options=options) as p:
>>     (p
>>      | 'Read SDF' >> ParseSDF('s3://some-path.sdf')
>>      | 'Sample' >> beam.combiners.Sample.FixedSizeGlobally(1000)
>>      | 'Flatten' >> beam.FlatMap(lambda x: x)
>>      | 'Standardize' >> beam.Map(standardize)
>>      | 'Make FPs' >> beam.Map(calculate_fps)
>>      | 'Make Dict' >> beam.Map(lambda x: {'fp': x})
>>      | 'Write Parquet' >> WriteToParquet('s3://some-path', pyarrow.schema(
>>          [('fp', pyarrow.list_(pyarrow.int64(), 2048))]
>>      ))
>>     )
>>
>>
>> Thanks,
>>
>>
>> Trevor
>>
