This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 53f8367adf6 Portable Date Type (Python changes) (#38078)
53f8367adf6 is described below
commit 53f8367adf658c231bbcab6ebbcba620c4b30659
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon Apr 13 10:10:18 2026 -0700
Portable Date Type (Python changes) (#38078)
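* Add the portable Date logical type to the Python SDK
* Touch trigger files to run the Python Xlang post-commit ITs
* Fix typo
* Add TODO issue link for switching JdbcIO to the portable Date type
* Disable the enable_managed_transforms experiment in the managed Iceberg IT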
---
.../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json | 2 +-
.../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +-
.../beam_PostCommit_Python_Xlang_IO_Dataflow.json | 2 +-
.../beam_PostCommit_Python_Xlang_IO_Direct.json | 2 +-
sdks/python/apache_beam/io/jdbc.py | 5 ++++
sdks/python/apache_beam/portability/common_urns.py | 1 +
.../transforms/managed_iceberg_it_test.py | 23 +++++++++++------
sdks/python/apache_beam/typehints/schemas.py | 29 +++++++++++++++++++++-
sdks/python/apache_beam/typehints/schemas_test.py | 2 ++
9 files changed, 56 insertions(+), 12 deletions(-)
diff --git
a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
index bb5da04014e..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
- "modification": 15
+ "modification": 1
}
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index 83346d34aee..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
- "modification": 16
+ "modification": 1
}
diff --git
a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
index b60f5c4cc3c..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
- "modification": 0
+ "modification": 1
}
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
index b2683333323..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
- "modification": 2
+ "modification": 1
}
diff --git a/sdks/python/apache_beam/io/jdbc.py
b/sdks/python/apache_beam/io/jdbc.py
index df5d7f21a34..e6646443bfb 100644
--- a/sdks/python/apache_beam/io/jdbc.py
+++ b/sdks/python/apache_beam/io/jdbc.py
@@ -360,6 +360,11 @@ class ReadFromJdbc(ExternalTransform):
of the output PCollection elements. This bypasses automatic
schema inference during pipeline construction.
"""
+ # override new portable Date type with the current Jdbc type
+ # TODO(https://github.com/apache/beam/issues/28359):
+ # switch JdbcIO to return portable Date type
+ LogicalType.register_logical_type(JdbcDateType)
+
classpath = classpath or DEFAULT_JDBC_CLASSPATH
dataSchema = None
diff --git a/sdks/python/apache_beam/portability/common_urns.py
b/sdks/python/apache_beam/portability/common_urns.py
index 74d9a39bb05..7777f63ffbe 100644
--- a/sdks/python/apache_beam/portability/common_urns.py
+++ b/sdks/python/apache_beam/portability/common_urns.py
@@ -92,3 +92,4 @@ fixed_bytes = LogicalTypes.Enum.FIXED_BYTES
var_bytes = LogicalTypes.Enum.VAR_BYTES
fixed_char = LogicalTypes.Enum.FIXED_CHAR
var_char = LogicalTypes.Enum.VAR_CHAR
+date = LogicalTypes.Enum.DATE
diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
index b1e53a79bd4..458855c4b96 100644
--- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
+++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+import datetime
import os
import unittest
import uuid
@@ -33,13 +34,13 @@ from apache_beam.testing.util import equal_to
"EXPANSION_JARS environment var is not provided, "
"indicating that jars have not been built")
class ManagedIcebergIT(unittest.TestCase):
- WAREHOUSE = "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java"
+ WAREHOUSE = "gs://temp-storage-for-end-to-end-tests"
def setUp(self):
+ # Create the integration-test pipeline and collect its CLI args.
self.test_pipeline = TestPipeline(is_integration_test=True)
self.args = self.test_pipeline.get_full_options_as_args()
self.args.extend([
- '--experiments=enable_managed_transforms',
+ # NOTE(review): experiment disabled by this commit ("disable managed
+ # transforms"), which leaves this extend() a no-op; either restore the
+ # flag or delete it together with the empty extend() call.
+ # '--experiments=enable_managed_transforms',
])
def _create_row(self, num: int):
@@ -49,16 +50,24 @@ class ManagedIcebergIT(unittest.TestCase):
bytes_=bytes(num),
bool_=(num % 2 == 0),
float_=(num + float(num) / 100),
- arr_=[num, num, num])
+ arr_=[num, num, num],
+ date_=datetime.date.today() - datetime.timedelta(days=num))
def test_write_read_pipeline(self):
+ biglake_catalog_props = {
+ 'type': 'rest',
+ 'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
+ 'warehouse': self.WAREHOUSE,
+ 'header.x-goog-user-project': 'apache-beam-testing',
+ 'rest.auth.type': 'google',
+ 'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO',
+ 'header.X-Iceberg-Access-Delegation': 'vended-credentials',
+ 'rest-metrics-reporting-enabled': 'false'
+ }
iceberg_config = {
"table": "test_iceberg_write_read.test_" + uuid.uuid4().hex,
"catalog_name": "default",
- "catalog_properties": {
- "type": "hadoop",
- "warehouse": self.WAREHOUSE,
- }
+ "catalog_properties": biglake_catalog_props
}
rows = [self._create_row(i) for i in range(100)]
diff --git a/sdks/python/apache_beam/typehints/schemas.py
b/sdks/python/apache_beam/typehints/schemas.py
index d2c4db8cabc..9e337f080fb 100644
--- a/sdks/python/apache_beam/typehints/schemas.py
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -34,6 +34,7 @@ Imposes a mapping between common Python types and Beam
portable schemas
bytes <-----> BYTES
ByteString ------> BYTES
Timestamp <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
+ datetime.date <---> LogicalType(urn="beam:logical_type:date:v1")
Decimal <-----> LogicalType(urn="beam:logical_type:fixed_decimal:v1")
Mapping <-----> MapType
Sequence <-----> ArrayType
@@ -1004,6 +1005,33 @@ class MicrosInstant(NoArgumentLogicalType[Timestamp,
return Timestamp(seconds=int(value.seconds), micros=int(value.micros))
+@LogicalType._register_internal
+class Date(NoArgumentLogicalType[datetime.date, np.int64]):
+  """Logical type for ``datetime.date``, stored as int64 days since epoch.
+
+  ``to_representation_type`` maps a date to its signed day offset from
+  ``EPOCH`` (1970-01-01), so dates before the epoch become negative
+  values; ``to_language_type`` performs the inverse mapping.
+  """
+  EPOCH = datetime.date(1970, 1, 1)
+
+  @classmethod
+  def urn(cls):
+    return common_urns.date.urn
+
+  @classmethod
+  def representation_type(cls):
+    # type: () -> type
+    return np.int64
+
+  @classmethod
+  def language_type(cls):
+    return datetime.date
+
+  def to_representation_type(self, value):
+    # type: (datetime.date) -> np.int64
+    return (value - self.EPOCH).days
+
+  def to_language_type(self, value):
+    # type: (np.int64) -> datetime.date
+    # datetime.timedelta rejects numpy integer scalars (e.g. np.int64) with
+    # a TypeError, so normalize to a built-in int before building the delta.
+    return self.EPOCH + datetime.timedelta(days=int(value))
+
+
@LogicalType._register_internal
class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]):
"""A logical type for PythonCallableSource objects."""
@@ -1244,7 +1272,6 @@ class VariableString(PassThroughLogicalType[str,
np.int32]):
# TODO: A temporary fix for missing jdbc logical types.
# See the discussion in https://github.com/apache/beam/issues/35738 for
# more detail.
-@LogicalType._register_internal
class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]):
"""
For internal use only; no backwards-compatibility guarantees.
diff --git a/sdks/python/apache_beam/typehints/schemas_test.py
b/sdks/python/apache_beam/typehints/schemas_test.py
index 5a5d7396ab3..d70bf0c47d3 100644
--- a/sdks/python/apache_beam/typehints/schemas_test.py
+++ b/sdks/python/apache_beam/typehints/schemas_test.py
@@ -20,6 +20,7 @@
# pytype: skip-file
import dataclasses
+import datetime
import itertools
import pickle
import unittest
@@ -105,6 +106,7 @@ class ComplexSchema(NamedTuple):
optional_array: Optional[Sequence[np.float32]]
array_optional: Sequence[Optional[bool]]
timestamp: Timestamp
+ date: datetime.date
def get_test_beam_fieldtype_protos():