This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 992a4065d [core] Support timestamp field type in record level expire (#4417)
992a4065d is described below
commit 992a4065d30099f55b787e7d5fffc55fbb1ccd36
Author: askwang <[email protected]>
AuthorDate: Tue Nov 5 16:15:51 2024 +0800
[core] Support timestamp field type in record level expire (#4417)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 6 +-
.../org/apache/paimon/io/RecordLevelExpire.java | 67 ++++++++++++++++++----
.../RecordLevelExpireWithTimestampBaseTest.java | 66 +++++++++++++++++++++
.../RecordLevelExpireWithTimestampLTZTest.java | 58 +++++++++++++++++++
.../table/RecordLevelExpireWithTimestampTest.java | 58 +++++++++++++++++++
6 files changed, 243 insertions(+), 14 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index d9ac0b99b..984373a85 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -633,7 +633,7 @@ This config option does not affect the default filesystem
metastore.</td>
<td><h5>record-level.time-field-type</h5></td>
<td style="word-wrap: break-word;">seconds-int</td>
<td><p>Enum</p></td>
- <td>Time field type for record level expire, it can be
seconds-int,seconds-long or millis-long.<br /><br />Possible
values:<ul><li>"seconds-int": Timestamps in seconds with INT field
type.</li><li>"seconds-long": Timestamps in seconds with BIGINT field
type.</li><li>"millis-long": Timestamps in milliseconds with BIGINT field
type.</li></ul></td>
+ <td>Time field type for record level expire, it can be
seconds-int,seconds-long, millis-long or timestamp.<br /><br />Possible
values:<ul><li>"seconds-int": Timestamps in seconds with INT field
type.</li><li>"seconds-long": Timestamps in seconds with BIGINT field
type.</li><li>"millis-long": Timestamps in milliseconds with BIGINT field
type.</li><li>"timestamp": Timestamp field type.</li></ul></td>
</tr>
<tr>
<td><h5>rowkind.field</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 6ba8d70e0..c20f91e36 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1358,7 +1358,7 @@ public class CoreOptions implements Serializable {
.enumType(TimeFieldType.class)
.defaultValue(TimeFieldType.SECONDS_INT)
.withDescription(
- "Time field type for record level expire, it can
be seconds-int,seconds-long or millis-long.");
+ "Time field type for record level expire, it can
be seconds-int,seconds-long, millis-long or timestamp.");
public static final ConfigOption<String> FIELDS_DEFAULT_AGG_FUNC =
key(FIELDS_PREFIX + "." + DEFAULT_AGG_FUNCTION)
@@ -2926,7 +2926,9 @@ public class CoreOptions implements Serializable {
SECONDS_LONG("seconds-long", "Timestamps in seconds with BIGINT field
type."),
- MILLIS_LONG("millis-long", "Timestamps in milliseconds with BIGINT
field type.");
+ MILLIS_LONG("millis-long", "Timestamps in milliseconds with BIGINT
field type."),
+
+ TIMESTAMP("timestamp", "Timestamp field type.");
private final String value;
private final String description;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index 6a5a1f49b..6083ad92a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -20,11 +20,15 @@ package org.apache.paimon.io;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
import javax.annotation.Nullable;
@@ -38,6 +42,7 @@ public class RecordLevelExpire {
private final int timeFieldIndex;
private final int expireTime;
private final CoreOptions.TimeFieldType timeFieldType;
+ private final DataField rawDataField;
@Nullable
public static RecordLevelExpire create(CoreOptions options, RowType
rowType) {
@@ -62,27 +67,44 @@ public class RecordLevelExpire {
CoreOptions.TimeFieldType timeFieldType =
options.recordLevelTimeFieldType();
DataField field = rowType.getField(timeFieldName);
- if (!((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
- && field.type() instanceof IntType)
- || (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
- && field.type() instanceof BigIntType)
- || (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
- && field.type() instanceof BigIntType))) {
+ if (!isValidateFieldType(timeFieldType, field)) {
throw new IllegalArgumentException(
String.format(
- "The record level time field type should be one of
SECONDS_INT,SECONDS_LONG or MILLIS_LONG, "
- + "but time field type is %s, field type
is %s.",
- timeFieldType, field.type()));
+ "The record level time field type should be one of
SECONDS_INT, SECONDS_LONG, MILLIS_LONG or TIMESTAMP, "
+ + "but time field type is %s, field type
is %s. You can specify the type through the config '%s'.",
+ timeFieldType,
+ field.type(),
+ CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE.key()));
}
- return new RecordLevelExpire(fieldIndex, (int)
expireTime.getSeconds(), timeFieldType);
+ return new RecordLevelExpire(
+ fieldIndex, (int) expireTime.getSeconds(), timeFieldType,
field);
+ }
+
+ private static boolean isValidateFieldType(
+ CoreOptions.TimeFieldType timeFieldType, DataField field) {
+ DataType dataType = field.type();
+ return ((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
+ && dataType instanceof IntType)
+ || (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
+ && dataType instanceof BigIntType)
+ || (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
+ && dataType instanceof BigIntType)
+ || (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
+ && dataType instanceof TimestampType)
+ || (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
+ && dataType instanceof LocalZonedTimestampType));
}
private RecordLevelExpire(
- int timeFieldIndex, int expireTime, CoreOptions.TimeFieldType
timeFieldType) {
+ int timeFieldIndex,
+ int expireTime,
+ CoreOptions.TimeFieldType timeFieldType,
+ DataField rawDataField) {
this.timeFieldIndex = timeFieldIndex;
this.expireTime = expireTime;
this.timeFieldType = timeFieldType;
+ this.rawDataField = rawDataField;
}
public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue>
readerFactory) {
@@ -107,6 +129,29 @@ public class RecordLevelExpire {
case MILLIS_LONG:
recordTime = (int)
(kv.value().getLong(timeFieldIndex) / 1000);
break;
+ case TIMESTAMP:
+ Timestamp timestamp;
+ if (rawDataField.type() instanceof TimestampType) {
+ TimestampType timestampType = (TimestampType)
rawDataField.type();
+ timestamp =
+ kv.value()
+ .getTimestamp(
+ timeFieldIndex,
+
timestampType.getPrecision());
+ } else if (rawDataField.type() instanceof
LocalZonedTimestampType) {
+ LocalZonedTimestampType timestampType =
+ (LocalZonedTimestampType)
rawDataField.type();
+ timestamp =
+ kv.value()
+ .getTimestamp(
+ timeFieldIndex,
+
timestampType.getPrecision());
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported timestamp type: " +
rawDataField.type());
+ }
+ recordTime = (int) (timestamp.getMillisecond() /
1000);
+ break;
default:
String msg =
String.format(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
new file mode 100644
index 000000000..dcc8d246d
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+abstract class RecordLevelExpireWithTimestampBaseTest extends
PrimaryKeyTableTestBase {
+
+ @Override
+ protected Options tableOptions() {
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME,
Duration.ofSeconds(1));
+ options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
+ options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE,
CoreOptions.TimeFieldType.TIMESTAMP);
+ return options;
+ }
+
+ @Test
+ public void testTimestampTypeExpire() throws Exception {
+ long millis = System.currentTimeMillis();
+ Timestamp timestamp1 = Timestamp.fromEpochMillis(millis - 60 * 1000);
+ Timestamp timestamp2 = Timestamp.fromEpochMillis(millis);
+ Timestamp timestamp3 = Timestamp.fromEpochMillis(millis + 60 * 1000);
+
+ // create at least two files in one bucket
+ writeCommit(GenericRow.of(1, 1, timestamp1), GenericRow.of(1, 2,
timestamp2));
+ writeCommit(GenericRow.of(1, 3, timestamp3));
+
+ // no compaction, can be queried
+ assertThat(query(new int[] {0, 1}))
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1), GenericRow.of(1, 2),
GenericRow.of(1, 3));
+ Thread.sleep(2000);
+
+ // compact, expired
+ compact(1);
+ assertThat(query(new int[] {0,
1})).containsExactlyInAnyOrder(GenericRow.of(1, 3));
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java
new file mode 100644
index 000000000..af834af27
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.UUID;
+
+class RecordLevelExpireWithTimestampLTZTest extends
RecordLevelExpireWithTimestampBaseTest {
+
+ @Override
+ @BeforeEach
+ public void beforeEachBase() throws Exception {
+ CatalogContext context =
+ CatalogContext.create(
+ new Path(TraceableFileIO.SCHEME + "://" +
tempPath.toString()));
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ Identifier identifier = new Identifier("default", "T");
+ catalog.createDatabase(identifier.getDatabaseName(), true);
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .options(tableOptions().toMap())
+ .build();
+ catalog.createTable(identifier, schema, true);
+ table = (FileStoreTable) catalog.getTable(identifier);
+ commitUser = UUID.randomUUID().toString();
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java
new file mode 100644
index 000000000..3c4add891
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.UUID;
+
+class RecordLevelExpireWithTimestampTest extends
RecordLevelExpireWithTimestampBaseTest {
+
+ @Override
+ @BeforeEach
+ public void beforeEachBase() throws Exception {
+ CatalogContext context =
+ CatalogContext.create(
+ new Path(TraceableFileIO.SCHEME + "://" +
tempPath.toString()));
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ Identifier identifier = new Identifier("default", "T");
+ catalog.createDatabase(identifier.getDatabaseName(), true);
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.TIMESTAMP())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .options(tableOptions().toMap())
+ .build();
+ catalog.createTable(identifier, schema, true);
+ table = (FileStoreTable) catalog.getTable(identifier);
+ commitUser = UUID.randomUUID().toString();
+ }
+}