Dear Team,

I am facing the following error while running a streaming pipeline (Python)
in Apache Beam on the FlinkRunner. The pipeline contains a GCP Pub/Sub IO
source and a Pub/Sub target.

WARNING:root:Make sure that locally built Python SDK docker image has
Python 3.6 interpreter.

ERROR:root:java.lang.IllegalArgumentException: PCollectionNodes
[PCollectionNode{id=ref_PCollection_PCollection_1,
PCollection=unique_name: "23 Read from Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
Traceback (most recent call last):
  File "<stdin>", line 5, in <module>
  File "/usr/local/lib64/python3.6/site-packages/apache_beam/pipeline.py",
line 586, in __exit__
    self.result.wait_until_finish()
  File 
"/usr/local/lib64/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py",
line 599, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline
BeamApp-swarna0kpaul-0712135603-763999c_45da372e-757d-4690-8e25-1a5ed0a5cc84
failed in state FAILED: java.lang.IllegalArgumentException:
PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1,
PCollection=unique_name: "23 Read from Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced

I am trying to run the following Python code using two Pub/Sub topics that
I created in my GCP account ({input topic}, {output topic}).
The topics are in this format: projects/{project name}/topics/{topic name}

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholders, in the form projects/{project name}/topics/{topic name}
input_topic = "<input topic>"
output_topic = "<output topic>"

options = PipelineOptions(["--runner=FlinkRunner",
                           "--checkpointing_interval=1000",
                           "--streaming"])

with beam.Pipeline(options=options) as pipeline:
  # Read raw message payloads (bytes) from the input topic
  input1 = (pipeline
            | " Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic=input_topic).with_output_types(bytes))
  # Window into 5-second fixed windows and write the payloads to the output topic
  output = (input1
            | beam.WindowInto(beam.transforms.window.FixedWindows(5))
            | "Write to Pub/Sub" >> beam.io.WriteToPubSub(topic=output_topic, with_attributes=False).with_input_types(bytes))
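
To rule out a malformed topic string, a quick check like the sketch below
could be run before starting the pipeline. is_topic_path is a hypothetical
helper name (it is not part of Beam), and the pattern only checks the
overall shape described above, not the full Pub/Sub naming rules.

```python
import re

# Loose shape check only: projects/{project name}/topics/{topic name}.
# It does not enforce Pub/Sub's complete naming rules.
_TOPIC_PATH = re.compile(r"projects/[^/]+/topics/[^/]+")


def is_topic_path(value: str) -> bool:
    """Return True if value looks like projects/<project>/topics/<topic>."""
    return _TOPIC_PATH.fullmatch(value) is not None


if __name__ == "__main__":
    # Illustrative values, not my real project or topic names.
    print(is_topic_path("projects/my-project/topics/my-input"))
    print(is_topic_path("my-input"))
```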

The following versions of the software are installed on the system:

Python 3.6.8
apache_beam[gcp]==2.30.0
java -version
openjdk version "1.8.0_292"
OpenJDK Runtime Environment (build 1.8.0_292-b10)
OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)

I also tried running this against a Flink cluster and with the portable
Flink runner, as described on this page:
<https://beam.apache.org/documentation/runners/flink/>,
but got the same error.
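
For reference, the two option sets described on that page look roughly
like the sketch below. The helper names are hypothetical, and the
addresses (localhost:8081 for the Flink master, localhost:8099 for the job
endpoint) and the LOOPBACK environment type are the example values from
the documentation, assumed here for illustration rather than copied from
my exact setup.

```python
from typing import List


def flink_runner_args(flink_master: str = "localhost:8081") -> List[str]:
    """Flags for submitting to a Flink cluster via FlinkRunner.

    The default address is an assumed example value.
    """
    return [
        "--runner=FlinkRunner",
        "--flink_master=" + flink_master,
        "--environment_type=LOOPBACK",
        "--streaming",
    ]


def portable_runner_args(job_endpoint: str = "localhost:8099") -> List[str]:
    """Flags for submitting through a separately started Flink job service.

    The default address is an assumed example value.
    """
    return [
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--environment_type=LOOPBACK",
        "--streaming",
    ]


if __name__ == "__main__":
    print(flink_runner_args())
    print(portable_runner_args())
```

Either list would be passed to PipelineOptions(...) in place of the list
in the code above.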


The same code runs fine when I use the following options:

options = PipelineOptions(["--streaming"])

Any help is highly appreciated.

Below is the link to the Stack Overflow question:


https://stackoverflow.com/questions/68342095/error-while-running-beam-streaming-pipeline-python-with-pub-sub-io-in-embedded


Thanks -

swarna
