This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1ffca5da95 [core] fix record expire get second overflow bug (#5667)
1ffca5da95 is described below
commit 1ffca5da95bdfe94f745a64c614faeeb13a70fd0
Author: jerry <[email protected]>
AuthorDate: Tue May 27 11:06:07 2025 +0800
[core] fix record expire get second overflow bug (#5667)
---
.../org/apache/paimon/io/RecordLevelExpire.java | 58 +++++++++++-----------
.../apache/paimon/table/RecordLevelExpireTest.java | 25 ++++++++++
2 files changed, 53 insertions(+), 30 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index e8cae827ca..9e05ed1674 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -20,6 +20,7 @@ package org.apache.paimon.io;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -52,8 +53,8 @@ public class RecordLevelExpire {
private static final Logger LOG = LoggerFactory.getLogger(RecordLevelExpire.class);
- private final int expireTime;
- private final Function<InternalRow, Optional<Integer>> fieldGetter;
+ private final long expireTime;
+ private final Function<InternalRow, Optional<Long>> fieldGetter;
private final ConcurrentMap<Long, TableSchema> tableSchemas;
private final TableSchema schema;
@@ -84,20 +85,19 @@ public class RecordLevelExpire {
}
DataType dataType = rowType.getField(timeFieldName).type();
- Function<InternalRow, Optional<Integer>> fieldGetter =
- createFieldGetter(dataType, fieldIndex);
+ Function<InternalRow, Optional<Long>> fieldGetter =
+ createFieldGetterAndConvertToSecond(dataType, fieldIndex);
LOG.info(
"Create RecordExpire. expireTime is {}s,timeField is {}",
- (int) expireTime.getSeconds(),
+ expireTime.getSeconds(),
timeFieldName);
- return new RecordLevelExpire(
- (int) expireTime.getSeconds(), fieldGetter, schema, schemaManager);
+ return new RecordLevelExpire(expireTime.getSeconds(), fieldGetter, schema, schemaManager);
}
private RecordLevelExpire(
- int expireTime,
- Function<InternalRow, Optional<Integer>> fieldGetter,
+ long expireTime,
+ Function<InternalRow, Optional<Long>> fieldGetter,
TableSchema schema,
SchemaManager schemaManager) {
this.expireTime = expireTime;
@@ -132,8 +132,8 @@ public class RecordLevelExpire {
minValues = result.minValues();
}
- int currentTime = (int) (System.currentTimeMillis() / 1000);
- Optional<Integer> minTime = fieldGetter.apply(minValues);
+ long currentTime = System.currentTimeMillis() / 1000L;
+ Optional<Long> minTime = fieldGetter.apply(minValues);
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -155,25 +155,16 @@ public class RecordLevelExpire {
return file -> wrap(readerFactory.createRecordReader(file));
}
- private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
- int currentTime = (int) (System.currentTimeMillis() / 1000);
- return reader.filter(
- keyValue ->
- fieldGetter
- .apply(keyValue.value())
- .map(integer -> currentTime <= integer + expireTime)
- .orElse(true));
- }
-
- private static Function<InternalRow, Optional<Integer>> createFieldGetter(
+ @VisibleForTesting
+ public static Function<InternalRow, Optional<Long>> createFieldGetterAndConvertToSecond(
DataType dataType, int fieldIndex) {
- final Function<InternalRow, Optional<Integer>> fieldGetter;
+ final Function<InternalRow, Optional<Long>> fieldGetter;
if (dataType instanceof IntType) {
fieldGetter =
row ->
row.isNullAt(fieldIndex)
? Optional.empty()
- : Optional.of(row.getInt(fieldIndex));
+ : Optional.of((long) row.getInt(fieldIndex));
} else if (dataType instanceof BigIntType) {
fieldGetter =
row -> {
@@ -182,8 +173,7 @@ public class RecordLevelExpire {
}
long value = row.getLong(fieldIndex);
// If it is milliseconds, convert it to seconds.
- return Optional.of(
- (int) (value >= 1_000_000_000_000L ? value / 1000 : value));
+ return Optional.of(value >= 1_000_000_000_000L ? value / 1000L : value);
};
} else if (dataType instanceof TimestampType
|| dataType instanceof LocalZonedTimestampType) {
@@ -193,10 +183,8 @@ public class RecordLevelExpire {
row.isNullAt(fieldIndex)
? Optional.empty()
: Optional.of(
- (int)
- (row.getTimestamp(fieldIndex, precision)
- .getMillisecond()
- / 1000));
+ row.getTimestamp(fieldIndex, precision).getMillisecond()
+ / 1000L);
} else {
throw new IllegalArgumentException(
String.format(
@@ -207,6 +195,16 @@ public class RecordLevelExpire {
return fieldGetter;
}
+ private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
+ long currentTime = System.currentTimeMillis() / 1000L;
+ return reader.filter(
+ keyValue ->
+ fieldGetter
+ .apply(keyValue.value())
+ .map(integer -> currentTime <= integer + expireTime)
+ .orElse(true));
+ }
+
private TableSchema scanTableSchema(long id) {
return tableSchemas.computeIfAbsent(
id, key -> key == schema.id() ? schema : schemaManager.schema(id));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
index cb5c517973..b24eb527e4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
@@ -25,6 +25,8 @@ import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.RecordLevelExpire;
@@ -43,7 +45,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
+import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
@@ -81,6 +85,27 @@ class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
return options;
}
+ @Test
+ public void testConvertFieldToSecond() {
+ Function<InternalRow, Optional<Long>> fieldGetter =
+ RecordLevelExpire.createFieldGetterAndConvertToSecond(DataTypes.INT(), 0);
+ assertThat(fieldGetter.apply(GenericRow.of(1))).get().isEqualTo(1L);
+ assertThat(fieldGetter.apply(GenericRow.of(2147483647))).get().isEqualTo(2147483647L);
+ fieldGetter = RecordLevelExpire.createFieldGetterAndConvertToSecond(DataTypes.BIGINT(), 0);
+ assertThat(fieldGetter.apply(GenericRow.of(2147483649L))).get().isEqualTo(2147483649L);
+ assertThat(fieldGetter.apply(GenericRow.of(1_000_000_000_000L)))
+ .get()
+ .isEqualTo(1_000_000_000L);
+ assertThat(fieldGetter.apply(GenericRow.of(1_000_000_000_001L)))
+ .get()
+ .isEqualTo(1_000_000_000L);
+ fieldGetter =
+ RecordLevelExpire.createFieldGetterAndConvertToSecond(DataTypes.TIMESTAMP(6), 0);
+ assertThat(fieldGetter.apply(GenericRow.of(Timestamp.fromEpochMillis(2147483649L))))
+ .get()
+ .isEqualTo(2147483L);
+ }
+
@Test
public void test() throws Exception {
writeCommit(GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 2));