jonathaningram opened a new issue, #35179:
URL: https://github.com/apache/beam/issues/35179

   ### What would you like to happen?
   
   I would like `ReadFromPubSub` to understand that a field in my incoming message payload might be null, and that that's OK.
   
   I want this to work:
   
   ```yaml
   pipeline:
     source:
       type: ReadFromPubSub
       config:
         subscription: "sub"
         format: JSON
         schema:
           type: object
           properties:
             id: { type: string }
             event_subtype: { type: string } # <-- this causes a failure if the message does not have this field
   ```
   
   But it fails with `KeyError: 'event_subtype'`. Expand below to see the stack trace.
   
   <details>
   
   ```
   Error message from worker: generic::unknown: Traceback (most recent call last):
     File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 2086, in <lambda>
       wrapper = lambda x: [fn(x)]
                            ^^^^^
     File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/yaml_io.py", line 380, in mapper
       values = parser(msg.data).as_dict()
                ^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 224, in parse
       return to_row(o)
              ^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 191, in <lambda>
       {name: convert(value[name])
     File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 191, in <dictcomp>
       {name: convert(value[name])
                      ~~~~~^^^^^^
   KeyError: 'event_subtype'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
       response = task()
                  ^^^^^^
     File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 388, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 658, in do_instruction
       return getattr(self, request_type)(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 696, in process_bundle
       bundle_processor.process_bundle(instruction_id))
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1274, in process_bundle
       input_op_by_transform_id[element.transform_id].process_encoded(
     File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
       self.output(decoded_value)
     File "apache_beam/runners/worker/operations.py", line 566, in apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 568, in apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 259, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 949, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1585, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 683, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1680, in apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam/runners/common.py", line 1793, in apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 949, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1585, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 683, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1680, in apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam/runners/common.py", line 1793, in apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 949, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1606, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 2086, in <lambda>
       wrapper = lambda x: [fn(x)]
                            ^^^^^
     File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/yaml_io.py", line 380, in mapper
       values = parser(msg.data).as_dict()
                ^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 224, in parse
       return to_row(o)
              ^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 191, in <lambda>
       {name: convert(value[name])
     File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 191, in <dictcomp>
       {name: convert(value[name])
                      ~~~~~^^^^^^
   KeyError: "event_subtype [while running 'read_from_pubsub/ParseMessage-ptransform-44']"
   
   passed through:
   ==>
       dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1334
   ```
   </details>
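
Reading the traceback, the failing frame indexes each declared property with `value[name]`. The following is a minimal sketch of that failure mode in plain Python, not Beam's actual code: `FIELDS`, `strict_row`, and `lenient_row` are hypothetical names, and the lenient variant only shows the behavior I'm hoping for, assuming a missing key could be treated like a null.

```python
import json

# Hypothetical stand-in for the property names declared in the schema.
FIELDS = ["id", "event_subtype"]


def strict_row(value: dict) -> dict:
    # Same indexing pattern as the traceback frame: value[name] raises
    # KeyError when a declared field is absent from the payload.
    return {name: value[name] for name in FIELDS}


def lenient_row(value: dict) -> dict:
    # Hoped-for behavior: an absent field is read as None.
    return {name: value.get(name) for name in FIELDS}


# A payload where event_subtype was omitted at publish time.
message = json.loads(b'{"id": "abc"}')

try:
    strict_row(message)
    strict_error = None
except KeyError as exc:
    strict_error = exc.args[0]

lenient = lenient_row(message)
```

With the payload above, the strict version raises `KeyError` for the missing key, while the lenient one produces a row whose `event_subtype` is `None`.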
   
   I've tried two things to make this work:
   
   ```yaml
   event_subtype: { type: ["string", "null"] }
   ```
   
   Which fails with `ValueError: Error applying transform "ReadFromPubSub" at line 5: unhashable type: 'list'`.
   
   And:
   
   ```yaml
   event_subtype:
     oneOf:
       - type: "string"
       - type: "null"
   ```
   
   Which fails with `ValueError: Error applying transform "ReadFromPubSub" at line 5: Malformed type {'oneOf': [{'type': 'string'}, {'type': None}]}.`.
   
   In my case, `event_subtype` is omitted from the JSON bytes entirely when it's empty at publish time (`json:"event_subtype,omitempty"` in Go). So the field isn't actually `null`, it's just absent, but I thought I'd try the above anyway.
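
To make the absent-versus-null distinction concrete, here is a small Python sketch. The two payloads are made-up examples, not real messages from my topic.

```python
import json

# Field omitted entirely, as Go's omitempty produces for an empty string.
omitted = json.loads('{"id": "abc"}')

# Field present with an explicit JSON null.
explicit_null = json.loads('{"id": "abc", "event_subtype": null}')

omitted_has_key = "event_subtype" in omitted
null_has_key = "event_subtype" in explicit_null
null_value = explicit_null["event_subtype"]
```

Only the second payload contains the key at all, so a nullable type in the schema would not help on its own unless absent keys are tolerated too.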
   
   Is there an existing solution here, or do I have to solve this by changing the shape of the data at publish time?
   
   ### Issue Priority
   
   Priority: 2 (default / most feature requests should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [x] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

