jonathaningram opened a new issue, #35318:
URL: https://github.com/apache/beam/issues/35318

   ### What happened?
   
   I updated my Beam YAML pipeline in Dataflow which was a noop change in the 
YAML itself, but since doing it, my pipeline is unable to progress due to this 
error.
   
   <img width="983" alt="Image" src="https://github.com/user-attachments/assets/40e0bb65-730e-4a9d-a382-f41af945cde4" />
   
   Here's the Terraform diff showing what was actually changed in the update 
(read: nothing).
   
   ```
                         format: JSON
                         schema:
                           type: object
                 -         # NOTE(jingram): `encoding_position` is required in 
each property
                 -         # otherwise we get the following error when the 
pipeline is run:
                 -         #
                 -         # ```
                 -         # ValueError: Schema with id <some uuid> has 
encoding_positions_set=True, but not all fields have encoding_position set
                 -         # ```
                           properties:
                             id:
                               type: string
                             app_id:
                               type: string
                             index:
                               type: string
                             event_name:
                               type: string
                             event_type:
                               type: string
                             user_token:
                               type: string
                             object_ids:
                               type: array
                               items: { type: string }
                   
   ```
   
   Don't be fooled by the mention of `encoding_position` in the YAML comment. 
This was just me cleaning up an old comment from days ago and pushing the 
update to prod. The comment I added at the time was me trying to fix the exact 
error I'm experiencing in this bug report by specifying `encoding_position` in 
the schema fields (which I thought worked, but later realised the issue still 
persisted).
   
   Anyway, in the Terraform diff, all I did was remove a YAML comment. I didn't change any schema. I didn't change the Pub/Sub message payload or attributes. I didn't change the BQ table schema. And yet, for some reason, Beam now fails with the error above.
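   For reference, the YAML schema above maps to a row type along these lines (a hand-written Python sketch for illustration, not something generated by Beam):

   ```python
from typing import List, NamedTuple


class Event(NamedTuple):
    # Fields mirror the `properties` block of the YAML schema.
    id: str
    app_id: str
    index: str
    event_name: str
    event_type: str
    user_token: str
    object_ids: List[str]


# Beam row coders default encoding positions to field order, which is
# one reason an unchanged field list should round-trip across updates.
   ```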
   
   Prior to updating the YAML with the above, my pipeline was suffering from 
this error, which I haven't yet investigated:
   
   ```
   Error message from worker: generic::unknown: 
org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: 
Duplicate values for b8b0bcee58104e8ba2650ed437816a44-000
        
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        
redacted.MyProvider$MyTransform$MergeThingsFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:815)
        
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:211)
        
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:231)
        
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)
        
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
        
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
        
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
        
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: java.lang.IllegalArgumentException: Duplicate values for 
b8b0bcee58104e8ba2650ed437816a44-000
        
org.apache.beam.sdk.values.PCollectionViews$MultimapViewToMapAdapter.get(PCollectionViews.java:2030)
        
java.base/java.util.Collections$UnmodifiableMap.get(Collections.java:1502)
        
redacted.MyProvider$MyTransform$MergeThingsFn.processElement(MyTransformProvider.java:264)
   
   passed through:
   ==>
       dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1341
   ```
   
   `b8b0bcee58104e8ba2650ed437816a44-000` is an event ID and I have 
`id_attribute: eventID` specified in my pipeline's `ReadFromPubSub`.
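   As far as I can tell from the stack trace, the Java adapter that raises this (`MultimapViewToMapAdapter.get`) converts a multimap side input into a plain map view and rejects any key that carries more than one value. A minimal Python sketch of that behavior (names are illustrative, not the actual Beam API):

   ```python
def multimap_to_map(multimap):
    """Flatten a multimap (key -> list of values) into a plain map,
    rejecting keys with more than one value -- roughly the check that
    MultimapViewToMapAdapter.get performs on a map-valued side input."""
    result = {}
    for key, values in multimap.items():
        if len(values) > 1:
            raise ValueError(f"Duplicate values for {key}")
        result[key] = values[0]
    return result


# A Pub/Sub redelivery (or two messages sharing an eventID) would put
# two values under one key and trip the check.
ok = multimap_to_map({"evt-1": ["a"], "evt-2": ["b"]})
   ```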
   
   Anyway, I'm not sure whether this error is what caused the subsequent pipeline update to break, in which case the YAML change is just a red herring.
   
   Expand this for an example stack trace of the `encoding_positions_set` error.
   
   <details>
   
   ```
   Error message from worker: generic::unknown: Traceback (most recent call 
last):
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 313, in _execute
       response = task()
                  ^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 388, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 658, in do_instruction
       return getattr(self, request_type)(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 689, in process_bundle
       bundle_processor = self.bundle_processor_cache.get(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 512, in get
       processor = bundle_processor.BundleProcessor(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1133, in __init__
       self.ops = self.create_execution_tree(self.process_bundle_descriptor)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1190, in create_execution_tree
       return collections.OrderedDict([(
                                      ^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1193, in <listcomp>
       get_operation(transform_id))) for transform_id in sorted(
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
                             ^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <dictcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
             ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
                             ^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <dictcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
             ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
                             ^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <dictcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
             ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
                             ^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <dictcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
             ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
                             ^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <dictcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
             ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1178, in get_operation
       return transform_factory.create_operation(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1498, in create_operation
       return creator(self, transform_id, transform_proto, payload, consumers)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1614, in create_sink_runner
       return DataOutputOperation(
              ^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 146, in __init__
       self.windowed_coder_impl = windowed_coder.get_impl()
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 
232, in get_impl
       self._impl = self._create_impl()
                    ^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 
1429, in _create_impl
       self.wrapped_value_coder.get_impl(),
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 
232, in get_impl
       self._impl = self._create_impl()
                    ^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 
1536, in _create_impl
       return coder_impl.LengthPrefixCoderImpl(self._value_coder.get_impl())
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 
232, in get_impl
       self._impl = self._create_impl()
                    ^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/coders/row_coder.py", line 
79, in _create_impl
       return RowCoderImpl(self.schema, self.components)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "apache_beam/coders/coder_impl.py", line 1836, in 
apache_beam.coders.coder_impl.RowCoderImpl.__init__
   ValueError: Schema with id 298ab171-007f-4a07-bef1-2231611f4734 has 
encoding_positions_set=True,
               but not all fields have encoding_position set
   
   passed through:
   ==>
       dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1341
   ```
   
   </details>
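
   The failing check in `RowCoderImpl.__init__` looks like an all-or-nothing invariant: if the schema proto claims encoding positions are set, every field must carry one. A minimal sketch of that invariant (field/parameter names are my own, not the actual proto API):

   ```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Field:
    name: str
    encoding_position: Optional[int] = None


def validate_encoding_positions(schema_id, fields, encoding_positions_set):
    # Mirrors the invariant the error message describes: a schema that
    # claims positions are set may not have any field missing one.
    if encoding_positions_set and any(
            f.encoding_position is None for f in fields):
        raise ValueError(
            f"Schema with id {schema_id} has encoding_positions_set=True, "
            "but not all fields have encoding_position set")


# Every field positioned: passes without error.
validate_encoding_positions("s1", [Field("id", 0), Field("app_id", 1)], True)
   ```

   If the update path regenerates the schema proto with the flag set but drops per-field positions, that alone would reproduce the error.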
   
   I have increased the priority of this ticket; please adjust it as required on your end.
   
   The only workaround I've found during the week I've been hitting this error is to wipe my pipeline entirely and recreate it from scratch.
   
   ### Issue Priority
   
   Priority: 1 (data loss / total loss of function)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [x] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.