jonathaningram opened a new issue, #35318: URL: https://github.com/apache/beam/issues/35318
### What happened?

I updated my Beam YAML pipeline in Dataflow with a change that was a no-op in the YAML itself, but since then my pipeline has been unable to progress because of this error.

<img width="983" alt="Image" src="https://github.com/user-attachments/assets/40e0bb65-730e-4a9d-a382-f41af945cde4" />

Here's the Terraform diff showing what actually changed in the update (read: nothing).

````diff
 format: JSON
 schema:
   type: object
-  # NOTE(jingram): `encoding_position` is required in each property
-  # otherwise we get the following error when the pipeline is run:
-  #
-  # ```
-  # ValueError: Schema with id <some uuid> has encoding_positions_set=True, but not all fields have encoding_position set
-  # ```
   properties:
     id:
       type: string
     app_id:
       type: string
     index:
       type: string
     event_name:
       type: string
     event_type:
       type: string
     user_token:
       type: string
     object_ids:
       type: array
       items: { type: string }
````

Don't be fooled by the mention of `encoding_position` in the YAML comment. I was just cleaning up a days-old comment and pushing the update to prod. I originally added that comment while trying to fix the exact error this bug report is about, by specifying `encoding_position` on each schema field (I thought that had worked, but later realised the issue persisted).

Anyway, as the Terraform diff shows, all I did was remove a YAML comment. I didn't change any schema. I didn't change the Pub/Sub message payload or attributes. I didn't change the BQ table schema, but for some reason Beam.
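For readers unfamiliar with the `encoding_positions_set` error quoted in the YAML comment above, here is a minimal pure-Python sketch of the invariant it appears to enforce. It assumes, based on the error text and on Beam's portable schema proto (where a schema carries a boolean `encoding_positions_set` and each field an integer `encoding_position` that reads as 0 when unset), that the row coder rejects a flagged schema unless every field has its own distinct position. `Field`, `Schema`, and `resolve_encoding_positions` are hypothetical stand-ins, not Beam's API.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Field:
    # Hypothetical stand-in for a schema field; like a proto3 int32,
    # encoding_position reads as 0 when it was never set.
    name: str
    encoding_position: int = 0


@dataclass
class Schema:
    # Hypothetical stand-in for a schema carrying the encoding_positions_set flag.
    id: str
    fields: List[Field] = field(default_factory=list)
    encoding_positions_set: bool = False


def resolve_encoding_positions(schema: Schema) -> List[int]:
    """Return the per-field encoding order, mimicking the assumed validation."""
    if not schema.encoding_positions_set:
        # Without the flag, fields are simply encoded in declaration order.
        return list(range(len(schema.fields)))
    positions = [f.encoding_position for f in schema.fields]
    # Unset positions all collapse to 0, so "not all set" shows up as repeats.
    if len(set(positions)) != len(positions):
        raise ValueError(
            f"Schema with id {schema.id} has encoding_positions_set=True, "
            "but not all fields have encoding_position set")
    return positions


if __name__ == "__main__":
    ok = Schema("a", [Field("id", 0), Field("app_id", 1)], encoding_positions_set=True)
    print(resolve_encoding_positions(ok))  # [0, 1]
    bad = Schema("b", [Field("id"), Field("app_id")], encoding_positions_set=True)
    try:
        resolve_encoding_positions(bad)
    except ValueError as err:
        print(err)
```

One consequence of this assumed check: a field legitimately at position 0 and a field whose position was never set look identical, so the only detectable symptom is a repeated position.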
Prior to updating the YAML with the above, my pipeline was suffering from this error, which I haven't investigated yet:

```
Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Duplicate values for b8b0bcee58104e8ba2650ed437816a44-000
    org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    redacted.MyProvider$MyTransform$MergeThingsFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:815)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:211)
    org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:231)
    org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)
    org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
    org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
    java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: Duplicate values for b8b0bcee58104e8ba2650ed437816a44-000
    org.apache.beam.sdk.values.PCollectionViews$MultimapViewToMapAdapter.get(PCollectionViews.java:2030)
    java.base/java.util.Collections$UnmodifiableMap.get(Collections.java:1502)
    redacted.MyProvider$MyTransform$MergeThingsFn.processElement(MyTransformProvider.java:264)
passed through:
==> dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1341
```

`b8b0bcee58104e8ba2650ed437816a44-000` is an event ID, and I have `id_attribute: eventID` specified in my pipeline's `ReadFromPubSub`. Anyway, I'm not sure whether it was this error that caused the subsequent pipeline update to bork, which would make the YAML change just a red herring.

Expand the section below for an example stack trace of the `encoding_positions_set` error.

<details>

```
Error message from worker: generic::unknown: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 388, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 658, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 689, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 512, in get
    processor = bundle_processor.BundleProcessor(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1133, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1190, in create_execution_tree
    return collections.OrderedDict([(
                                   ^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1193, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1178, in get_operation
    return transform_factory.create_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1498, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1614, in create_sink_runner
    return DataOutputOperation(
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 146, in __init__
    self.windowed_coder_impl = windowed_coder.get_impl()
                               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 232, in get_impl
    self._impl = self._create_impl()
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 1429, in _create_impl
    self.wrapped_value_coder.get_impl(),
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 232, in get_impl
    self._impl = self._create_impl()
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 1536, in _create_impl
    return coder_impl.LengthPrefixCoderImpl(self._value_coder.get_impl())
                                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 232, in get_impl
    self._impl = self._create_impl()
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/row_coder.py", line 79, in _create_impl
    return RowCoderImpl(self.schema, self.components)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "apache_beam/coders/coder_impl.py", line 1836, in apache_beam.coders.coder_impl.RowCoderImpl.__init__
ValueError: Schema with id 298ab171-007f-4a07-bef1-2231611f4734 has encoding_positions_set=True, but not all fields have encoding_position set
passed through:
==> dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1341
```

</details>

I have increased the priority of this ticket; please adjust as required on your end. Over the week I've been getting this error, the only workaround that has worked is to wipe my pipeline entirely and recreate it from scratch.

### Issue Priority

Priority: 1 (data loss / total loss of function)

### Issue Components

- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [x] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
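The `Duplicate values for <event ID>` failure reported earlier is thrown from `PCollectionViews$MultimapViewToMapAdapter.get`, i.e. while reading a map-style side input for a key that has more than one value. As a hypothetical, pure-Python illustration of that failure mode (not Beam code; `StrictMapView` and `dedupe_by_key` are made-up names), the sketch below models a map view over a multimap that fails on lookup of a repeated key, and shows that collapsing duplicates per key beforehand avoids the error. Whether duplicate event IDs are actually reaching the side input despite `id_attribute: eventID` is an assumption, not something this report establishes.

```python
from collections import defaultdict
from typing import Any, Dict, Hashable, Iterable, List, Optional, Tuple


class StrictMapView:
    """Hypothetical map view over a multimap: get() fails if a key has several values."""

    def __init__(self, pairs: Iterable[Tuple[Hashable, Any]]):
        # Group every value under its key, keeping duplicates (multimap semantics).
        self._multimap: Dict[Hashable, List[Any]] = defaultdict(list)
        for key, value in pairs:
            self._multimap[key].append(value)

    def get(self, key: Hashable) -> Optional[Any]:
        # dict.get does not trigger defaultdict's factory, so misses stay misses.
        values = self._multimap.get(key, [])
        if len(values) > 1:
            # Same shape as the message in the worker log above.
            raise ValueError(f"Duplicate values for {key}")
        return values[0] if values else None


def dedupe_by_key(pairs: Iterable[Tuple[Hashable, Any]]) -> List[Tuple[Hashable, Any]]:
    """Keep only the first value seen for each key (one possible policy)."""
    first: Dict[Hashable, Any] = {}
    for key, value in pairs:
        first.setdefault(key, value)
    return list(first.items())


if __name__ == "__main__":
    # Hypothetical event IDs; "evt-1" arrives twice.
    pairs = [("evt-1", "a"), ("evt-1", "a"), ("evt-2", "b")]
    view = StrictMapView(pairs)
    print(view.get("evt-2"))
    try:
        view.get("evt-1")
    except ValueError as err:
        print(err)
    print(StrictMapView(dedupe_by_key(pairs)).get("evt-1"))
```

Keeping the first value is only one policy; the right fix depends on why the same event ID shows up twice in the side input in the first place.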