This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 992a4065d [core] Support timestamp field type in record level expire (#4417)
992a4065d is described below

commit 992a4065d30099f55b787e7d5fffc55fbb1ccd36
Author: askwang <[email protected]>
AuthorDate: Tue Nov 5 16:15:51 2024 +0800

    [core] Support timestamp field type in record level expire (#4417)
---
 .../shortcodes/generated/core_configuration.html   |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  6 +-
 .../org/apache/paimon/io/RecordLevelExpire.java    | 67 ++++++++++++++++++----
 .../RecordLevelExpireWithTimestampBaseTest.java    | 66 +++++++++++++++++++++
 .../RecordLevelExpireWithTimestampLTZTest.java     | 58 +++++++++++++++++++
 .../table/RecordLevelExpireWithTimestampTest.java  | 58 +++++++++++++++++++
 6 files changed, 243 insertions(+), 14 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index d9ac0b99b..984373a85 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -633,7 +633,7 @@ This config option does not affect the default filesystem metastore.</td>
             <td><h5>record-level.time-field-type</h5></td>
             <td style="word-wrap: break-word;">seconds-int</td>
             <td><p>Enum</p></td>
-            <td>Time field type for record level expire, it can be 
seconds-int,seconds-long or millis-long.<br /><br />Possible 
values:<ul><li>"seconds-int": Timestamps in seconds with INT field 
type.</li><li>"seconds-long": Timestamps in seconds with BIGINT field 
type.</li><li>"millis-long": Timestamps in milliseconds with BIGINT field 
type.</li></ul></td>
+            <td>Time field type for record level expire, it can be 
seconds-int,seconds-long, millis-long or timestamp.<br /><br />Possible 
values:<ul><li>"seconds-int": Timestamps in seconds with INT field 
type.</li><li>"seconds-long": Timestamps in seconds with BIGINT field 
type.</li><li>"millis-long": Timestamps in milliseconds with BIGINT field 
type.</li><li>"timestamp": Timestamp field type.</li></ul></td>
         </tr>
         <tr>
             <td><h5>rowkind.field</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 6ba8d70e0..c20f91e36 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1358,7 +1358,7 @@ public class CoreOptions implements Serializable {
                     .enumType(TimeFieldType.class)
                     .defaultValue(TimeFieldType.SECONDS_INT)
                     .withDescription(
-                            "Time field type for record level expire, it can 
be seconds-int,seconds-long or millis-long.");
+                            "Time field type for record level expire, it can 
be seconds-int,seconds-long, millis-long or timestamp.");
 
     public static final ConfigOption<String> FIELDS_DEFAULT_AGG_FUNC =
             key(FIELDS_PREFIX + "." + DEFAULT_AGG_FUNCTION)
@@ -2926,7 +2926,9 @@ public class CoreOptions implements Serializable {
 
         SECONDS_LONG("seconds-long", "Timestamps in seconds with BIGINT field 
type."),
 
-        MILLIS_LONG("millis-long", "Timestamps in milliseconds with BIGINT 
field type.");
+        MILLIS_LONG("millis-long", "Timestamps in milliseconds with BIGINT 
field type."),
+
+        TIMESTAMP("timestamp", "Timestamp field type.");
 
         private final String value;
         private final String description;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index 6a5a1f49b..6083ad92a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -20,11 +20,15 @@ package org.apache.paimon.io;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
 
 import javax.annotation.Nullable;
 
@@ -38,6 +42,7 @@ public class RecordLevelExpire {
     private final int timeFieldIndex;
     private final int expireTime;
     private final CoreOptions.TimeFieldType timeFieldType;
+    private final DataField rawDataField;
 
     @Nullable
     public static RecordLevelExpire create(CoreOptions options, RowType 
rowType) {
@@ -62,27 +67,44 @@ public class RecordLevelExpire {
 
         CoreOptions.TimeFieldType timeFieldType = 
options.recordLevelTimeFieldType();
         DataField field = rowType.getField(timeFieldName);
-        if (!((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
-                        && field.type() instanceof IntType)
-                || (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
-                        && field.type() instanceof BigIntType)
-                || (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
-                        && field.type() instanceof BigIntType))) {
+        if (!isValidateFieldType(timeFieldType, field)) {
             throw new IllegalArgumentException(
                     String.format(
-                            "The record level time field type should be one of 
SECONDS_INT,SECONDS_LONG or MILLIS_LONG, "
-                                    + "but time field type is %s, field type 
is %s.",
-                            timeFieldType, field.type()));
+                            "The record level time field type should be one of 
SECONDS_INT, SECONDS_LONG, MILLIS_LONG or TIMESTAMP, "
+                                    + "but time field type is %s, field type 
is %s. You can specify the type through the config '%s'.",
+                            timeFieldType,
+                            field.type(),
+                            CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE.key()));
         }
 
-        return new RecordLevelExpire(fieldIndex, (int) 
expireTime.getSeconds(), timeFieldType);
+        return new RecordLevelExpire(
+                fieldIndex, (int) expireTime.getSeconds(), timeFieldType, 
field);
+    }
+
+    private static boolean isValidateFieldType(
+            CoreOptions.TimeFieldType timeFieldType, DataField field) {
+        DataType dataType = field.type();
+        return ((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
+                        && dataType instanceof IntType)
+                || (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
+                        && dataType instanceof BigIntType)
+                || (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
+                        && dataType instanceof BigIntType)
+                || (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
+                        && dataType instanceof TimestampType)
+                || (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
+                        && dataType instanceof LocalZonedTimestampType));
     }
 
     private RecordLevelExpire(
-            int timeFieldIndex, int expireTime, CoreOptions.TimeFieldType 
timeFieldType) {
+            int timeFieldIndex,
+            int expireTime,
+            CoreOptions.TimeFieldType timeFieldType,
+            DataField rawDataField) {
         this.timeFieldIndex = timeFieldIndex;
         this.expireTime = expireTime;
         this.timeFieldType = timeFieldType;
+        this.rawDataField = rawDataField;
     }
 
     public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> 
readerFactory) {
@@ -107,6 +129,29 @@ public class RecordLevelExpire {
                         case MILLIS_LONG:
                             recordTime = (int) 
(kv.value().getLong(timeFieldIndex) / 1000);
                             break;
+                        case TIMESTAMP:
+                            Timestamp timestamp;
+                            if (rawDataField.type() instanceof TimestampType) {
+                                TimestampType timestampType = (TimestampType) 
rawDataField.type();
+                                timestamp =
+                                        kv.value()
+                                                .getTimestamp(
+                                                        timeFieldIndex,
+                                                        
timestampType.getPrecision());
+                            } else if (rawDataField.type() instanceof 
LocalZonedTimestampType) {
+                                LocalZonedTimestampType timestampType =
+                                        (LocalZonedTimestampType) 
rawDataField.type();
+                                timestamp =
+                                        kv.value()
+                                                .getTimestamp(
+                                                        timeFieldIndex,
+                                                        
timestampType.getPrecision());
+                            } else {
+                                throw new UnsupportedOperationException(
+                                        "Unsupported timestamp type: " + 
rawDataField.type());
+                            }
+                            recordTime = (int) (timestamp.getMillisecond() / 
1000);
+                            break;
                         default:
                             String msg =
                                     String.format(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
new file mode 100644
index 000000000..dcc8d246d
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+abstract class RecordLevelExpireWithTimestampBaseTest extends 
PrimaryKeyTableTestBase {
+
+    @Override
+    protected Options tableOptions() {
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, 
Duration.ofSeconds(1));
+        options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
+        options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE, 
CoreOptions.TimeFieldType.TIMESTAMP);
+        return options;
+    }
+
+    @Test
+    public void testTimestampTypeExpire() throws Exception {
+        long millis = System.currentTimeMillis();
+        Timestamp timestamp1 = Timestamp.fromEpochMillis(millis - 60 * 1000);
+        Timestamp timestamp2 = Timestamp.fromEpochMillis(millis);
+        Timestamp timestamp3 = Timestamp.fromEpochMillis(millis + 60 * 1000);
+
+        // create at least two files in one bucket
+        writeCommit(GenericRow.of(1, 1, timestamp1), GenericRow.of(1, 2, 
timestamp2));
+        writeCommit(GenericRow.of(1, 3, timestamp3));
+
+        // no compaction, can be queried
+        assertThat(query(new int[] {0, 1}))
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1), GenericRow.of(1, 2), 
GenericRow.of(1, 3));
+        Thread.sleep(2000);
+
+        // compact, expired
+        compact(1);
+        assertThat(query(new int[] {0, 
1})).containsExactlyInAnyOrder(GenericRow.of(1, 3));
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java
new file mode 100644
index 000000000..af834af27
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.UUID;
+
+class RecordLevelExpireWithTimestampLTZTest extends 
RecordLevelExpireWithTimestampBaseTest {
+
+    @Override
+    @BeforeEach
+    public void beforeEachBase() throws Exception {
+        CatalogContext context =
+                CatalogContext.create(
+                        new Path(TraceableFileIO.SCHEME + "://" + 
tempPath.toString()));
+        Catalog catalog = CatalogFactory.createCatalog(context);
+        Identifier identifier = new Identifier("default", "T");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .options(tableOptions().toMap())
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        table = (FileStoreTable) catalog.getTable(identifier);
+        commitUser = UUID.randomUUID().toString();
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java
new file mode 100644
index 000000000..3c4add891
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.UUID;
+
+class RecordLevelExpireWithTimestampTest extends 
RecordLevelExpireWithTimestampBaseTest {
+
+    @Override
+    @BeforeEach
+    public void beforeEachBase() throws Exception {
+        CatalogContext context =
+                CatalogContext.create(
+                        new Path(TraceableFileIO.SCHEME + "://" + 
tempPath.toString()));
+        Catalog catalog = CatalogFactory.createCatalog(context);
+        Identifier identifier = new Identifier("default", "T");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", DataTypes.TIMESTAMP())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .options(tableOptions().toMap())
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        table = (FileStoreTable) catalog.getTable(identifier);
+        commitUser = UUID.randomUUID().toString();
+    }
+}

Reply via email to