shunping opened a new issue, #36013:
URL: https://github.com/apache/beam/issues/36013

   ### What happened?
   
   When we calls jdbc related schema transform in a pipeline and specify 
`environment_type` as `DOCKER`, the pipeline failed to run.
   
   Here is the code to reproduce (required a postgres database instance):
   ```
   import logging
   
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.transforms.external import SchemaAwareExternalTransform
   from apache_beam.transforms.external import BeamJarExpansionService
   
   logging.basicConfig(level=logging.INFO)
   
   options = PipelineOptions([
       "--streaming",
       "--runner=PrismRunner",
       "--environment_type=DOCKER",
       
"--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.12_sdk:latest",
       "--job_server_timeout=3600",
       "--prism_log_level=debug",
   ])
   
   URN = "beam:schematransform:org.apache.beam:postgres_read:v1"
   
   from apache_beam.io.jdbc import JdbcDateType
   from apache_beam.io.jdbc import JdbcTimeType
   
   expansion_service = BeamJarExpansionService(
       "sdks:java:io:google-cloud-platform:expansion-service:shadowJar")
   with beam.Pipeline(options=options) as p:
     _ = (
         p | SchemaAwareExternalTransform(
             identifier=URN,
             expansion_service=expansion_service,
             rearrange_based_on_discovery=True,
             jdbc_url="jdbc:postgresql://localhost:5432/<db_name>",
             location="<table_name>",
             username="postgres",
             password="",
         )
         | beam.LogElements(level=logging.WARN))
   
   ```
   
   
   The error message is as follows.
   ```
   Traceback (most recent call last):
     File "sdks/python/playpen/xlang/run_xlang.py", line 37, in <module>
       with beam.Pipeline(options=options) as p:
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "sdks/python/apache_beam/pipeline.py", line 663, in __exit__
       self.result.wait_until_finish()
     File "sdks/python/apache_beam/runners/portability/portable_runner.py", 
line 568, in wait_until_finish
       raise self._runtime_exception
   RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst004 
stage-005 failed:Traceback (most recent call last):
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", 
line 573, in named_tuple_from_schema
       field_py_type = self.typing_from_runner_api(field.type)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", 
line 491, in typing_from_runner_api
       base = self.typing_from_runner_api(base_type)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", 
line 554, in typing_from_runner_api
       return LogicalType.from_runner_api(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", 
line 813, in from_runner_api
       raise ValueError(
   ValueError: No logical type registered for URN 
'beam:logical_type:javasdk_date:v1'
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 313, in _execute
       response = task()
                  ^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 387, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 659, in do_instruction
       return getattr(self, request_type)(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 690, in process_bundle
       bundle_processor = self.bundle_processor_cache.get(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 511, in get
       processor = bundle_processor.BundleProcessor(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1133, in __init__
       self.ops = self.create_execution_tree(self.process_bundle_descriptor)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1193, in create_execution_tree
       get_operation(transform_id))) for transform_id in sorted(
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in get_operation
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
             ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1178, in get_operation
       return transform_factory.create_operation(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1497, in create_operation
       return creator(self, transform_id, transform_proto, payload, consumers)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1828, in create_par_do
       return _create_pardo_operation(
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1912, in _create_pardo_operation
       output_coders = factory.get_output_coders(transform_proto)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1540, in get_output_coders
       tag: self.get_windowed_coder(pcoll_id)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1526, in get_windowed_coder
       coder = self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1519, in get_coder
       return self.context.coders.get_by_id(coder_id)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/runners/pipeline_context.py",
 line 106, in get_by_id
       self._id_to_obj[id] = self._obj_type.from_runner_api(
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/coders/coders.py", line 
368, in from_runner_api
       return constructor(
              ^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/coders/row_coder.py", line 
107, in from_runner_api_parameter
       return RowCoder(schema)
              ^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/coders/row_coder.py", line 
66, in __init__
       self._type_hint = named_tuple_from_schema(self.schema)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", 
line 616, in named_tuple_from_schema
       schema_registry=schema_registry).named_tuple_from_schema(schema)
                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", 
line 577, in named_tuple_from_schema
       raise ValueError(
   ValueError: Failed to decode schema due to an issue with Field proto:
   
   name: "event_date"
   type {
     nullable: true
     logical_type {
       urn: "beam:logical_type:javasdk_date:v1"
       payload: 
"\202SNAPPY\000\000\000\000\001\000\000\000\001\000\000\002d\303\007\360@\254\355\000\005sr\000*org.apache.beam.sdk.io.jdbc.LogicalTypes$2M\352\236\036h\3034/\002\000\000xr\000?N9\000
 
schemas.l\t9Dtypes.PassThroughL\t\030\001Q\270\210\324\331\211\313P\033\263\002\000\004L\000\010argumentt\000\022Ljava/lang/Object;L\000\014a\r
 
\001:\034t\000.Lorg/\t\266\000/\001\266\020/sdk/\r}\004/S\005\205\024$Field\0010\020;L\000\tf\021\rDq\000~\000\003L\000\nidentifier6s\000<String;xpt\000\000sr\0006n\346\000$AutoValue_\ts\000_\025sh9\304m\364S\243\227P\002\000\010L\000\025collectionEle\001\346\001\226\000q\t\211\000\013-+\001\023\010t\0000\216\331\000\000L9E$;L\000\nmapKey\001@\rS\014\014map\005\227\035\024,\010metadatat\000\017)aXutil/Map;L\000\010nullablet\000\023\t\035%~4Boolean;L\000\trow\t\343\010t\000$\212\243\000\001T!\374\030Namet\000-\2122\000\000$\001\254\001/\020;xr\000,nu\001\000S:\336\0010\013PLl[\357\3103\002\000\000xp\001\001\014sr\000\036AC\030.util.C5|Ds$EmptyMapY
 6\024\205Z\334\347\320\0053\014sr\000\021\005/\001\364\000.\r\3648\315 
r\200\325\234\372\356\002\000\001Z\000\005v!\344\034xp\000p~r\000+\212\234\000\021\314\000\000\r\001\000\022e1\000\016\031f\014Enum\r\034\005\035(pt\000\006STRINGs!\304\000\007\001\307\001\t\000\020\001\005\010\022p~\001\007H\023t\000\010DATETIMEt\000\004DATE"
       representation {
         logical_type {
           urn: "beam:logical_type:millis_instant:v1"
           representation {
             atomic_type: INT64
           }
         }
       }
       argument_type {
         atomic_type: STRING
       }
       argument {
         atomic_value {
           string: ""
         }
       }
     }
   }
   id: 1
   encoding_position: 1
   ```
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to