This is an automated email from the ASF dual-hosted git repository. vterentev pushed a commit to branch cp-36014 in repository https://gitbox.apache.org/repos/asf/beam.git
commit 084c97f67f727110bb0cb4a40642b0887be4f4ae Author: Vitaly Terentyev <[email protected]> AuthorDate: Mon Sep 8 14:33:48 2025 +0400 Cherrypick --- sdks/python/apache_beam/io/jdbc.py | 92 +--------------------------- sdks/python/apache_beam/typehints/schemas.py | 92 ++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 90 deletions(-) diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py index 79e6b3ce315..df5d7f21a34 100644 --- a/sdks/python/apache_beam/io/jdbc.py +++ b/sdks/python/apache_beam/io/jdbc.py @@ -87,7 +87,6 @@ # pytype: skip-file import contextlib -import datetime import typing import numpy as np @@ -96,10 +95,11 @@ from apache_beam.coders import RowCoder from apache_beam.transforms.external import BeamJarExpansionService from apache_beam.transforms.external import ExternalTransform from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder +from apache_beam.typehints.schemas import JdbcDateType # pylint: disable=unused-import +from apache_beam.typehints.schemas import JdbcTimeType # pylint: disable=unused-import from apache_beam.typehints.schemas import LogicalType from apache_beam.typehints.schemas import MillisInstant from apache_beam.typehints.schemas import typing_to_runner_api -from apache_beam.utils.timestamp import Timestamp __all__ = [ 'WriteToJdbc', @@ -399,91 +399,3 @@ class ReadFromJdbc(ExternalTransform): ), expansion_service or default_io_expansion_service(classpath), ) - - -@LogicalType.register_logical_type -class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]): - """ - For internal use only; no backwards-compatibility guarantees. - - Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO - has been migrated to Beam portable logical types.
- """ - def __init__(self, argument=""): - pass - - @classmethod - def representation_type(cls) -> type: - return MillisInstant - - @classmethod - def urn(cls): - return "beam:logical_type:javasdk_date:v1" - - @classmethod - def language_type(cls): - return datetime.date - - def to_representation_type(self, value: datetime.date) -> Timestamp: - return Timestamp.from_utc_datetime( - datetime.datetime.combine( - value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc)) - - def to_language_type(self, value: Timestamp) -> datetime.date: - return value.to_utc_datetime().date() - - @classmethod - def argument_type(cls): - return str - - def argument(self): - return "" - - @classmethod - def _from_typing(cls, typ): - return cls() - - -@LogicalType.register_logical_type -class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]): - """ - For internal use only; no backwards-compatibility guarantees. - - Support of Legacy JdbcIO TIME logical type. . Deemed to change when Java - JDBCIO has been migrated to Beam portable logical types.
- """ - def __init__(self, argument=""): - pass - - @classmethod - def representation_type(cls) -> type: - return MillisInstant - - @classmethod - def urn(cls): - return "beam:logical_type:javasdk_time:v1" - - @classmethod - def language_type(cls): - return datetime.time - - def to_representation_type(self, value: datetime.date) -> Timestamp: - return Timestamp.from_utc_datetime( - datetime.datetime.combine( - datetime.datetime.utcfromtimestamp(0), - value, - tzinfo=datetime.timezone.utc)) - - def to_language_type(self, value: Timestamp) -> datetime.date: - return value.to_utc_datetime().time() - - @classmethod - def argument_type(cls): - return str - - def argument(self): - return "" - - @classmethod - def _from_typing(cls, typ): - return cls() diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 32dc2fd06ec..c21dde426fc 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -66,6 +66,7 @@ any backwards-compatibility guarantee. # pytype: skip-file +import datetime import decimal import logging from typing import Any @@ -1189,3 +1190,94 @@ class VariableString(PassThroughLogicalType[str, np.int32]): def argument(self): return self.max_length + + +# TODO: A temporary fix for missing jdbc logical types. +# See the discussion in https://github.com/apache/beam/issues/35738 for +# more detail. +@LogicalType.register_logical_type +class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]): + """ + For internal use only; no backwards-compatibility guarantees. + + Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO + has been migrated to Beam portable logical types.
+ """ + def __init__(self, argument=""): + pass + + @classmethod + def representation_type(cls) -> type: + return MillisInstant + + @classmethod + def urn(cls): + return "beam:logical_type:javasdk_date:v1" + + @classmethod + def language_type(cls): + return datetime.date + + def to_representation_type(self, value: datetime.date) -> Timestamp: + return Timestamp.from_utc_datetime( + datetime.datetime.combine( + value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc)) + + def to_language_type(self, value: Timestamp) -> datetime.date: + return value.to_utc_datetime().date() + + @classmethod + def argument_type(cls): + return str + + def argument(self): + return "" + + @classmethod + def _from_typing(cls, typ): + return cls() + + +@LogicalType.register_logical_type +class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]): + """ + For internal use only; no backwards-compatibility guarantees. + + Support of Legacy JdbcIO TIME logical type. . Deemed to change when Java + JDBCIO has been migrated to Beam portable logical types. + """ + def __init__(self, argument=""): + pass + + @classmethod + def representation_type(cls) -> type: + return MillisInstant + + @classmethod + def urn(cls): + return "beam:logical_type:javasdk_time:v1" + + @classmethod + def language_type(cls): + return datetime.time + + def to_representation_type(self, value: datetime.time) -> Timestamp: + return Timestamp.from_utc_datetime( + datetime.datetime.combine( + datetime.datetime.utcfromtimestamp(0), + value, + tzinfo=datetime.timezone.utc)) + + def to_language_type(self, value: Timestamp) -> datetime.time: + return value.to_utc_datetime().time() + + @classmethod + def argument_type(cls): + return str + + def argument(self): + return "" + + @classmethod + def _from_typing(cls, typ): + return cls()
