This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 39de7cc31c [core] support null value for record-level.time-field (#4839)
39de7cc31c is described below

commit 39de7cc31cab53f64c99500c1c0bf7d7c25b548f
Author: xiangyu0xf <[email protected]>
AuthorDate: Mon Jan 6 22:14:24 2025 +0800

    [core] support null value for record-level.time-field (#4839)
---
 .../org/apache/paimon/io/RecordLevelExpire.java    | 50 ++++++++++++++--------
 .../apache/paimon/table/RecordLevelExpireTest.java | 12 ++++--
 ...a => RecordLevelExpireWithAggregationTest.java} | 37 ++++++++--------
 .../RecordLevelExpireWithMillisecondTest.java      |  9 ++--
 .../RecordLevelExpireWithTimestampBaseTest.java    |  9 ++--
 5 files changed, 71 insertions(+), 46 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index e43a9d03d9..05c70dcbe1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -33,15 +33,14 @@ import org.apache.paimon.types.TimestampType;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.Optional;
 import java.util.function.Function;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
 /** A factory to create {@link RecordReader} expires records by time. */
 public class RecordLevelExpire {
 
     private final int expireTime;
-    private final Function<InternalRow, Integer> fieldGetter;
+    private final Function<InternalRow, Optional<Integer>> fieldGetter;
 
     @Nullable
     public static RecordLevelExpire create(CoreOptions options, RowType 
rowType) {
@@ -65,11 +64,13 @@ public class RecordLevelExpire {
         }
 
         DataType dataType = rowType.getField(timeFieldName).type();
-        Function<InternalRow, Integer> fieldGetter = 
createFieldGetter(dataType, fieldIndex);
+        Function<InternalRow, Optional<Integer>> fieldGetter =
+                createFieldGetter(dataType, fieldIndex);
         return new RecordLevelExpire((int) expireTime.getSeconds(), 
fieldGetter);
     }
 
-    private RecordLevelExpire(int expireTime, Function<InternalRow, Integer> 
fieldGetter) {
+    private RecordLevelExpire(
+            int expireTime, Function<InternalRow, Optional<Integer>> 
fieldGetter) {
         this.expireTime = expireTime;
         this.fieldGetter = fieldGetter;
     }
@@ -80,26 +81,46 @@ public class RecordLevelExpire {
 
     private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
         int currentTime = (int) (System.currentTimeMillis() / 1000);
-        return reader.filter(kv -> currentTime <= 
fieldGetter.apply(kv.value()) + expireTime);
+        return reader.filter(
+                keyValue ->
+                        fieldGetter
+                                .apply(keyValue.value())
+                                .map(integer -> currentTime <= integer + 
expireTime)
+                                .orElse(true));
     }
 
-    private static Function<InternalRow, Integer> createFieldGetter(
+    private static Function<InternalRow, Optional<Integer>> createFieldGetter(
             DataType dataType, int fieldIndex) {
-        final Function<InternalRow, Integer> fieldGetter;
+        final Function<InternalRow, Optional<Integer>> fieldGetter;
         if (dataType instanceof IntType) {
-            fieldGetter = row -> row.getInt(fieldIndex);
+            fieldGetter =
+                    row ->
+                            row.isNullAt(fieldIndex)
+                                    ? Optional.empty()
+                                    : Optional.of(row.getInt(fieldIndex));
         } else if (dataType instanceof BigIntType) {
             fieldGetter =
                     row -> {
+                        if (row.isNullAt(fieldIndex)) {
+                            return Optional.empty();
+                        }
                         long value = row.getLong(fieldIndex);
                         // If it is milliseconds, convert it to seconds.
-                        return (int) (value >= 1_000_000_000_000L ? value / 
1000 : value);
+                        return Optional.of(
+                                (int) (value >= 1_000_000_000_000L ? value / 
1000 : value));
                     };
         } else if (dataType instanceof TimestampType
                 || dataType instanceof LocalZonedTimestampType) {
             int precision = DataTypeChecks.getPrecision(dataType);
             fieldGetter =
-                    row -> (int) (row.getTimestamp(fieldIndex, 
precision).getMillisecond() / 1000);
+                    row ->
+                            row.isNullAt(fieldIndex)
+                                    ? Optional.empty()
+                                    : Optional.of(
+                                            (int)
+                                                    
(row.getTimestamp(fieldIndex, precision)
+                                                                    
.getMillisecond()
+                                                            / 1000));
         } else {
             throw new IllegalArgumentException(
                     String.format(
@@ -107,11 +128,6 @@ public class RecordLevelExpire {
                             dataType));
         }
 
-        return row -> {
-            checkArgument(
-                    !row.isNullAt(fieldIndex),
-                    "Time field for record-level expire should not be null.");
-            return fieldGetter.apply(row);
-        };
+        return fieldGetter;
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
index bd13b0ecf8..bce57c2752 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
@@ -38,7 +38,6 @@ import java.time.Duration;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
 
@@ -102,12 +101,17 @@ class RecordLevelExpireTest extends 
PrimaryKeyTableTestBase {
                 .containsExactlyInAnyOrder(GenericRow.of(currentSecs + 60 * 
60));
 
         writeCommit(GenericRow.of(1, 5, null));
+        compact(1);
         assertThat(query())
                 .containsExactlyInAnyOrder(
                         GenericRow.of(1, 4, currentSecs + 60 * 60), 
GenericRow.of(1, 5, null));
 
-        // null time field for record-level expire is not supported yet.
-        assertThatThrownBy(() -> compact(1))
-                .hasMessageContaining("Time field for record-level expire 
should not be null.");
+        writeCommit(GenericRow.of(1, 5, currentSecs + 60 * 60));
+        // compact, merged
+        compact(1);
+        assertThat(query())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 4, currentSecs + 60 * 60),
+                        GenericRow.of(1, 5, currentSecs + 60 * 60));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithAggregationTest.java
similarity index 79%
copy from 
paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
copy to 
paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithAggregationTest.java
index bd13b0ecf8..02ed8ee3ed 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithAggregationTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.TraceableFileIO;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -38,9 +39,8 @@ import java.time.Duration;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
+class RecordLevelExpireWithAggregationTest extends PrimaryKeyTableTestBase {
 
     @Override
     @BeforeEach
@@ -71,20 +71,23 @@ class RecordLevelExpireTest extends PrimaryKeyTableTestBase 
{
         options.set(CoreOptions.BUCKET, 1);
         options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, 
Duration.ofSeconds(1));
         options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
+        options.set(CoreOptions.MERGE_ENGINE, 
CoreOptions.MergeEngine.AGGREGATE);
+        options.set(CoreOptions.FIELDS_DEFAULT_AGG_FUNC, 
"last_non_null_value");
+        options.set("fields.col1.ignore-retract", "true");
         return options;
     }
 
     @Test
     public void test() throws Exception {
         writeCommit(GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 2));
-
-        // can be queried
-        assertThat(query())
-                .containsExactlyInAnyOrder(GenericRow.of(1, 1, 1), 
GenericRow.of(1, 2, 2));
+        // disordered retract message will generate null fields
+        writeCommit(
+                GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 3, 1),
+                GenericRow.ofKind(RowKind.DELETE, 1, 3, 1));
 
         int currentSecs = (int) (System.currentTimeMillis() / 1000);
-        writeCommit(GenericRow.of(1, 3, currentSecs));
         writeCommit(GenericRow.of(1, 4, currentSecs + 60 * 60));
+
         Thread.sleep(2000);
 
         // no compaction, can be queried
@@ -92,22 +95,22 @@ class RecordLevelExpireTest extends PrimaryKeyTableTestBase 
{
                 .containsExactlyInAnyOrder(
                         GenericRow.of(1, 1, 1),
                         GenericRow.of(1, 2, 2),
-                        GenericRow.of(1, 3, currentSecs),
+                        GenericRow.of(1, 3, null),
                         GenericRow.of(1, 4, currentSecs + 60 * 60));
 
         // compact, expired
         compact(1);
-        assertThat(query()).containsExactlyInAnyOrder(GenericRow.of(1, 4, 
currentSecs + 60 * 60));
-        assertThat(query(new int[] {2}))
-                .containsExactlyInAnyOrder(GenericRow.of(currentSecs + 60 * 
60));
-
-        writeCommit(GenericRow.of(1, 5, null));
         assertThat(query())
                 .containsExactlyInAnyOrder(
-                        GenericRow.of(1, 4, currentSecs + 60 * 60), 
GenericRow.of(1, 5, null));
+                        GenericRow.of(1, 3, null), GenericRow.of(1, 4, 
currentSecs + 60 * 60));
 
-        // null time field for record-level expire is not supported yet.
-        assertThatThrownBy(() -> compact(1))
-                .hasMessageContaining("Time field for record-level expire 
should not be null.");
+        writeCommit(GenericRow.of(1, 3, currentSecs + 60 * 60));
+        compact(1);
+
+        // compact, merged
+        assertThat(query())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 4, currentSecs + 60 * 60),
+                        GenericRow.of(1, 3, currentSecs + 60 * 60));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
index 295058bfbd..f9cfbf0ce3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
@@ -38,7 +38,6 @@ import java.time.Duration;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 class RecordLevelExpireWithMillisecondTest extends PrimaryKeyTableTestBase {
     @Override
@@ -102,8 +101,10 @@ class RecordLevelExpireWithMillisecondTest extends 
PrimaryKeyTableTestBase {
         assertThat(query(new int[] {0, 1}))
                 .containsExactlyInAnyOrder(GenericRow.of(1, 4), 
GenericRow.of(1, 5));
 
-        // null time field for record-level expire is not supported yet.
-        assertThatThrownBy(() -> compact(1))
-                .hasMessageContaining("Time field for record-level expire 
should not be null.");
+        writeCommit(GenericRow.of(1, 5, currentSecs + 60 * 60 * 1000));
+        // compact, merged
+        compact(1);
+        assertThat(query(new int[] {0, 1}))
+                .containsExactlyInAnyOrder(GenericRow.of(1, 4), 
GenericRow.of(1, 5));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
index f352693759..f9aef643f6 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
@@ -29,7 +29,6 @@ import org.junit.jupiter.api.Test;
 import java.time.Duration;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 abstract class RecordLevelExpireWithTimestampBaseTest extends 
PrimaryKeyTableTestBase {
 
@@ -67,8 +66,10 @@ abstract class RecordLevelExpireWithTimestampBaseTest 
extends PrimaryKeyTableTes
         assertThat(query(new int[] {0, 1}))
                 .containsExactlyInAnyOrder(GenericRow.of(1, 3), 
GenericRow.of(1, 5));
 
-        // null time field for record-level expire is not supported yet.
-        assertThatThrownBy(() -> compact(1))
-                .hasMessageContaining("Time field for record-level expire 
should not be null.");
+        writeCommit(GenericRow.of(1, 5, timestamp3));
+        // compact, merged
+        compact(1);
+        assertThat(query(new int[] {0, 1}))
+                .containsExactlyInAnyOrder(GenericRow.of(1, 3), 
GenericRow.of(1, 5));
     }
 }

Reply via email to