This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ffca5da95 [core] fix record expire get second overflow bug (#5667)
1ffca5da95 is described below

commit 1ffca5da95bdfe94f745a64c614faeeb13a70fd0
Author: jerry <[email protected]>
AuthorDate: Tue May 27 11:06:07 2025 +0800

    [core] fix record expire get second overflow bug (#5667)
---
 .../org/apache/paimon/io/RecordLevelExpire.java    | 58 +++++++++++-----------
 .../apache/paimon/table/RecordLevelExpireTest.java | 25 ++++++++++
 2 files changed, 53 insertions(+), 30 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index e8cae827ca..9e05ed1674 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -20,6 +20,7 @@ package org.apache.paimon.io;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -52,8 +53,8 @@ public class RecordLevelExpire {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(RecordLevelExpire.class);
 
-    private final int expireTime;
-    private final Function<InternalRow, Optional<Integer>> fieldGetter;
+    private final long expireTime;
+    private final Function<InternalRow, Optional<Long>> fieldGetter;
 
     private final ConcurrentMap<Long, TableSchema> tableSchemas;
     private final TableSchema schema;
@@ -84,20 +85,19 @@ public class RecordLevelExpire {
         }
 
         DataType dataType = rowType.getField(timeFieldName).type();
-        Function<InternalRow, Optional<Integer>> fieldGetter =
-                createFieldGetter(dataType, fieldIndex);
+        Function<InternalRow, Optional<Long>> fieldGetter =
+                createFieldGetterAndConvertToSecond(dataType, fieldIndex);
 
         LOG.info(
                 "Create RecordExpire. expireTime is {}s,timeField is {}",
-                (int) expireTime.getSeconds(),
+                expireTime.getSeconds(),
                 timeFieldName);
-        return new RecordLevelExpire(
-                (int) expireTime.getSeconds(), fieldGetter, schema, 
schemaManager);
+        return new RecordLevelExpire(expireTime.getSeconds(), fieldGetter, 
schema, schemaManager);
     }
 
     private RecordLevelExpire(
-            int expireTime,
-            Function<InternalRow, Optional<Integer>> fieldGetter,
+            long expireTime,
+            Function<InternalRow, Optional<Long>> fieldGetter,
             TableSchema schema,
             SchemaManager schemaManager) {
         this.expireTime = expireTime;
@@ -132,8 +132,8 @@ public class RecordLevelExpire {
             minValues = result.minValues();
         }
 
-        int currentTime = (int) (System.currentTimeMillis() / 1000);
-        Optional<Integer> minTime = fieldGetter.apply(minValues);
+        long currentTime = System.currentTimeMillis() / 1000L;
+        Optional<Long> minTime = fieldGetter.apply(minValues);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug(
@@ -155,25 +155,16 @@ public class RecordLevelExpire {
         return file -> wrap(readerFactory.createRecordReader(file));
     }
 
-    private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
-        int currentTime = (int) (System.currentTimeMillis() / 1000);
-        return reader.filter(
-                keyValue ->
-                        fieldGetter
-                                .apply(keyValue.value())
-                                .map(integer -> currentTime <= integer + 
expireTime)
-                                .orElse(true));
-    }
-
-    private static Function<InternalRow, Optional<Integer>> createFieldGetter(
+    @VisibleForTesting
+    public static Function<InternalRow, Optional<Long>> 
createFieldGetterAndConvertToSecond(
             DataType dataType, int fieldIndex) {
-        final Function<InternalRow, Optional<Integer>> fieldGetter;
+        final Function<InternalRow, Optional<Long>> fieldGetter;
         if (dataType instanceof IntType) {
             fieldGetter =
                     row ->
                             row.isNullAt(fieldIndex)
                                     ? Optional.empty()
-                                    : Optional.of(row.getInt(fieldIndex));
+                                    : Optional.of((long) 
row.getInt(fieldIndex));
         } else if (dataType instanceof BigIntType) {
             fieldGetter =
                     row -> {
@@ -182,8 +173,7 @@ public class RecordLevelExpire {
                         }
                         long value = row.getLong(fieldIndex);
                         // If it is milliseconds, convert it to seconds.
-                        return Optional.of(
-                                (int) (value >= 1_000_000_000_000L ? value / 
1000 : value));
+                        return Optional.of(value >= 1_000_000_000_000L ? value 
/ 1000L : value);
                     };
         } else if (dataType instanceof TimestampType
                 || dataType instanceof LocalZonedTimestampType) {
@@ -193,10 +183,8 @@ public class RecordLevelExpire {
                             row.isNullAt(fieldIndex)
                                     ? Optional.empty()
                                     : Optional.of(
-                                            (int)
-                                                    
(row.getTimestamp(fieldIndex, precision)
-                                                                    
.getMillisecond()
-                                                            / 1000));
+                                            row.getTimestamp(fieldIndex, 
precision).getMillisecond()
+                                                    / 1000L);
         } else {
             throw new IllegalArgumentException(
                     String.format(
@@ -207,6 +195,16 @@ public class RecordLevelExpire {
         return fieldGetter;
     }
 
+    private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
+        long currentTime = System.currentTimeMillis() / 1000L;
+        return reader.filter(
+                keyValue ->
+                        fieldGetter
+                                .apply(keyValue.value())
+                                .map(integer -> currentTime <= integer + 
expireTime)
+                                .orElse(true));
+    }
+
     private TableSchema scanTableSchema(long id) {
         return tableSchemas.computeIfAbsent(
                 id, key -> key == schema.id() ? schema : 
schemaManager.schema(id));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
index cb5c517973..b24eb527e4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
@@ -25,6 +25,8 @@ import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.RecordLevelExpire;
@@ -43,7 +45,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -81,6 +85,27 @@ class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
         return options;
     }
 
+    @Test
+    public void testConvertFieldToSecond() {
+        Function<InternalRow, Optional<Long>> fieldGetter =
+                
RecordLevelExpire.createFieldGetterAndConvertToSecond(DataTypes.INT(), 0);
+        assertThat(fieldGetter.apply(GenericRow.of(1))).get().isEqualTo(1L);
+        
assertThat(fieldGetter.apply(GenericRow.of(2147483647))).get().isEqualTo(2147483647L);
+        fieldGetter = 
RecordLevelExpire.createFieldGetterAndConvertToSecond(DataTypes.BIGINT(), 0);
+        
assertThat(fieldGetter.apply(GenericRow.of(2147483649L))).get().isEqualTo(2147483649L);
+        assertThat(fieldGetter.apply(GenericRow.of(1_000_000_000_000L)))
+                .get()
+                .isEqualTo(1_000_000_000L);
+        assertThat(fieldGetter.apply(GenericRow.of(1_000_000_000_001L)))
+                .get()
+                .isEqualTo(1_000_000_000L);
+        fieldGetter =
+                
RecordLevelExpire.createFieldGetterAndConvertToSecond(DataTypes.TIMESTAMP(6), 
0);
+        
assertThat(fieldGetter.apply(GenericRow.of(Timestamp.fromEpochMillis(2147483649L))))
+                .get()
+                .isEqualTo(2147483L);
+    }
+
     @Test
     public void test() throws Exception {
         writeCommit(GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 2));

Reply via email to