This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b4601f35a [core] The record level expired time field type is
automatically recognized. (#4458)
b4601f35a is described below
commit b4601f35a1c458e9ccac6e7d66b5580e4dfec9a4
Author: Kerwin <[email protected]>
AuthorDate: Wed Nov 6 13:36:07 2024 +0800
[core] The record level expired time field type is automatically
recognized. (#4458)
---
.../generated/catalog_configuration.html | 12 +-
.../shortcodes/generated/core_configuration.html | 8 +-
.../generated/flink_connector_configuration.html | 24 ++--
.../main/java/org/apache/paimon/CoreOptions.java | 42 +------
.../paimon/fileindex/FileIndexPredicate.java | 4 +-
.../org/apache/paimon/io/RecordLevelExpire.java | 133 +++++++--------------
.../RecordLevelExpireWithMillisecondTest.java | 2 -
.../RecordLevelExpireWithTimestampBaseTest.java | 1 -
8 files changed, 64 insertions(+), 162 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index c7688c3b6..8954e898f 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -44,12 +44,6 @@ under the License.
<td>Duration</td>
<td>Controls the duration for which databases and tables in the
catalog are cached.</td>
</tr>
- <tr>
- <td><h5>cache.partition.max-num</h5></td>
- <td style="word-wrap: break-word;">0</td>
- <td>Long</td>
- <td>Controls the max number for which partitions in the catalog
are cached.</td>
- </tr>
<tr>
<td><h5>cache.manifest.max-memory</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -68,6 +62,12 @@ under the License.
<td>MemorySize</td>
<td>Controls the threshold of small manifest file.</td>
</tr>
+ <tr>
+ <td><h5>cache.partition.max-num</h5></td>
+ <td style="word-wrap: break-word;">0</td>
+ <td>Long</td>
+ <td>Controls the max number for which partitions in the catalog
are cached.</td>
+ </tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 984373a85..6556867ac 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -627,13 +627,7 @@ This config option does not affect the default filesystem metastore.</td>
<td><h5>record-level.time-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Time field for record level expire.</td>
- </tr>
- <tr>
- <td><h5>record-level.time-field-type</h5></td>
- <td style="word-wrap: break-word;">seconds-int</td>
- <td><p>Enum</p></td>
- <td>Time field type for record level expire, it can be
seconds-int,seconds-long, millis-long or timestamp.<br /><br />Possible
values:<ul><li>"seconds-int": Timestamps in seconds with INT field
type.</li><li>"seconds-long": Timestamps in seconds with BIGINT field
type.</li><li>"millis-long": Timestamps in milliseconds with BIGINT field
type.</li><li>"timestamp": Timestamp field type.</li></ul></td>
+ <td>Time field for record level expire. It supports the following
types: `timestamps in seconds with INT`,`timestamps in seconds with BIGINT`,
`timestamps in milliseconds with BIGINT` or `timestamp`.</td>
</tr>
<tr>
<td><h5>rowkind.field</h5></td>
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 9da703f44..aa4b46e2d 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -230,6 +230,12 @@ under the License.
<td>MemorySize</td>
<td>Weight of writer buffer in managed memory, Flink will compute
the memory size for writer according to the weight, the actual memory used
depends on the running environment.</td>
</tr>
+ <tr>
+ <td><h5>sink.operator-uid.suffix</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Set the uid suffix for the writer, dynamic bucket assigner and
committer operators. The uid format is
${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set,
flink will automatically generate the operator uid, which may be incompatible
when the topology changes.</td>
+ </tr>
<tr>
<td><h5>sink.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -248,18 +254,6 @@ under the License.
<td>Boolean</td>
<td>If true, flink sink will use managed memory for merge tree;
otherwise, it will create an independent memory allocator.</td>
</tr>
- <tr>
- <td><h5>sink.operator-uid.suffix</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>String</td>
- <td>Set the uid suffix for the writer, dynamic bucket assigner and
committer operators. The uid format is
${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set,
flink will automatically generate the operator uid, which may be incompatible
when the topology changes.</td>
- </tr>
- <tr>
- <td><h5>source.operator-uid.suffix</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>String</td>
- <td>Set the uid suffix for the source operators. After setting,
the uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid
suffix is not set, flink will automatically generate the operator uid, which
may be incompatible when the topology changes.</td>
- </tr>
<tr>
<td><h5>source.checkpoint-align.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -272,6 +266,12 @@ under the License.
<td>Duration</td>
<td>If the new snapshot has not been generated when the checkpoint
starts to trigger, the enumerator will block the checkpoint and wait for the
new snapshot. Set the maximum waiting time to avoid infinite waiting, if
timeout, the checkpoint will fail. Note that it should be set smaller than the
checkpoint timeout.</td>
</tr>
+ <tr>
+ <td><h5>source.operator-uid.suffix</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Set the uid suffix for the source operators. After setting,
the uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid
suffix is not set, flink will automatically generate the operator uid, which
may be incompatible when the topology changes.</td>
+ </tr>
<tr>
<td><h5>streaming-read.shuffle-bucket-with-partition</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index c20f91e36..c69f0aae0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1351,14 +1351,8 @@ public class CoreOptions implements Serializable {
key("record-level.time-field")
.stringType()
.noDefaultValue()
- .withDescription("Time field for record level expire.");
-
- public static final ConfigOption<TimeFieldType>
RECORD_LEVEL_TIME_FIELD_TYPE =
- key("record-level.time-field-type")
- .enumType(TimeFieldType.class)
- .defaultValue(TimeFieldType.SECONDS_INT)
.withDescription(
- "Time field type for record level expire, it can
be seconds-int,seconds-long, millis-long or timestamp.");
+ "Time field for record level expire. It supports
the following types: `timestamps in seconds with INT`,`timestamps in seconds
with BIGINT`, `timestamps in milliseconds with BIGINT` or `timestamp`.");
public static final ConfigOption<String> FIELDS_DEFAULT_AGG_FUNC =
key(FIELDS_PREFIX + "." + DEFAULT_AGG_FUNCTION)
@@ -2267,11 +2261,6 @@ public class CoreOptions implements Serializable {
return options.get(RECORD_LEVEL_TIME_FIELD);
}
- @Nullable
- public TimeFieldType recordLevelTimeFieldType() {
- return options.get(RECORD_LEVEL_TIME_FIELD_TYPE);
- }
-
public boolean prepareCommitWaitCompaction() {
if (!needLookup()) {
return false;
@@ -2920,35 +2909,6 @@ public class CoreOptions implements Serializable {
}
}
- /** Time field type for record level expire. */
- public enum TimeFieldType implements DescribedEnum {
- SECONDS_INT("seconds-int", "Timestamps in seconds with INT field
type."),
-
- SECONDS_LONG("seconds-long", "Timestamps in seconds with BIGINT field
type."),
-
- MILLIS_LONG("millis-long", "Timestamps in milliseconds with BIGINT
field type."),
-
- TIMESTAMP("timestamp", "Timestamp field type.");
-
- private final String value;
- private final String description;
-
- TimeFieldType(String value, String description) {
- this.value = value;
- this.description = description;
- }
-
- @Override
- public String toString() {
- return value;
- }
-
- @Override
- public InlineElement getDescription() {
- return text(description);
- }
- }
-
/** The time unit of materialized table freshness. */
public enum MaterializedTableIntervalFreshnessTimeUnit {
SECOND,
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
index 1d19dfbb9..2eb3f2195 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -72,10 +72,10 @@ public class FileIndexPredicate implements Closeable {
return true;
}
- Set<String> requredFieldNames = getRequiredNames(filePredicate);
+ Set<String> requiredFieldNames = getRequiredNames(filePredicate);
Map<String, Collection<FileIndexReader>> indexReaders = new
HashMap<>();
- requredFieldNames.forEach(name -> indexReaders.put(name,
reader.readColumnIndex(name)));
+ requiredFieldNames.forEach(name -> indexReaders.put(name,
reader.readColumnIndex(name)));
if (!new
FileIndexPredicateTest(indexReaders).test(filePredicate).remain()) {
LOG.debug(
"One file has been filtered: "
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index 6083ad92a..e43a9d03d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -20,11 +20,11 @@ package org.apache.paimon.io;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
-import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
@@ -33,16 +33,15 @@ import org.apache.paimon.types.TimestampType;
import javax.annotation.Nullable;
import java.time.Duration;
+import java.util.function.Function;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** A factory to create {@link RecordReader} expires records by time. */
public class RecordLevelExpire {
- private final int timeFieldIndex;
private final int expireTime;
- private final CoreOptions.TimeFieldType timeFieldType;
- private final DataField rawDataField;
+ private final Function<InternalRow, Integer> fieldGetter;
@Nullable
public static RecordLevelExpire create(CoreOptions options, RowType
rowType) {
@@ -65,46 +64,14 @@ public class RecordLevelExpire {
"Can not find time field %s for record level
expire.", timeFieldName));
}
- CoreOptions.TimeFieldType timeFieldType =
options.recordLevelTimeFieldType();
- DataField field = rowType.getField(timeFieldName);
- if (!isValidateFieldType(timeFieldType, field)) {
- throw new IllegalArgumentException(
- String.format(
- "The record level time field type should be one of
SECONDS_INT, SECONDS_LONG, MILLIS_LONG or TIMESTAMP, "
- + "but time field type is %s, field type
is %s. You can specify the type through the config '%s'.",
- timeFieldType,
- field.type(),
- CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE.key()));
- }
-
- return new RecordLevelExpire(
- fieldIndex, (int) expireTime.getSeconds(), timeFieldType,
field);
- }
-
- private static boolean isValidateFieldType(
- CoreOptions.TimeFieldType timeFieldType, DataField field) {
- DataType dataType = field.type();
- return ((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
- && dataType instanceof IntType)
- || (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
- && dataType instanceof BigIntType)
- || (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
- && dataType instanceof BigIntType)
- || (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
- && dataType instanceof TimestampType)
- || (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
- && dataType instanceof LocalZonedTimestampType));
+ DataType dataType = rowType.getField(timeFieldName).type();
+ Function<InternalRow, Integer> fieldGetter =
createFieldGetter(dataType, fieldIndex);
+ return new RecordLevelExpire((int) expireTime.getSeconds(),
fieldGetter);
}
- private RecordLevelExpire(
- int timeFieldIndex,
- int expireTime,
- CoreOptions.TimeFieldType timeFieldType,
- DataField rawDataField) {
- this.timeFieldIndex = timeFieldIndex;
+ private RecordLevelExpire(int expireTime, Function<InternalRow, Integer>
fieldGetter) {
this.expireTime = expireTime;
- this.timeFieldType = timeFieldType;
- this.rawDataField = rawDataField;
+ this.fieldGetter = fieldGetter;
}
public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue>
readerFactory) {
@@ -113,54 +80,38 @@ public class RecordLevelExpire {
private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
int currentTime = (int) (System.currentTimeMillis() / 1000);
- return reader.filter(
- kv -> {
- checkArgument(
- !kv.value().isNullAt(timeFieldIndex),
- "Time field for record-level expire should not be
null.");
- final int recordTime;
- switch (timeFieldType) {
- case SECONDS_INT:
- recordTime = kv.value().getInt(timeFieldIndex);
- break;
- case SECONDS_LONG:
- recordTime = (int)
kv.value().getLong(timeFieldIndex);
- break;
- case MILLIS_LONG:
- recordTime = (int)
(kv.value().getLong(timeFieldIndex) / 1000);
- break;
- case TIMESTAMP:
- Timestamp timestamp;
- if (rawDataField.type() instanceof TimestampType) {
- TimestampType timestampType = (TimestampType)
rawDataField.type();
- timestamp =
- kv.value()
- .getTimestamp(
- timeFieldIndex,
-
timestampType.getPrecision());
- } else if (rawDataField.type() instanceof
LocalZonedTimestampType) {
- LocalZonedTimestampType timestampType =
- (LocalZonedTimestampType)
rawDataField.type();
- timestamp =
- kv.value()
- .getTimestamp(
- timeFieldIndex,
-
timestampType.getPrecision());
- } else {
- throw new UnsupportedOperationException(
- "Unsupported timestamp type: " +
rawDataField.type());
- }
- recordTime = (int) (timestamp.getMillisecond() /
1000);
- break;
- default:
- String msg =
- String.format(
- "type %s not support in %s",
- timeFieldType,
-
CoreOptions.TimeFieldType.class.getName());
- throw new IllegalArgumentException(msg);
- }
- return currentTime <= recordTime + expireTime;
- });
+ return reader.filter(kv -> currentTime <=
fieldGetter.apply(kv.value()) + expireTime);
+ }
+
+ private static Function<InternalRow, Integer> createFieldGetter(
+ DataType dataType, int fieldIndex) {
+ final Function<InternalRow, Integer> fieldGetter;
+ if (dataType instanceof IntType) {
+ fieldGetter = row -> row.getInt(fieldIndex);
+ } else if (dataType instanceof BigIntType) {
+ fieldGetter =
+ row -> {
+ long value = row.getLong(fieldIndex);
+ // If it is milliseconds, convert it to seconds.
+ return (int) (value >= 1_000_000_000_000L ? value /
1000 : value);
+ };
+ } else if (dataType instanceof TimestampType
+ || dataType instanceof LocalZonedTimestampType) {
+ int precision = DataTypeChecks.getPrecision(dataType);
+ fieldGetter =
+ row -> (int) (row.getTimestamp(fieldIndex,
precision).getMillisecond() / 1000);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "The record level time field type should be one of
INT, BIGINT, or TIMESTAMP, but field type is %s.",
+ dataType));
+ }
+
+ return row -> {
+ checkArgument(
+ !row.isNullAt(fieldIndex),
+ "Time field for record-level expire should not be null.");
+ return fieldGetter.apply(row);
+ };
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
index 14ec6885c..15b6556c3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
@@ -69,8 +69,6 @@ class RecordLevelExpireWithMillisecondTest extends PrimaryKeyTableTestBase {
options.set(CoreOptions.BUCKET, 1);
options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME,
Duration.ofSeconds(1));
options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
- options.set(
- CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE,
CoreOptions.TimeFieldType.MILLIS_LONG);
return options;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
index dcc8d246d..abcb8c1c7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
@@ -38,7 +38,6 @@ abstract class RecordLevelExpireWithTimestampBaseTest extends PrimaryKeyTableTes
options.set(CoreOptions.BUCKET, 1);
options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME,
Duration.ofSeconds(1));
options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
- options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE,
CoreOptions.TimeFieldType.TIMESTAMP);
return options;
}