This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 39de7cc31c [core] support null value for record-level.time-field (#4839)
39de7cc31c is described below
commit 39de7cc31cab53f64c99500c1c0bf7d7c25b548f
Author: xiangyu0xf <[email protected]>
AuthorDate: Mon Jan 6 22:14:24 2025 +0800
[core] support null value for record-level.time-field (#4839)
---
.../org/apache/paimon/io/RecordLevelExpire.java | 50 ++++++++++++++--------
.../apache/paimon/table/RecordLevelExpireTest.java | 12 ++++--
...a => RecordLevelExpireWithAggregationTest.java} | 37 ++++++++--------
.../RecordLevelExpireWithMillisecondTest.java | 9 ++--
.../RecordLevelExpireWithTimestampBaseTest.java | 9 ++--
5 files changed, 71 insertions(+), 46 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index e43a9d03d9..05c70dcbe1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -33,15 +33,14 @@ import org.apache.paimon.types.TimestampType;
import javax.annotation.Nullable;
import java.time.Duration;
+import java.util.Optional;
import java.util.function.Function;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/** A factory to create {@link RecordReader} expires records by time. */
public class RecordLevelExpire {
private final int expireTime;
- private final Function<InternalRow, Integer> fieldGetter;
+ private final Function<InternalRow, Optional<Integer>> fieldGetter;
@Nullable
public static RecordLevelExpire create(CoreOptions options, RowType
rowType) {
@@ -65,11 +64,13 @@ public class RecordLevelExpire {
}
DataType dataType = rowType.getField(timeFieldName).type();
- Function<InternalRow, Integer> fieldGetter =
createFieldGetter(dataType, fieldIndex);
+ Function<InternalRow, Optional<Integer>> fieldGetter =
+ createFieldGetter(dataType, fieldIndex);
return new RecordLevelExpire((int) expireTime.getSeconds(),
fieldGetter);
}
- private RecordLevelExpire(int expireTime, Function<InternalRow, Integer>
fieldGetter) {
+ private RecordLevelExpire(
+ int expireTime, Function<InternalRow, Optional<Integer>>
fieldGetter) {
this.expireTime = expireTime;
this.fieldGetter = fieldGetter;
}
@@ -80,26 +81,46 @@ public class RecordLevelExpire {
private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
int currentTime = (int) (System.currentTimeMillis() / 1000);
- return reader.filter(kv -> currentTime <=
fieldGetter.apply(kv.value()) + expireTime);
+ return reader.filter(
+ keyValue ->
+ fieldGetter
+ .apply(keyValue.value())
+ .map(integer -> currentTime <= integer +
expireTime)
+ .orElse(true));
}
- private static Function<InternalRow, Integer> createFieldGetter(
+ private static Function<InternalRow, Optional<Integer>> createFieldGetter(
DataType dataType, int fieldIndex) {
- final Function<InternalRow, Integer> fieldGetter;
+ final Function<InternalRow, Optional<Integer>> fieldGetter;
if (dataType instanceof IntType) {
- fieldGetter = row -> row.getInt(fieldIndex);
+ fieldGetter =
+ row ->
+ row.isNullAt(fieldIndex)
+ ? Optional.empty()
+ : Optional.of(row.getInt(fieldIndex));
} else if (dataType instanceof BigIntType) {
fieldGetter =
row -> {
+ if (row.isNullAt(fieldIndex)) {
+ return Optional.empty();
+ }
long value = row.getLong(fieldIndex);
// If it is milliseconds, convert it to seconds.
- return (int) (value >= 1_000_000_000_000L ? value /
1000 : value);
+ return Optional.of(
+ (int) (value >= 1_000_000_000_000L ? value /
1000 : value));
};
} else if (dataType instanceof TimestampType
|| dataType instanceof LocalZonedTimestampType) {
int precision = DataTypeChecks.getPrecision(dataType);
fieldGetter =
- row -> (int) (row.getTimestamp(fieldIndex,
precision).getMillisecond() / 1000);
+ row ->
+ row.isNullAt(fieldIndex)
+ ? Optional.empty()
+ : Optional.of(
+ (int)
+
(row.getTimestamp(fieldIndex, precision)
+
.getMillisecond()
+ / 1000));
} else {
throw new IllegalArgumentException(
String.format(
@@ -107,11 +128,6 @@ public class RecordLevelExpire {
dataType));
}
- return row -> {
- checkArgument(
- !row.isNullAt(fieldIndex),
- "Time field for record-level expire should not be null.");
- return fieldGetter.apply(row);
- };
+ return fieldGetter;
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
index bd13b0ecf8..bce57c2752 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
@@ -38,7 +38,6 @@ import java.time.Duration;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
@@ -102,12 +101,17 @@ class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
.containsExactlyInAnyOrder(GenericRow.of(currentSecs + 60 *
60));
writeCommit(GenericRow.of(1, 5, null));
+ compact(1);
assertThat(query())
.containsExactlyInAnyOrder(
GenericRow.of(1, 4, currentSecs + 60 * 60),
GenericRow.of(1, 5, null));
- // null time field for record-level expire is not supported yet.
- assertThatThrownBy(() -> compact(1))
- .hasMessageContaining("Time field for record-level expire
should not be null.");
+ writeCommit(GenericRow.of(1, 5, currentSecs + 60 * 60));
+ // compact, merged
+ compact(1);
+ assertThat(query())
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 4, currentSecs + 60 * 60),
+ GenericRow.of(1, 5, currentSecs + 60 * 60));
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithAggregationTest.java
similarity index 79%
copy from paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
copy to paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithAggregationTest.java
index bd13b0ecf8..02ed8ee3ed 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithAggregationTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.TraceableFileIO;
import org.junit.jupiter.api.BeforeEach;
@@ -38,9 +39,8 @@ import java.time.Duration;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
+class RecordLevelExpireWithAggregationTest extends PrimaryKeyTableTestBase {
@Override
@BeforeEach
@@ -71,20 +71,23 @@ class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
options.set(CoreOptions.BUCKET, 1);
options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME,
Duration.ofSeconds(1));
options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
+ options.set(CoreOptions.MERGE_ENGINE,
CoreOptions.MergeEngine.AGGREGATE);
+ options.set(CoreOptions.FIELDS_DEFAULT_AGG_FUNC,
"last_non_null_value");
+ options.set("fields.col1.ignore-retract", "true");
return options;
}
@Test
public void test() throws Exception {
writeCommit(GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 2));
-
- // can be queried
- assertThat(query())
- .containsExactlyInAnyOrder(GenericRow.of(1, 1, 1),
GenericRow.of(1, 2, 2));
+ // disordered retract message will generate null fields
+ writeCommit(
+ GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 3, 1),
+ GenericRow.ofKind(RowKind.DELETE, 1, 3, 1));
int currentSecs = (int) (System.currentTimeMillis() / 1000);
- writeCommit(GenericRow.of(1, 3, currentSecs));
writeCommit(GenericRow.of(1, 4, currentSecs + 60 * 60));
+
Thread.sleep(2000);
// no compaction, can be queried
@@ -92,22 +95,22 @@ class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
.containsExactlyInAnyOrder(
GenericRow.of(1, 1, 1),
GenericRow.of(1, 2, 2),
- GenericRow.of(1, 3, currentSecs),
+ GenericRow.of(1, 3, null),
GenericRow.of(1, 4, currentSecs + 60 * 60));
// compact, expired
compact(1);
- assertThat(query()).containsExactlyInAnyOrder(GenericRow.of(1, 4,
currentSecs + 60 * 60));
- assertThat(query(new int[] {2}))
- .containsExactlyInAnyOrder(GenericRow.of(currentSecs + 60 *
60));
-
- writeCommit(GenericRow.of(1, 5, null));
assertThat(query())
.containsExactlyInAnyOrder(
- GenericRow.of(1, 4, currentSecs + 60 * 60),
GenericRow.of(1, 5, null));
+ GenericRow.of(1, 3, null), GenericRow.of(1, 4,
currentSecs + 60 * 60));
- // null time field for record-level expire is not supported yet.
- assertThatThrownBy(() -> compact(1))
- .hasMessageContaining("Time field for record-level expire
should not be null.");
+ writeCommit(GenericRow.of(1, 3, currentSecs + 60 * 60));
+ compact(1);
+
+ // compact, merged
+ assertThat(query())
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 4, currentSecs + 60 * 60),
+ GenericRow.of(1, 3, currentSecs + 60 * 60));
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
index 295058bfbd..f9cfbf0ce3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
@@ -38,7 +38,6 @@ import java.time.Duration;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
class RecordLevelExpireWithMillisecondTest extends PrimaryKeyTableTestBase {
@Override
@@ -102,8 +101,10 @@ class RecordLevelExpireWithMillisecondTest extends PrimaryKeyTableTestBase {
assertThat(query(new int[] {0, 1}))
.containsExactlyInAnyOrder(GenericRow.of(1, 4),
GenericRow.of(1, 5));
- // null time field for record-level expire is not supported yet.
- assertThatThrownBy(() -> compact(1))
- .hasMessageContaining("Time field for record-level expire
should not be null.");
+ writeCommit(GenericRow.of(1, 5, currentSecs + 60 * 60 * 1000));
+ // compact, merged
+ compact(1);
+ assertThat(query(new int[] {0, 1}))
+ .containsExactlyInAnyOrder(GenericRow.of(1, 4),
GenericRow.of(1, 5));
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
index f352693759..f9aef643f6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
@@ -29,7 +29,6 @@ import org.junit.jupiter.api.Test;
import java.time.Duration;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
abstract class RecordLevelExpireWithTimestampBaseTest extends
PrimaryKeyTableTestBase {
@@ -67,8 +66,10 @@ abstract class RecordLevelExpireWithTimestampBaseTest extends PrimaryKeyTableTes
assertThat(query(new int[] {0, 1}))
.containsExactlyInAnyOrder(GenericRow.of(1, 3),
GenericRow.of(1, 5));
- // null time field for record-level expire is not supported yet.
- assertThatThrownBy(() -> compact(1))
- .hasMessageContaining("Time field for record-level expire
should not be null.");
+ writeCommit(GenericRow.of(1, 5, timestamp3));
+ // compact, merged
+ compact(1);
+ assertThat(query(new int[] {0, 1}))
+ .containsExactlyInAnyOrder(GenericRow.of(1, 3),
GenericRow.of(1, 5));
}
}