This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new cce1db3fe8 [python] fix pypaimon timezone-free timestamp issue (#6750)
cce1db3fe8 is described below
commit cce1db3fe82998316a3be4b29f25f3a70ec6e67e
Author: XiaoHongbo <[email protected]>
AuthorDate: Sat Dec 6 19:50:26 2025 +0800
[python] fix pypaimon timezone-free timestamp issue (#6750)
---
.../apache/paimon/manifest/ManifestFileTest.java | 34 +++++
.../paimon/table/system/ManifestsTableTest.java | 46 ++++++
paimon-python/pypaimon/data/__init__.py | 21 +++
paimon-python/pypaimon/data/timestamp.py | 167 +++++++++++++++++++++
.../pypaimon/manifest/manifest_file_manager.py | 22 ++-
.../pypaimon/manifest/schema/data_file_meta.py | 74 ++++++++-
.../pypaimon/tests/file_store_commit_test.py | 122 ++++++++-------
.../manifest/manifest_entry_identifier_test.py | 3 +-
.../pypaimon/tests/py36/rest_ao_read_write_test.py | 19 ++-
paimon-python/pypaimon/tests/reader_base_test.py | 19 ++-
.../pypaimon/tests/reader_primary_key_test.py | 43 ++++++
paimon-python/pypaimon/write/file_store_commit.py | 4 +-
paimon-python/pypaimon/write/writer/blob_writer.py | 6 +-
.../pypaimon/write/writer/data_blob_writer.py | 6 +-
paimon-python/pypaimon/write/writer/data_writer.py | 6 +-
15 files changed, 514 insertions(+), 78 deletions(-)
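
For context on the fix below: pypaimon previously stored creation_time as a
naive datetime and converted it with datetime.timestamp(), which interprets
naive values in the host's local time zone, so the resulting epoch millis
drifted by the host's UTC offset and disagreed with the Java side. A minimal
standard-library sketch of the drift (hypothetical values, not part of the
patch):

    from datetime import datetime, timezone

    # A naive datetime carries no zone; timestamp() assumes the *local* zone,
    # so the same wall-clock value yields different epoch millis per host.
    naive = datetime(2024, 1, 15, 10, 30, 0)
    local_millis = int(naive.timestamp() * 1000)  # host-dependent
    utc_millis = int(naive.replace(tzinfo=timezone.utc).timestamp() * 1000)

    # On a UTC+8 host the two differ by 8 * 3600 * 1000 ms.
    print(local_millis, utc_millis, utc_millis - local_millis)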
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index 142ebc6a47..2e45d904a5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -32,6 +32,7 @@ import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.FileStorePathFactory;
import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
@@ -86,6 +87,39 @@ public class ManifestFileTest {
}
}
+ @Test
+ void testManifestCreationTimeTimestamp() {
+ List<ManifestEntry> entries = generateData();
+ ManifestFile manifestFile = createManifestFile(tempDir.toString());
+
+ List<ManifestFileMeta> actualMetas = manifestFile.write(entries);
+ List<ManifestEntry> actualEntries =
+ actualMetas.stream()
+                        .flatMap(m -> manifestFile.read(m.fileName(), m.fileSize()).stream())
+ .collect(Collectors.toList());
+
+ int creationTimesFound = 0;
+ for (ManifestEntry entry : actualEntries) {
+ if (entry.file().creationTime() != null) {
+ creationTimesFound++;
+                org.apache.paimon.data.Timestamp creationTime = entry.file().creationTime();
+ assertThat(creationTime).isNotNull();
+ long epochMillis = entry.file().creationTimeEpochMillis();
+ assertThat(epochMillis).isPositive();
+ long expectedEpochMillis = creationTime.getMillisecond();
+ java.time.ZoneId systemZone = java.time.ZoneId.systemDefault();
+ java.time.ZoneOffset offset =
+ systemZone
+ .getRules()
+                                .getOffset(java.time.Instant.ofEpochMilli(expectedEpochMillis));
+            expectedEpochMillis = expectedEpochMillis - (offset.getTotalSeconds() * 1000L);
+ assertThat(epochMillis).isEqualTo(expectedEpochMillis);
+ }
+ }
+
+ assertThat(creationTimesFound).isPositive();
+ }
+
private List<ManifestEntry> generateData() {
List<ManifestEntry> entries = new ArrayList<>();
for (int i = 0; i < 100; i++) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index c614ca2c27..7afd079b0d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -28,10 +28,13 @@ import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
@@ -161,6 +164,49 @@ public class ManifestsTableTest extends TableTestBase {
"Specified parameter scan.snapshot-id = 3 is not exist, you
can set it in range from 1 to 2");
}
+ @Test
+ void testManifestCreationTimeTimestamp() throws Exception {
+ Identifier identifier = identifier("T_CreationTime");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("pt", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .option("bucket", "1")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ Table testTable = catalog.getTable(identifier);
+
+ write(testTable, GenericRow.of(1, 1, 1), GenericRow.of(2, 2, 2));
+
+ FileStoreScan scan = ((FileStoreTable) testTable).store().newScan();
+ FileStoreScan.Plan plan = scan.plan();
+ List<ManifestEntry> entries = plan.files();
+
+ int creationTimesFound = 0;
+ for (org.apache.paimon.manifest.ManifestEntry entry : entries) {
+ if (entry.file().creationTime() != null) {
+ creationTimesFound++;
+                org.apache.paimon.data.Timestamp creationTime = entry.file().creationTime();
+ assertThat(creationTime).isNotNull();
+ long epochMillis = entry.file().creationTimeEpochMillis();
+ assertThat(epochMillis).isPositive();
+ long expectedEpochMillis = creationTime.getMillisecond();
+ java.time.ZoneId systemZone = java.time.ZoneId.systemDefault();
+ java.time.ZoneOffset offset =
+ systemZone
+ .getRules()
+                                .getOffset(java.time.Instant.ofEpochMilli(expectedEpochMillis));
+            expectedEpochMillis = expectedEpochMillis - (offset.getTotalSeconds() * 1000L);
+ assertThat(epochMillis).isEqualTo(expectedEpochMillis);
+ }
+ }
+
+ assertThat(creationTimesFound).isPositive();
+ }
+
private List<InternalRow> getExpectedResult(long snapshotId) {
if (!snapshotManager.snapshotExists(snapshotId)) {
return Collections.emptyList();
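
Both Java tests above assert the same invariant: creationTimeEpochMillis()
equals the timezone-free millis shifted back by the host's UTC offset at that
instant. A Python restatement of that expected-value computation (a sketch
only; wall_clock_millis is a hypothetical getMillisecond() value, and all code
examples in this note use Python):

    from datetime import datetime

    # Mirror of the Java expectation: subtract the host's UTC offset at that
    # instant, as ZoneId.systemDefault().getRules().getOffset(...) does.
    wall_clock_millis = 1705314600000
    local_dt = datetime.fromtimestamp(wall_clock_millis / 1000)  # naive, host zone
    offset = local_dt.astimezone().utcoffset()
    expected_epoch_millis = wall_clock_millis - int(offset.total_seconds() * 1000)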
diff --git a/paimon-python/pypaimon/data/__init__.py b/paimon-python/pypaimon/data/__init__.py
new file mode 100644
index 0000000000..9f453647bb
--- /dev/null
+++ b/paimon-python/pypaimon/data/__init__.py
@@ -0,0 +1,21 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pypaimon.data.timestamp import Timestamp
+
+__all__ = ['Timestamp']
diff --git a/paimon-python/pypaimon/data/timestamp.py b/paimon-python/pypaimon/data/timestamp.py
new file mode 100644
index 0000000000..ace4c1e1c6
--- /dev/null
+++ b/paimon-python/pypaimon/data/timestamp.py
@@ -0,0 +1,167 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from datetime import datetime, timedelta
+
+
+class Timestamp:
+ """
+ An internal data structure representing data of TimestampType.
+
+    This data structure is immutable and consists of milliseconds and
+    nanos-of-millisecond since 1970-01-01 00:00:00. It might be stored in a
+    compact representation (as a long value) if values are small enough.
+
+    This class represents timezone-free timestamps.
+    """
+
+ # the number of milliseconds in a day
+ MILLIS_PER_DAY = 86400000 # = 24 * 60 * 60 * 1000
+
+ MICROS_PER_MILLIS = 1000
+ NANOS_PER_MICROS = 1000
+
+ NANOS_PER_HOUR = 3_600_000_000_000
+ NANOS_PER_MINUTE = 60_000_000_000
+ NANOS_PER_SECOND = 1_000_000_000
+ NANOS_PER_MICROSECOND = 1_000
+
+ def __init__(self, millisecond: int, nano_of_millisecond: int = 0):
+ if not (0 <= nano_of_millisecond <= 999_999):
+            raise ValueError(
+                f"nano_of_millisecond must be between 0 and 999,999, got {nano_of_millisecond}"
+            )
+ self._millisecond = millisecond
+ self._nano_of_millisecond = nano_of_millisecond
+
+ def get_millisecond(self) -> int:
+ """Returns the number of milliseconds since 1970-01-01 00:00:00."""
+ return self._millisecond
+
+ def get_nano_of_millisecond(self) -> int:
+ """
+        Returns the number of nanoseconds within the millisecond.
+ The value range is from 0 to 999,999.
+ """
+ return self._nano_of_millisecond
+
+ def to_local_date_time(self) -> datetime:
+ """Converts this Timestamp object to a datetime (timezone-free)."""
+ epoch = datetime(1970, 1, 1)
+ days = self._millisecond // self.MILLIS_PER_DAY
+ time_millis = self._millisecond % self.MILLIS_PER_DAY
+ if time_millis < 0:
+ days -= 1
+ time_millis += self.MILLIS_PER_DAY
+
+ microseconds = time_millis * 1000 + self._nano_of_millisecond // 1000
+ return epoch + timedelta(days=days, microseconds=microseconds)
+
+ def to_millis_timestamp(self) -> 'Timestamp':
+ return Timestamp.from_epoch_millis(self._millisecond)
+
+ def to_micros(self) -> int:
+ """Converts this Timestamp object to micros."""
+ micros = self._millisecond * self.MICROS_PER_MILLIS
+ return micros + self._nano_of_millisecond // self.NANOS_PER_MICROS
+
+ def __eq__(self, other):
+ if not isinstance(other, Timestamp):
+ return False
+ return (self._millisecond == other._millisecond and
+ self._nano_of_millisecond == other._nano_of_millisecond)
+
+ def __lt__(self, other):
+ if not isinstance(other, Timestamp):
+ return NotImplemented
+ if self._millisecond != other._millisecond:
+ return self._millisecond < other._millisecond
+ return self._nano_of_millisecond < other._nano_of_millisecond
+
+ def __le__(self, other):
+ return self == other or self < other
+
+ def __gt__(self, other):
+ if not isinstance(other, Timestamp):
+ return NotImplemented
+ if self._millisecond != other._millisecond:
+ return self._millisecond > other._millisecond
+ return self._nano_of_millisecond > other._nano_of_millisecond
+
+ def __ge__(self, other):
+ return self == other or self > other
+
+ def __hash__(self):
+ return hash((self._millisecond, self._nano_of_millisecond))
+
+ def __repr__(self):
+        return f"Timestamp(millisecond={self._millisecond}, nano_of_millisecond={self._nano_of_millisecond})"
+
+ def __str__(self):
+ return self.to_local_date_time().strftime("%Y-%m-%d %H:%M:%S.%f")
+
+ @staticmethod
+ def now() -> 'Timestamp':
+ """Creates an instance of Timestamp for now."""
+ return Timestamp.from_local_date_time(datetime.now())
+
+ @staticmethod
+    def from_epoch_millis(milliseconds: int, nanos_of_millisecond: int = 0) -> 'Timestamp':
+ """
+ Creates an instance of Timestamp from milliseconds.
+ Args:
+ milliseconds: the number of milliseconds since 1970-01-01 00:00:00
+            nanos_of_millisecond: the nanoseconds within the millisecond, from 0 to 999,999
+ """
+ return Timestamp(milliseconds, nanos_of_millisecond)
+
+ @staticmethod
+ def from_local_date_time(date_time: datetime) -> 'Timestamp':
+ """
+ Creates an instance of Timestamp from a datetime (timezone-free).
+
+ Args:
+ date_time: a datetime object (should be naive, without timezone)
+ """
+ if date_time.tzinfo is not None:
+ raise ValueError("datetime must be naive (no timezone)")
+
+ epoch_date = datetime(1970, 1, 1).date()
+ date_time_date = date_time.date()
+
+ epoch_day = (date_time_date - epoch_date).days
+ time_part = date_time.time()
+
+ nano_of_day = (
+ time_part.hour * Timestamp.NANOS_PER_HOUR
+ + time_part.minute * Timestamp.NANOS_PER_MINUTE
+ + time_part.second * Timestamp.NANOS_PER_SECOND
+ + time_part.microsecond * Timestamp.NANOS_PER_MICROSECOND
+ )
+
+        millisecond = epoch_day * Timestamp.MILLIS_PER_DAY + nano_of_day // 1_000_000
+ nano_of_millisecond = int(nano_of_day % 1_000_000)
+
+ return Timestamp(millisecond, nano_of_millisecond)
+
+ @staticmethod
+ def from_micros(micros: int) -> 'Timestamp':
+ """Creates an instance of Timestamp from micros."""
+ mills = micros // Timestamp.MICROS_PER_MILLIS
+        nanos = (micros - mills * Timestamp.MICROS_PER_MILLIS) * Timestamp.NANOS_PER_MICROS
+ return Timestamp.from_epoch_millis(mills, int(nanos))
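
A minimal usage sketch of the Timestamp class added above; the expected values
are hand-computed for 2024-01-15 10:30:00.123456 read as a timezone-free wall
clock:

    from datetime import datetime
    from pypaimon.data import Timestamp

    ts = Timestamp.from_local_date_time(datetime(2024, 1, 15, 10, 30, 0, 123456))

    # Millis since 1970-01-01 00:00:00 of the wall clock itself (no zone
    # applied), with the sub-millisecond remainder carried separately.
    assert ts.get_millisecond() == 1705314600123
    assert ts.get_nano_of_millisecond() == 456_000
    assert ts.to_micros() == 1705314600123456

    # The conversion is lossless down to microseconds and round-trips exactly.
    assert ts.to_local_date_time() == datetime(2024, 1, 15, 10, 30, 0, 123456)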
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 5aefd122b0..f6ae41e3d3 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -21,6 +21,8 @@ from typing import List
import fastavro
+from datetime import datetime
+
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
ManifestEntry)
@@ -105,6 +107,22 @@ class ManifestFileManager:
max_values=BinaryRow(value_dict['_MAX_VALUES'], fields),
null_counts=value_dict['_NULL_COUNTS'],
)
+            # fastavro returns a UTC-aware datetime for timestamp-millis; convert it accordingly
+ from pypaimon.data.timestamp import Timestamp
+ creation_time_value = file_dict['_CREATION_TIME']
+ creation_time_ts = None
+ if creation_time_value is not None:
+                if isinstance(creation_time_value, datetime):
+                    if creation_time_value.tzinfo:
+                        epoch_millis = int(creation_time_value.timestamp() * 1000)
+                        creation_time_ts = Timestamp.from_epoch_millis(epoch_millis)
+                    else:
+                        creation_time_ts = Timestamp.from_local_date_time(creation_time_value)
+                elif isinstance(creation_time_value, (int, float)):
+                    creation_time_ts = Timestamp.from_epoch_millis(int(creation_time_value))
+                else:
+                    raise ValueError(f"Unexpected creation_time type: {type(creation_time_value)}")
+
file_meta = DataFileMeta(
file_name=file_dict['_FILE_NAME'],
file_size=file_dict['_FILE_SIZE'],
@@ -118,7 +136,7 @@ class ManifestFileManager:
schema_id=file_dict['_SCHEMA_ID'],
level=file_dict['_LEVEL'],
extra_files=file_dict['_EXTRA_FILES'],
- creation_time=file_dict['_CREATION_TIME'],
+ creation_time=creation_time_ts,
delete_row_count=file_dict['_DELETE_ROW_COUNT'],
embedded_index=file_dict['_EMBEDDED_FILE_INDEX'],
file_source=file_dict['_FILE_SOURCE'],
@@ -187,7 +205,7 @@ class ManifestFileManager:
"_SCHEMA_ID": entry.file.schema_id,
"_LEVEL": entry.file.level,
"_EXTRA_FILES": entry.file.extra_files,
- "_CREATION_TIME": entry.file.creation_time,
+                "_CREATION_TIME": entry.file.creation_time.get_millisecond() if entry.file.creation_time else None,
"_DELETE_ROW_COUNT": entry.file.delete_row_count,
"_EMBEDDED_FILE_INDEX": entry.file.embedded_index,
"_FILE_SOURCE": entry.file.file_source,
diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
index d7942d1e9f..239c63dbba 100644
--- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
@@ -19,7 +19,9 @@
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
+import time
+from pypaimon.data.timestamp import Timestamp
from pypaimon.manifest.schema.simple_stats import (KEY_STATS_SCHEMA,
VALUE_STATS_SCHEMA,
SimpleStats)
from pypaimon.table.row.generic_row import GenericRow
@@ -40,7 +42,7 @@ class DataFileMeta:
level: int
extra_files: List[str]
- creation_time: Optional[datetime] = None
+ creation_time: Optional[Timestamp] = None
delete_row_count: Optional[int] = None
embedded_index: Optional[bytes] = None
file_source: Optional[int] = None
@@ -52,6 +54,76 @@ class DataFileMeta:
# not a schema field, just for internal usage
file_path: str = None
+ def get_creation_time(self) -> Optional[Timestamp]:
+ return self.creation_time
+
+ def creation_time_epoch_millis(self) -> Optional[int]:
+ if self.creation_time is None:
+ return None
+ local_dt = self.creation_time.to_local_date_time()
+ local_time_struct = local_dt.timetuple()
+ local_timestamp = time.mktime(local_time_struct)
+ utc_timestamp = time.mktime(time.gmtime(local_timestamp))
+ tz_offset_seconds = int(local_timestamp - utc_timestamp)
+ return int((local_timestamp - tz_offset_seconds) * 1000)
+
+ def creation_time_as_datetime(self) -> Optional[datetime]:
+ if self.creation_time is None:
+ return None
+ return self.creation_time.to_local_date_time()
+
+ @classmethod
+ def create(
+ cls,
+ file_name: str,
+ file_size: int,
+ row_count: int,
+ min_key: GenericRow,
+ max_key: GenericRow,
+ key_stats: SimpleStats,
+ value_stats: SimpleStats,
+ min_sequence_number: int,
+ max_sequence_number: int,
+ schema_id: int,
+ level: int,
+ extra_files: List[str],
+ creation_time: Optional[Timestamp] = None,
+ delete_row_count: Optional[int] = None,
+ embedded_index: Optional[bytes] = None,
+ file_source: Optional[int] = None,
+ value_stats_cols: Optional[List[str]] = None,
+ external_path: Optional[str] = None,
+ first_row_id: Optional[int] = None,
+ write_cols: Optional[List[str]] = None,
+ file_path: Optional[str] = None,
+ ) -> 'DataFileMeta':
+ if creation_time is None:
+ creation_time = Timestamp.now()
+
+ return cls(
+ file_name=file_name,
+ file_size=file_size,
+ row_count=row_count,
+ min_key=min_key,
+ max_key=max_key,
+ key_stats=key_stats,
+ value_stats=value_stats,
+ min_sequence_number=min_sequence_number,
+ max_sequence_number=max_sequence_number,
+ schema_id=schema_id,
+ level=level,
+ extra_files=extra_files,
+ creation_time=creation_time,
+ delete_row_count=delete_row_count,
+ embedded_index=embedded_index,
+ file_source=file_source,
+ value_stats_cols=value_stats_cols,
+ external_path=external_path,
+ first_row_id=first_row_id,
+ write_cols=write_cols,
+ file_path=file_path,
+ )
+
    def set_file_path(self, table_path: str, partition: GenericRow, bucket: int):
path_builder = table_path.rstrip('/')
partition_dict = partition.to_dict()
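
creation_time_epoch_millis() above derives the host's UTC offset with nothing
but the standard library: time.mktime interprets a struct_time in the host
zone, while time.gmtime renders an epoch as a UTC wall clock. A compact sketch
of the same idiom (assumes the offset is stable around the chosen instant):

    import time

    # Render "now" as its UTC wall clock, then let mktime() re-interpret that
    # wall clock as local time: the difference is the host's UTC offset.
    now = time.time()
    offset_seconds = int(now - time.mktime(time.gmtime(now)))
    print(f"host UTC offset: {offset_seconds / 3600:+.1f} h")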
diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py b/paimon-python/pypaimon/tests/file_store_commit_test.py
index f1fe4de3d2..438ff6aeb8 100644
--- a/paimon-python/pypaimon/tests/file_store_commit_test.py
+++ b/paimon-python/pypaimon/tests/file_store_commit_test.py
@@ -61,20 +61,24 @@ class TestFileStoreCommit(unittest.TestCase):
file_store_commit = self._create_file_store_commit()
# Create test data
- creation_time = datetime(2024, 1, 15, 10, 30, 0)
- file_meta = DataFileMeta(
+ creation_time_dt = datetime(2024, 1, 15, 10, 30, 0)
+ from pypaimon.data.timestamp import Timestamp
+ from pypaimon.table.row.generic_row import GenericRow
+ from pypaimon.manifest.schema.simple_stats import SimpleStats
+ creation_time = Timestamp.from_local_date_time(creation_time_dt)
+ file_meta = DataFileMeta.create(
file_name="test_file_1.parquet",
file_size=1024 * 1024, # 1MB
row_count=10000,
- min_key=None,
- max_key=None,
- key_stats=None,
- value_stats=None,
+ min_key=GenericRow([], []),
+ max_key=GenericRow([], []),
+ key_stats=SimpleStats.empty_stats(),
+ value_stats=SimpleStats.empty_stats(),
min_sequence_number=1,
max_sequence_number=100,
schema_id=0,
level=0,
- extra_files=None,
+ extra_files=[],
creation_time=creation_time,
external_path=None,
first_row_id=None,
@@ -99,7 +103,8 @@ class TestFileStoreCommit(unittest.TestCase):
self.assertEqual(stat.record_count, 10000)
self.assertEqual(stat.file_count, 1)
self.assertEqual(stat.file_size_in_bytes, 1024 * 1024)
-        self.assertEqual(stat.last_file_creation_time, int(creation_time.timestamp() * 1000))
+ expected_time = file_meta.creation_time_epoch_millis()
+ self.assertEqual(stat.last_file_creation_time, expected_time)
def test_generate_partition_statistics_multiple_files_same_partition(
self, mock_manifest_list_manager, mock_manifest_file_manager,
mock_snapshot_manager):
@@ -107,38 +112,41 @@ class TestFileStoreCommit(unittest.TestCase):
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
- creation_time_1 = datetime(2024, 1, 15, 10, 30, 0)
- creation_time_2 = datetime(2024, 1, 15, 11, 30, 0) # Later time
+ from pypaimon.data.timestamp import Timestamp
+ from pypaimon.table.row.generic_row import GenericRow
+ from pypaimon.manifest.schema.simple_stats import SimpleStats
+        creation_time_1 = Timestamp.from_local_date_time(datetime(2024, 1, 15, 10, 30, 0))
+        creation_time_2 = Timestamp.from_local_date_time(datetime(2024, 1, 15, 11, 30, 0))  # Later time
- file_meta_1 = DataFileMeta(
+ file_meta_1 = DataFileMeta.create(
file_name="test_file_1.parquet",
file_size=1024 * 1024, # 1MB
row_count=10000,
- min_key=None,
- max_key=None,
- key_stats=None,
- value_stats=None,
+ min_key=GenericRow([], []),
+ max_key=GenericRow([], []),
+ key_stats=SimpleStats.empty_stats(),
+ value_stats=SimpleStats.empty_stats(),
min_sequence_number=1,
max_sequence_number=100,
schema_id=0,
level=0,
- extra_files=None,
+ extra_files=[],
creation_time=creation_time_1
)
- file_meta_2 = DataFileMeta(
+ file_meta_2 = DataFileMeta.create(
file_name="test_file_2.parquet",
file_size=2 * 1024 * 1024, # 2MB
row_count=15000,
- min_key=None,
- max_key=None,
- key_stats=None,
- value_stats=None,
+ min_key=GenericRow([], []),
+ max_key=GenericRow([], []),
+ key_stats=SimpleStats.empty_stats(),
+ value_stats=SimpleStats.empty_stats(),
min_sequence_number=101,
max_sequence_number=200,
schema_id=0,
level=0,
- extra_files=None,
+ extra_files=[],
creation_time=creation_time_2
)
@@ -159,8 +167,8 @@ class TestFileStoreCommit(unittest.TestCase):
self.assertEqual(stat.record_count, 25000) # 10000 + 15000
self.assertEqual(stat.file_count, 2)
self.assertEqual(stat.file_size_in_bytes, 3 * 1024 * 1024) # 1MB + 2MB
- # Should have the latest creation time
-        self.assertEqual(stat.last_file_creation_time, int(creation_time_2.timestamp() * 1000))
+ expected_time = file_meta_2.creation_time_epoch_millis()
+ self.assertEqual(stat.last_file_creation_time, expected_time)
def test_generate_partition_statistics_multiple_partitions(
self, mock_manifest_list_manager, mock_manifest_file_manager,
mock_snapshot_manager):
@@ -168,22 +176,26 @@ class TestFileStoreCommit(unittest.TestCase):
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
- creation_time = datetime(2024, 1, 15, 10, 30, 0)
+ creation_time_dt = datetime(2024, 1, 15, 10, 30, 0)
+ from pypaimon.data.timestamp import Timestamp
+ from pypaimon.table.row.generic_row import GenericRow
+ from pypaimon.manifest.schema.simple_stats import SimpleStats
+ creation_time = Timestamp.from_local_date_time(creation_time_dt)
# File for partition 1
- file_meta_1 = DataFileMeta(
+ file_meta_1 = DataFileMeta.create(
file_name="test_file_1.parquet",
file_size=1024 * 1024,
row_count=10000,
- min_key=None,
- max_key=None,
- key_stats=None,
- value_stats=None,
+ min_key=GenericRow([], []),
+ max_key=GenericRow([], []),
+ key_stats=SimpleStats.empty_stats(),
+ value_stats=SimpleStats.empty_stats(),
min_sequence_number=1,
max_sequence_number=100,
schema_id=0,
level=0,
- extra_files=None,
+ extra_files=[],
creation_time=creation_time,
external_path=None,
first_row_id=None,
@@ -191,19 +203,19 @@ class TestFileStoreCommit(unittest.TestCase):
)
# File for partition 2
- file_meta_2 = DataFileMeta(
+ file_meta_2 = DataFileMeta.create(
file_name="test_file_2.parquet",
file_size=2 * 1024 * 1024,
row_count=20000,
- min_key=None,
- max_key=None,
- key_stats=None,
- value_stats=None,
+ min_key=GenericRow([], []),
+ max_key=GenericRow([], []),
+ key_stats=SimpleStats.empty_stats(),
+ value_stats=SimpleStats.empty_stats(),
min_sequence_number=101,
max_sequence_number=200,
schema_id=0,
level=0,
- extra_files=None,
+ extra_files=[],
creation_time=creation_time,
external_path=None,
first_row_id=None,
@@ -255,20 +267,24 @@ class TestFileStoreCommit(unittest.TestCase):
# Create FileStoreCommit instance
file_store_commit = self._create_file_store_commit()
- creation_time = datetime(2024, 1, 15, 10, 30, 0)
- file_meta = DataFileMeta(
+ creation_time_dt = datetime(2024, 1, 15, 10, 30, 0)
+ from pypaimon.data.timestamp import Timestamp
+ from pypaimon.table.row.generic_row import GenericRow
+ from pypaimon.manifest.schema.simple_stats import SimpleStats
+ creation_time = Timestamp.from_local_date_time(creation_time_dt)
+ file_meta = DataFileMeta.create(
file_name="test_file_1.parquet",
file_size=1024 * 1024,
row_count=10000,
- min_key=None,
- max_key=None,
- key_stats=None,
- value_stats=None,
+ min_key=GenericRow([], []),
+ max_key=GenericRow([], []),
+ key_stats=SimpleStats.empty_stats(),
+ value_stats=SimpleStats.empty_stats(),
min_sequence_number=1,
max_sequence_number=100,
schema_id=0,
level=0,
- extra_files=None,
+ extra_files=[],
creation_time=creation_time,
external_path=None,
first_row_id=None,
@@ -312,7 +328,6 @@ class TestFileStoreCommit(unittest.TestCase):
schema_id=0,
level=0,
extra_files=None,
- creation_time=None # No creation time
)
commit_message = CommitMessage(
@@ -338,20 +353,23 @@ class TestFileStoreCommit(unittest.TestCase):
file_store_commit = self._create_file_store_commit()
# Table has 2 partition keys but partition tuple has 3 values
- file_meta = DataFileMeta(
+ from pypaimon.data.timestamp import Timestamp
+ from pypaimon.table.row.generic_row import GenericRow
+ from pypaimon.manifest.schema.simple_stats import SimpleStats
+ file_meta = DataFileMeta.create(
file_name="test_file_1.parquet",
file_size=1024 * 1024,
row_count=10000,
- min_key=None,
- max_key=None,
- key_stats=None,
- value_stats=None,
+ min_key=GenericRow([], []),
+ max_key=GenericRow([], []),
+ key_stats=SimpleStats.empty_stats(),
+ value_stats=SimpleStats.empty_stats(),
min_sequence_number=1,
max_sequence_number=100,
schema_id=0,
level=0,
- extra_files=None,
- creation_time=datetime(2024, 1, 15, 10, 30, 0)
+ extra_files=[],
+            creation_time=Timestamp.from_local_date_time(datetime(2024, 1, 15, 10, 30, 0))
)
commit_message = CommitMessage(
diff --git a/paimon-python/pypaimon/tests/manifest/manifest_entry_identifier_test.py b/paimon-python/pypaimon/tests/manifest/manifest_entry_identifier_test.py
index bbe1833f2c..5b86e39a37 100644
--- a/paimon-python/pypaimon/tests/manifest/manifest_entry_identifier_test.py
+++ b/paimon-python/pypaimon/tests/manifest/manifest_entry_identifier_test.py
@@ -55,6 +55,7 @@ class ManifestEntryIdentifierTest(unittest.TestCase):
def _create_file_meta(self, file_name, level=0, extra_files=None,
external_path=None):
"""Helper to create DataFileMeta with common defaults."""
+ from pypaimon.data.timestamp import Timestamp
return DataFileMeta(
file_name=file_name,
file_size=1024,
@@ -76,7 +77,7 @@ class ManifestEntryIdentifierTest(unittest.TestCase):
schema_id=0,
level=level,
extra_files=extra_files or [],
- creation_time=1234567890,
+ creation_time=Timestamp.from_epoch_millis(1234567890),
delete_row_count=0,
embedded_index=None,
file_source=None,
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index 6e6d57f963..2c3a6775c0 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -17,7 +17,7 @@ limitations under the License.
"""
import logging
import time
-from datetime import date, datetime
+from datetime import date
from decimal import Decimal
from unittest.mock import Mock
@@ -208,10 +208,12 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
partition = GenericRow(['East', 'Boston'], partition_fields)
# Create ADD entry
+ from pypaimon.data.timestamp import Timestamp
add_file_meta = Mock(spec=DataFileMeta)
add_file_meta.row_count = 200
add_file_meta.file_size = 2048
- add_file_meta.creation_time = datetime.now()
+ add_file_meta.creation_time = Timestamp.now()
+        add_file_meta.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
add_entry = ManifestEntry(
kind=0, # ADD
@@ -225,7 +227,8 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
delete_file_meta = Mock(spec=DataFileMeta)
delete_file_meta.row_count = 80
delete_file_meta.file_size = 800
- delete_file_meta.creation_time = datetime.now()
+ delete_file_meta.creation_time = Timestamp.now()
+        delete_file_meta.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
delete_entry = ManifestEntry(
kind=1, # DELETE
@@ -261,10 +264,12 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
DataField(1, "city", AtomicType("STRING"))
]
partition1 = GenericRow(['East', 'Boston'], partition_fields)
+ from pypaimon.data.timestamp import Timestamp
file_meta1 = Mock(spec=DataFileMeta)
file_meta1.row_count = 150
file_meta1.file_size = 1500
- file_meta1.creation_time = datetime.now()
+ file_meta1.creation_time = Timestamp.now()
+        file_meta1.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
entry1 = ManifestEntry(
kind=0, # ADD
@@ -279,7 +284,8 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
file_meta2 = Mock(spec=DataFileMeta)
file_meta2.row_count = 75
file_meta2.file_size = 750
- file_meta2.creation_time = datetime.now()
+ file_meta2.creation_time = Timestamp.now()
+        file_meta2.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
entry2 = ManifestEntry(
kind=1, # DELETE
@@ -816,6 +822,7 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
)
# Create DataFileMeta with value_stats_cols
+ from pypaimon.data.timestamp import Timestamp
file_meta = DataFileMeta(
file_name=f"test-file-{test_name}.parquet",
file_size=1024,
@@ -829,7 +836,7 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
schema_id=0,
level=0,
extra_files=[],
- creation_time=1234567890,
+ creation_time=Timestamp.from_epoch_millis(1234567890),
delete_row_count=0,
embedded_index=None,
file_source=None,
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index fdcb9f1ea2..b41cd2e05f 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -296,10 +296,13 @@ class ReaderBasicTest(unittest.TestCase):
partition = GenericRow(['East', 'Boston'], partition_fields)
# Create ADD entry
+ from pypaimon.data.timestamp import Timestamp
+ import time
add_file_meta = Mock(spec=DataFileMeta)
add_file_meta.row_count = 200
add_file_meta.file_size = 2048
- add_file_meta.creation_time = datetime.now()
+ add_file_meta.creation_time = Timestamp.now()
+        add_file_meta.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
add_entry = ManifestEntry(
kind=0, # ADD
@@ -313,7 +316,8 @@ class ReaderBasicTest(unittest.TestCase):
delete_file_meta = Mock(spec=DataFileMeta)
delete_file_meta.row_count = 80
delete_file_meta.file_size = 800
- delete_file_meta.creation_time = datetime.now()
+ delete_file_meta.creation_time = Timestamp.now()
+        delete_file_meta.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
delete_entry = ManifestEntry(
kind=1, # DELETE
@@ -349,10 +353,13 @@ class ReaderBasicTest(unittest.TestCase):
DataField(1, "city", AtomicType("STRING"))
]
partition1 = GenericRow(['East', 'Boston'], partition_fields)
+ from pypaimon.data.timestamp import Timestamp
+ import time
file_meta1 = Mock(spec=DataFileMeta)
file_meta1.row_count = 150
file_meta1.file_size = 1500
- file_meta1.creation_time = datetime.now()
+ file_meta1.creation_time = Timestamp.now()
+        file_meta1.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
entry1 = ManifestEntry(
kind=0, # ADD
@@ -367,7 +374,8 @@ class ReaderBasicTest(unittest.TestCase):
file_meta2 = Mock(spec=DataFileMeta)
file_meta2.row_count = 75
file_meta2.file_size = 750
- file_meta2.creation_time = datetime.now()
+ file_meta2.creation_time = Timestamp.now()
+        file_meta2.creation_time_epoch_millis = Mock(return_value=int(time.time() * 1000))
entry2 = ManifestEntry(
kind=1, # DELETE
@@ -600,6 +608,7 @@ class ReaderBasicTest(unittest.TestCase):
)
# Create DataFileMeta with value_stats_cols
+ from pypaimon.data.timestamp import Timestamp
file_meta = DataFileMeta(
file_name=f"test-file-{test_name}.parquet",
file_size=1024,
@@ -613,7 +622,7 @@ class ReaderBasicTest(unittest.TestCase):
schema_id=0,
level=0,
extra_files=[],
- creation_time=1234567890,
+ creation_time=Timestamp.from_epoch_millis(1234567890),
delete_row_count=0,
embedded_index=None,
file_source=None,
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index b62783852a..762229438e 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -344,6 +344,49 @@ class PkReaderTest(unittest.TestCase):
}, schema=self.pa_schema).sort_by('user_id')
self.assertEqual(expected, actual)
+ def test_manifest_creation_time_timestamp(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options={'bucket': '2'})
+        self.catalog.create_table('default.test_manifest_creation_time', schema, False)
+ table = self.catalog.get_table('default.test_manifest_creation_time')
+
+ self._write_test_table(table)
+
+ snapshot_manager = SnapshotManager(table)
+ latest_snapshot = snapshot_manager.get_latest_snapshot()
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+        manifest_list_manager = table_scan.starting_scanner.manifest_list_manager
+ manifest_files = manifest_list_manager.read_all(latest_snapshot)
+
+        manifest_file_manager = table_scan.starting_scanner.manifest_file_manager
+ creation_times_found = []
+ for manifest_file_meta in manifest_files:
+            entries = manifest_file_manager.read(manifest_file_meta.file_name, drop_stats=False)
+ for entry in entries:
+ if entry.file.creation_time is not None:
+ creation_time = entry.file.creation_time
+ self.assertIsNotNone(creation_time)
+ epoch_millis = entry.file.creation_time_epoch_millis()
+ self.assertIsNotNone(epoch_millis)
+ self.assertGreater(epoch_millis, 0)
+ import time
+ expected_epoch_millis = creation_time.get_millisecond()
+ local_dt = creation_time.to_local_date_time()
+ local_time_struct = local_dt.timetuple()
+ local_timestamp = time.mktime(local_time_struct)
+ local_time_struct_utc = time.gmtime(local_timestamp)
+ utc_timestamp = time.mktime(local_time_struct_utc)
+ expected_epoch_millis = int(utc_timestamp * 1000)
+ self.assertEqual(epoch_millis, expected_epoch_millis)
+ creation_times_found.append(epoch_millis)
+
+        self.assertGreater(len(creation_times_found), 0, "At least one manifest entry should have creation_time")
+
def _write_test_table(self, table):
write_builder = table.new_batch_write_builder()
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 014e8fbf6e..5c642ca6b1 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -308,9 +308,9 @@ class FileStoreCommit:
        file_size_in_bytes = file_meta.file_size if entry.kind == 0 else file_meta.file_size * -1
file_count = 1 if entry.kind == 0 else -1
- # Convert creation_time to milliseconds (Java uses epoch millis)
+ # Use epoch millis
if file_meta.creation_time:
-            file_creation_time = int(file_meta.creation_time.timestamp() * 1000)
+ file_creation_time = file_meta.creation_time_epoch_millis()
else:
file_creation_time = int(time.time() * 1000)
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py b/paimon-python/pypaimon/write/writer/blob_writer.py
index c8d0d45076..6dc7b177b1 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -22,6 +22,7 @@ import pyarrow as pa
from typing import Optional, Tuple, Dict
from pypaimon.common.core_options import CoreOptions
+from pypaimon.data.timestamp import Timestamp
from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
from pypaimon.write.writer.blob_file_writer import BlobFileWriter
@@ -159,7 +160,6 @@ class BlobWriter(AppendOnlyDataWriter):
def _add_file_metadata(self, file_name: str, file_path: str,
data_or_row_count, file_size: int,
external_path: Optional[str] = None):
"""Add file metadata to committed_files."""
- from datetime import datetime
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.table.row.generic_row import GenericRow
@@ -191,7 +191,7 @@ class BlobWriter(AppendOnlyDataWriter):
max_seq = self.sequence_generator.current - 1
self.sequence_generator.start = self.sequence_generator.current
- self.committed_files.append(DataFileMeta(
+ self.committed_files.append(DataFileMeta.create(
file_name=file_name,
file_size=file_size,
row_count=row_count,
@@ -207,7 +207,7 @@ class BlobWriter(AppendOnlyDataWriter):
schema_id=self.table.table_schema.id,
level=0,
extra_files=[],
- creation_time=datetime.now(),
+ creation_time=Timestamp.now(),
delete_row_count=0,
file_source=0, # FileSource.APPEND = 0
value_stats_cols=None,
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 65107856ee..079b408e66 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -18,12 +18,12 @@
import logging
import uuid
-from datetime import datetime
from typing import List, Optional, Tuple, Dict
import pyarrow as pa
from pypaimon.common.core_options import CoreOptions
+from pypaimon.data.timestamp import Timestamp
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.table.row.generic_row import GenericRow
@@ -286,7 +286,7 @@ class DataBlobWriter(DataWriter):
self.sequence_generator.start = self.sequence_generator.current
- return DataFileMeta(
+ return DataFileMeta.create(
file_name=file_name,
file_size=self.file_io.get_file_size(file_path),
row_count=data.num_rows,
@@ -305,7 +305,7 @@ class DataBlobWriter(DataWriter):
schema_id=self.table.table_schema.id,
level=0,
extra_files=[],
- creation_time=datetime.now(),
+ creation_time=Timestamp.now(),
delete_row_count=0,
file_source=0,
value_stats_cols=self.normal_column_names,
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 8656a58879..e043da0eee 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -19,11 +19,11 @@ import pyarrow as pa
import pyarrow.compute as pc
import uuid
from abc import ABC, abstractmethod
-from datetime import datetime
from typing import Dict, List, Optional, Tuple
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.external_path_provider import ExternalPathProvider
+from pypaimon.data.timestamp import Timestamp
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.schema.data_types import PyarrowFieldParser
@@ -211,7 +211,7 @@ class DataWriter(ABC):
min_seq = self.sequence_generator.start
max_seq = self.sequence_generator.current
self.sequence_generator.start = self.sequence_generator.current
- self.committed_files.append(DataFileMeta(
+ self.committed_files.append(DataFileMeta.create(
file_name=file_name,
file_size=self.file_io.get_file_size(file_path),
row_count=data.num_rows,
@@ -232,7 +232,7 @@ class DataWriter(ABC):
schema_id=self.table.table_schema.id,
level=0,
extra_files=[],
- creation_time=datetime.now(),
+ creation_time=Timestamp.now(),
delete_row_count=0,
file_source=0,
            value_stats_cols=None,  # None means all columns in the data have statistics