shunping opened a new issue, #36013:
URL: https://github.com/apache/beam/issues/36013
### What happened?
When we call a JDBC-related schema transform in a pipeline and set
`environment_type` to `DOCKER`, the pipeline fails to run.
Here is the code to reproduce the issue (requires a PostgreSQL database instance):
```
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external import SchemaAwareExternalTransform
# Python-side JDBC date/time logical types; imported here but not
# referenced below.
from apache_beam.io.jdbc import JdbcDateType
from apache_beam.io.jdbc import JdbcTimeType

logging.basicConfig(level=logging.INFO)

options = PipelineOptions([
    "--streaming",
    "--runner=PrismRunner",
    # Run the Python SDK worker in a Docker container instead of in-process.
    "--environment_type=DOCKER",
    "--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.12_sdk:latest",
    "--job_server_timeout=3600",
    "--prism_log_level=debug",
])

URN = "beam:schematransform:org.apache.beam:postgres_read:v1"

expansion_service = BeamJarExpansionService(
    "sdks:java:io:google-cloud-platform:expansion-service:shadowJar")

with beam.Pipeline(options=options) as p:
  _ = (
      p
      | SchemaAwareExternalTransform(
          identifier=URN,
          expansion_service=expansion_service,
          rearrange_based_on_discovery=True,
          jdbc_url="jdbc:postgresql://localhost:5432/<db_name>",
          location="<table_name>",
          username="postgres",
          password="",
      )
      | beam.LogElements(level=logging.WARN))
```
The error message is as follows.
```
Traceback (most recent call last):
  File "sdks/python/playpen/xlang/run_xlang.py", line 37, in <module>
    with beam.Pipeline(options=options) as p:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "sdks/python/apache_beam/pipeline.py", line 663, in __exit__
    self.result.wait_until_finish()
  File "sdks/python/apache_beam/runners/portability/portable_runner.py", line 568, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst004 stage-005 failed:Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 573, in named_tuple_from_schema
    field_py_type = self.typing_from_runner_api(field.type)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 491, in typing_from_runner_api
    base = self.typing_from_runner_api(base_type)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 554, in typing_from_runner_api
    return LogicalType.from_runner_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 813, in from_runner_api
    raise ValueError(
ValueError: No logical type registered for URN 'beam:logical_type:javasdk_date:v1'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 387, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 659, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 690, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 511, in get
    processor = bundle_processor.BundleProcessor(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1133, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1193, in create_execution_tree
    get_operation(transform_id))) for transform_id in sorted(
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in get_operation
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1178, in get_operation
    return transform_factory.create_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1497, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1828, in create_par_do
    return _create_pardo_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1912, in _create_pardo_operation
    output_coders = factory.get_output_coders(transform_proto)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1540, in get_output_coders
    tag: self.get_windowed_coder(pcoll_id)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1526, in get_windowed_coder
    coder = self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1519, in get_coder
    return self.context.coders.get_by_id(coder_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/pipeline_context.py", line 106, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/coders/coders.py", line 368, in from_runner_api
    return constructor(
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/coders/row_coder.py", line 107, in from_runner_api_parameter
    return RowCoder(schema)
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/coders/row_coder.py", line 66, in __init__
    self._type_hint = named_tuple_from_schema(self.schema)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 616, in named_tuple_from_schema
    schema_registry=schema_registry).named_tuple_from_schema(schema)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 577, in named_tuple_from_schema
    raise ValueError(
ValueError: Failed to decode schema due to an issue with Field proto:
name: "event_date"
type {
  nullable: true
  logical_type {
    urn: "beam:logical_type:javasdk_date:v1"
    payload: "\202SNAPPY\000\000\000\000\001\000\000\000\001\000\000\002d\303\007\360@\254\355\000\005sr\000*org.apache.beam.sdk.io.jdbc.LogicalTypes$2M\352\236\036h\3034/\002\000\000xr\000?N9\000 schemas.l\t9Dtypes.PassThroughL\t\030\001Q\270\210\324\331\211\313P\033\263\002\000\004L\000\010argumentt\000\022Ljava/lang/Object;L\000\014a\r \001:\034t\000.Lorg/\t\266\000/\001\266\020/sdk/\r}\004/S\005\205\024$Field\0010\020;L\000\tf\021\rDq\000~\000\003L\000\nidentifier6s\000<String;xpt\000\000sr\0006n\346\000$AutoValue_\ts\000_\025sh9\304m\364S\243\227P\002\000\010L\000\025collectionEle\001\346\001\226\000q\t\211\000\013-+\001\023\010t\0000\216\331\000\000L9E$;L\000\nmapKey\001@\rS\014\014map\005\227\035\024,\010metadatat\000\017)aXutil/Map;L\000\010nullablet\000\023\t\035%~4Boolean;L\000\trow\t\343\010t\000$\212\243\000\001T!\374\030Namet\000-\2122\000\000$\001\254\001/\020;xr\000,nu\001\000S:\336\0010\013PLl[\357\3103\002\000\000xp\001\001\014sr\000\036AC\030.util.C5|Ds$EmptyMapY6\024\205Z\334\347\320\0053\014sr\000\021\005/\001\364\000.\r\3648\315 r\200\325\234\372\356\002\000\001Z\000\005v!\344\034xp\000p~r\000+\212\234\000\021\314\000\000\r\001\000\022e1\000\016\031f\014Enum\r\034\005\035(pt\000\006STRINGs!\304\000\007\001\307\001\t\000\020\001\005\010\022p~\001\007H\023t\000\010DATETIMEt\000\004DATE"
    representation {
      logical_type {
        urn: "beam:logical_type:millis_instant:v1"
        representation {
          atomic_type: INT64
        }
      }
    }
    argument_type {
      atomic_type: STRING
    }
    argument {
      atomic_value {
        string: ""
      }
    }
  }
}
id: 1
encoding_position: 1
```
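The traceback shows that the Python SDK worker running inside the Docker container has no logical type registered for `beam:logical_type:javasdk_date:v1` when it builds the `RowCoder` for the `event_date` field. One plausible mechanism, stated as an assumption and not a confirmed diagnosis: if such logical types are registered as a side effect of importing the module that defines them, the registration exists only in the process that performed the import. The sketch below models that with a hypothetical registry. `LogicalTypeRegistry`, `make_process_registry` and `FakeJdbcDateType` are made-up names, not Beam APIs.

```python
# Hypothetical, simplified model of a URN-keyed logical type registry.
# None of these names are Beam APIs; they only illustrate that
# import-time registration is local to one process.
from typing import Dict


class LogicalTypeRegistry:
    """Maps logical type URNs to Python classes (hypothetical)."""

    def __init__(self) -> None:
        self._by_urn: Dict[str, type] = {}

    def register(self, cls: type) -> type:
        # Used as a class decorator, so it runs when the class definition
        # executes, e.g. when its defining module is imported.
        self._by_urn[cls.urn()] = cls
        return cls

    def from_urn(self, urn: str) -> type:
        if urn not in self._by_urn:
            raise ValueError(f"No logical type registered for URN {urn!r}")
        return self._by_urn[urn]


def make_process_registry(imports_jdbc_module: bool) -> LogicalTypeRegistry:
    """Builds the registry a single SDK process would see (hypothetical)."""
    registry = LogicalTypeRegistry()
    if imports_jdbc_module:
        # Stands in for importing the module that defines the JDBC types.
        @registry.register
        class FakeJdbcDateType:
            @classmethod
            def urn(cls) -> str:
                return "beam:logical_type:javasdk_date:v1"
    return registry


URN = "beam:logical_type:javasdk_date:v1"

# Assumed setup: the launching process imported the JDBC module,
# the containerized worker did not.
launcher = make_process_registry(imports_jdbc_module=True)
docker_worker = make_process_registry(imports_jdbc_module=False)

print(launcher.from_urn(URN).__name__)  # FakeJdbcDateType
try:
    docker_worker.from_urn(URN)
except ValueError as err:
    print(err)  # No logical type registered for URN 'beam:logical_type:javasdk_date:v1'
```

Under this assumption, importing the JDBC types in the launching script would not help a worker started from a separate container image; the worker process itself would need to perform the registration before decoding the schema.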
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
--
This is an automated message from the Apache Git Service.