This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 53f8367adf6 Portable Date Type (Python changes) (#38078)
53f8367adf6 is described below

commit 53f8367adf658c231bbcab6ebbcba620c4b30659
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon Apr 13 10:10:18 2026 -0700

    Portable Date Type (Python changes) (#38078)
    
    * portable date python changes
    
    * trigger ITs
    
    * typo
    
    * add todo link
    
    * disable managed transforms
---
 .../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json |  2 +-
 .../beam_PostCommit_Python_Xlang_Gcp_Direct.json   |  2 +-
 .../beam_PostCommit_Python_Xlang_IO_Dataflow.json  |  2 +-
 .../beam_PostCommit_Python_Xlang_IO_Direct.json    |  2 +-
 sdks/python/apache_beam/io/jdbc.py                 |  5 ++++
 sdks/python/apache_beam/portability/common_urns.py |  1 +
 .../transforms/managed_iceberg_it_test.py          | 23 +++++++++++------
 sdks/python/apache_beam/typehints/schemas.py       | 29 +++++++++++++++++++++-
 sdks/python/apache_beam/typehints/schemas_test.py  |  2 ++
 9 files changed, 56 insertions(+), 12 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
index bb5da04014e..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 15
+  "modification": 1
 }
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index 83346d34aee..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 16
+  "modification": 1
 }
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
index b60f5c4cc3c..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 0
+  "modification": 1
 }
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
index b2683333323..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 2
+  "modification": 1
 }
diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py
index df5d7f21a34..e6646443bfb 100644
--- a/sdks/python/apache_beam/io/jdbc.py
+++ b/sdks/python/apache_beam/io/jdbc.py
@@ -360,6 +360,11 @@ class ReadFromJdbc(ExternalTransform):
                    of the output PCollection elements. This bypasses automatic
                    schema inference during pipeline construction.
     """
+    # override new portable Date type with the current Jdbc type
+    # TODO(https://github.com/apache/beam/issues/28359):
+    #  switch JdbcIO to return portable Date type
+    LogicalType.register_logical_type(JdbcDateType)
+
     classpath = classpath or DEFAULT_JDBC_CLASSPATH
 
     dataSchema = None
diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py
index 74d9a39bb05..7777f63ffbe 100644
--- a/sdks/python/apache_beam/portability/common_urns.py
+++ b/sdks/python/apache_beam/portability/common_urns.py
@@ -92,3 +92,4 @@ fixed_bytes = LogicalTypes.Enum.FIXED_BYTES
 var_bytes = LogicalTypes.Enum.VAR_BYTES
 fixed_char = LogicalTypes.Enum.FIXED_CHAR
 var_char = LogicalTypes.Enum.VAR_CHAR
+date = LogicalTypes.Enum.DATE
diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
index b1e53a79bd4..458855c4b96 100644
--- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
+++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import datetime
 import os
 import unittest
 import uuid
@@ -33,13 +34,13 @@ from apache_beam.testing.util import equal_to
     "EXPANSION_JARS environment var is not provided, "
     "indicating that jars have not been built")
 class ManagedIcebergIT(unittest.TestCase):
-  WAREHOUSE = "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java"
+  WAREHOUSE = "gs://temp-storage-for-end-to-end-tests"
 
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
     self.args = self.test_pipeline.get_full_options_as_args()
     self.args.extend([
-        '--experiments=enable_managed_transforms',
+        # '--experiments=enable_managed_transforms',
     ])
 
   def _create_row(self, num: int):
@@ -49,16 +50,24 @@ class ManagedIcebergIT(unittest.TestCase):
         bytes_=bytes(num),
         bool_=(num % 2 == 0),
         float_=(num + float(num) / 100),
-        arr_=[num, num, num])
+        arr_=[num, num, num],
+        date_=datetime.date.today() - datetime.timedelta(days=num))
 
   def test_write_read_pipeline(self):
+    biglake_catalog_props = {
+        'type': 'rest',
+        'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
+        'warehouse': self.WAREHOUSE,
+        'header.x-goog-user-project': 'apache-beam-testing',
+        'rest.auth.type': 'google',
+        'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO',
+        'header.X-Iceberg-Access-Delegation': 'vended-credentials',
+        'rest-metrics-reporting-enabled': 'false'
+    }
     iceberg_config = {
         "table": "test_iceberg_write_read.test_" + uuid.uuid4().hex,
         "catalog_name": "default",
-        "catalog_properties": {
-            "type": "hadoop",
-            "warehouse": self.WAREHOUSE,
-        }
+        "catalog_properties": biglake_catalog_props
     }
 
     rows = [self._create_row(i) for i in range(100)]
diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py
index d2c4db8cabc..9e337f080fb 100644
--- a/sdks/python/apache_beam/typehints/schemas.py
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -34,6 +34,7 @@ Imposes a mapping between common Python types and Beam portable schemas
   bytes       <-----> BYTES
   ByteString  ------> BYTES
   Timestamp   <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
+  datetime.date <---> LogicalType(urn="beam:logical_type:date:v1")
   Decimal     <-----> LogicalType(urn="beam:logical_type:fixed_decimal:v1")
   Mapping     <-----> MapType
   Sequence    <-----> ArrayType
@@ -1004,6 +1005,33 @@ class MicrosInstant(NoArgumentLogicalType[Timestamp,
     return Timestamp(seconds=int(value.seconds), micros=int(value.micros))
 
 
+@LogicalType._register_internal
+class Date(NoArgumentLogicalType[datetime.date, np.int64]):
+  """Date logical type that handles ``datetime.date``, days since epoch."""
+  EPOCH = datetime.date(1970, 1, 1)
+
+  @classmethod
+  def urn(cls):
+    return common_urns.date.urn
+
+  @classmethod
+  def representation_type(cls):
+    # type: () -> type
+    return np.int64
+
+  @classmethod
+  def language_type(cls):
+    return datetime.date
+
+  def to_representation_type(self, value):
+    # type: (datetime.date) -> np.int64
+    return (value - self.EPOCH).days
+
+  def to_language_type(self, value):
+    # type: (np.int64) -> datetime.date
+    return self.EPOCH + datetime.timedelta(days=value)
+
+
 @LogicalType._register_internal
 class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]):
   """A logical type for PythonCallableSource objects."""
@@ -1244,7 +1272,6 @@ class VariableString(PassThroughLogicalType[str, np.int32]):
 # TODO: A temporary fix for missing jdbc logical types.
 # See the discussion in https://github.com/apache/beam/issues/35738 for
 # more detail.
-@LogicalType._register_internal
 class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]):
   """
   For internal use only; no backwards-compatibility guarantees.
diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py
index 5a5d7396ab3..d70bf0c47d3 100644
--- a/sdks/python/apache_beam/typehints/schemas_test.py
+++ b/sdks/python/apache_beam/typehints/schemas_test.py
@@ -20,6 +20,7 @@
 # pytype: skip-file
 
 import dataclasses
+import datetime
 import itertools
 import pickle
 import unittest
@@ -105,6 +106,7 @@ class ComplexSchema(NamedTuple):
   optional_array: Optional[Sequence[np.float32]]
   array_optional: Sequence[Optional[bool]]
   timestamp: Timestamp
+  date: datetime.date
 
 
 def get_test_beam_fieldtype_protos():

Reply via email to