This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f2252d7c7c [core] add SuccessFileTagCallback for tag creation (#4847)
f2252d7c7c is described below
commit f2252d7c7c1a7387c9fd3934f247be413387de9c
Author: xiangyu0xf <[email protected]>
AuthorDate: Tue Jan 7 22:49:03 2025 +0800
[core] add SuccessFileTagCallback for tag creation (#4847)
---
.../shortcodes/generated/core_configuration.html | 42 +++++----
.../main/java/org/apache/paimon/CoreOptions.java | 10 ++
.../java/org/apache/paimon/AbstractFileStore.java | 4 +
.../apache/paimon/tag/SuccessFileTagCallback.java | 78 +++++++++++++++
.../paimon/tag/SuccessFileTagCallBackTest.java | 105 +++++++++++++++++++++
5 files changed, 221 insertions(+), 18 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 003d5ba4c2..7fdb43b8b0 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -218,6 +218,24 @@ under the License.
<td>Duration</td>
<td>The TTL in rocksdb index for cross partition upsert (primary
keys not contain all partition fields), this can avoid maintaining too many
indexes and lead to worse and worse performance, but please note that this may
also cause data duplication.</td>
</tr>
+ <tr>
+ <td><h5>data-file.external-paths</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The external paths where the data of this table will be
written, multiple elements separated by commas.</td>
+ </tr>
+ <tr>
+ <td><h5>data-file.external-paths.specific-fs</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The specific file system of the external path when
data-file.external-paths.strategy is set to specific-fs, should be the prefix
scheme of the external path, now supported are s3 and oss.</td>
+ </tr>
+ <tr>
+ <td><h5>data-file.external-paths.strategy</h5></td>
+ <td style="word-wrap: break-word;">none</td>
+ <td><p>Enum</p></td>
+ <td>The strategy of selecting an external path when writing
data.<br /><br />Possible values:<ul><li>"none": Do not choose any external
storage, data will still be written to the default warehouse
path.</li><li>"specific-fs": Select a specific file system as the external
path. Currently supported are S3 and OSS.</li><li>"round-robin": When writing a
new file, a path is chosen from data-file.external-paths in turn.</li></ul></td>
+ </tr>
<tr>
<td><h5>data-file.path-directory</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -930,6 +948,12 @@ If the data size allocated for the sorting task is
uneven,which may lead to perf
<td>String</td>
<td>A list of commit callback classes to be called after a
successful tag. Class names are connected with comma (example:
com.test.CallbackA,com.sample.CallbackB).</td>
</tr>
+ <tr>
+ <td><h5>tag.create-success-file</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to create tag success file for new created tags.</td>
+ </tr>
<tr>
<td><h5>tag.creation-delay</h5></td>
<td style="word-wrap: break-word;">0 ms</td>
@@ -1026,23 +1050,5 @@ If the data size allocated for the sorting task is
uneven,which may lead to perf
<td>Integer</td>
<td>The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote
to the zorder sort.</td>
</tr>
- <tr>
- <td><h5>data-file.external-paths</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>String</td>
- <td>The external paths where the data of this table will be
written, multiple elements separated by commas.</td>
- </tr>
- <tr>
- <td><h5>data-file.external-paths.strategy</h5></td>
- <td style="word-wrap: break-word;">none</td>
- <td><p>Enum</p></td>
- <td>The strategy of selecting an external path when writing
data.<br /><br />Possible values:<ul><li>"none": Do not choose any external
storage, data will still be written to the default warehouse
path.</li><li>"specific-fs": Select a specific file system as the external
path. Currently supported are S3 and OSS.</li><li>"round-robin": When writing a
new file, a path is chosen from data-file.external-paths in turn.</li></ul></td>
- </tr>
- <tr>
- <td><h5>data-file.external-paths.specific-fs</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>String</td>
- <td>The specific file system of the external path when
data-file.external-paths.strategy is set to specific-fs, should be the prefix
scheme of the external path, now supported are s3 and oss.</td>
- </tr>
</tbody>
</table>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0beea0a8f2..3d6b0b5490 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1267,6 +1267,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to create tag automatically. And how to
generate tags.");
+ public static final ConfigOption<Boolean> TAG_CREATE_SUCCESS_FILE =
+ key("tag.create-success-file")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to create tag success file for
new created tags.");
+
public static final ConfigOption<TagCreationPeriod> TAG_CREATION_PERIOD =
key("tag.creation-period")
.enumType(TagCreationPeriod.class)
@@ -2285,6 +2291,10 @@ public class CoreOptions implements Serializable {
return options.get(METASTORE_TAG_TO_PARTITION);
}
+ public boolean tagCreateSuccessFile() {
+ return options.get(TAG_CREATE_SUCCESS_FILE);
+ }
+
public TagCreationMode tagToPartitionPreview() {
return options.get(METASTORE_TAG_TO_PARTITION_PREVIEW);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 8b24ec2a54..953096c0b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -48,6 +48,7 @@ import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
+import org.apache.paimon.tag.SuccessFileTagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -378,6 +379,9 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
callbacks.add(
new
AddPartitionTagCallback(metastoreClientFactory.create(), partitionField));
}
+ if (options.tagCreateSuccessFile()) {
+ callbacks.add(new SuccessFileTagCallback(fileIO,
newTagManager().tagDirectory()));
+ }
return callbacks;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/SuccessFileTagCallback.java
b/paimon-core/src/main/java/org/apache/paimon/tag/SuccessFileTagCallback.java
new file mode 100644
index 0000000000..d1daa48ce8
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/tag/SuccessFileTagCallback.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.partition.file.SuccessFile;
+import org.apache.paimon.table.sink.TagCallback;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+/**
+ * A {@link TagCallback} which creates a "{tagName}_SUCCESS" file when a tag is created and
+ * deletes it when the tag is deleted.
+ *
+ * <p>Success files live in the "tag-success-file" sub-directory of the tag directory passed to
+ * the constructor; each one holds a {@link SuccessFile} serialized as JSON.
+ */
+public class SuccessFileTagCallback implements TagCallback {
+
+    private static final String SUCCESS_FILE_SUFFIX = "_SUCCESS";
+    private static final String SUCCESS_FILE_DIRECTORY = "tag-success-file";
+
+    private final FileIO fileIO;
+    private final Path successFileDirectory;
+
+    /**
+     * Creates the callback and makes sure the success-file directory exists.
+     *
+     * @param fileIO file IO used to create, read and delete success files
+     * @param tagDirectory the table's tag directory; success files go to a sub-directory of it
+     * @throws UncheckedIOException if the success-file directory cannot be checked or created
+     */
+    public SuccessFileTagCallback(FileIO fileIO, Path tagDirectory) {
+        this.fileIO = fileIO;
+        this.successFileDirectory = new Path(tagDirectory, SUCCESS_FILE_DIRECTORY);
+        try {
+            fileIO.checkOrMkdirs(successFileDirectory);
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                    "Failed to create tag success file directory: " + successFileDirectory, e);
+        }
+    }
+
+    /**
+     * Writes the success file of {@code tagName}. If one already exists, it is read back and
+     * passed through {@link SuccessFile#updateModificationTime}; otherwise a new one is
+     * created with both timestamps set to the current time.
+     *
+     * @throws UncheckedIOException if the success file cannot be read or written
+     */
+    @Override
+    public void notifyCreation(String tagName) {
+        Path tagSuccessPath = tagSuccessFilePath(tagName);
+        long currentTime = System.currentTimeMillis();
+        SuccessFile successFile = new SuccessFile(currentTime, currentTime);
+
+        try {
+            if (fileIO.exists(tagSuccessPath)) {
+                successFile =
+                        SuccessFile.fromPath(fileIO, tagSuccessPath)
+                                .updateModificationTime(currentTime);
+            }
+            fileIO.overwriteFileUtf8(tagSuccessPath, successFile.toJson());
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                    "Failed to write tag success file: " + tagSuccessPath, e);
+        }
+    }
+
+    /** Deletes the success file of {@code tagName} via {@link FileIO#deleteQuietly}. */
+    @Override
+    public void notifyDeletion(String tagName) {
+        Path tagSuccessPath = tagSuccessFilePath(tagName);
+        fileIO.deleteQuietly(tagSuccessPath);
+    }
+
+    /** Returns the path of the success file for {@code tagName}. */
+    @VisibleForTesting
+    public Path tagSuccessFilePath(String tagName) {
+        return new Path(successFileDirectory, tagName + SUCCESS_FILE_SUFFIX);
+    }
+
+    @Override
+    public void close() throws Exception {}
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/tag/SuccessFileTagCallBackTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/SuccessFileTagCallBackTest.java
new file mode 100644
index 0000000000..34dd118818
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/tag/SuccessFileTagCallBackTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TagCallback;
+import org.apache.paimon.utils.TagManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.TAG_AUTOMATIC_CREATION;
+import static org.apache.paimon.CoreOptions.TAG_CREATION_PERIOD;
+import static org.apache.paimon.CoreOptions.TAG_NUM_RETAINED_MAX;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link SuccessFileTagCallback}. */
+public class SuccessFileTagCallBackTest extends PrimaryKeyTableTestBase {
+
+    /** With the option enabled, the success file follows the tag's creation and deletion. */
+    @Test
+    public void testWithSuccessFile() throws IOException {
+        Options options = new Options();
+        options.set(CoreOptions.TAG_CREATE_SUCCESS_FILE, true);
+        options.set(TAG_AUTOMATIC_CREATION, CoreOptions.TagCreationMode.WATERMARK);
+        options.set(TAG_CREATION_PERIOD, CoreOptions.TagCreationPeriod.HOURLY);
+        options.set(TAG_NUM_RETAINED_MAX, 3);
+        FileStoreTable table = this.table.copy(options.toMap());
+        FileIO fileIO = table.fileIO();
+        TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        FileStore<?> fileStore = table.store();
+        TagManager tagManager = fileStore.newTagManager();
+        SuccessFileTagCallback successFileTagCallback = null;
+
+        List<TagCallback> tagCallbacks = fileStore.createTagCallbacks();
+        assertThat(tagCallbacks).hasAtLeastOneElementOfType(SuccessFileTagCallback.class);
+
+        for (TagCallback tagCallback : tagCallbacks) {
+            if (tagCallback instanceof SuccessFileTagCallback) {
+                successFileTagCallback = (SuccessFileTagCallback) tagCallback;
+            }
+        }
+
+        // The callback must write exactly where testWithoutSuccessFile looks.
+        assertThat(successFileTagCallback.tagSuccessFilePath("2023-07-18 11"))
+                .isEqualTo(successFilePath(tagManager, "2023-07-18 11"));
+
+        commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00")));
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
+        assertThat(fileIO.exists(successFileTagCallback.tagSuccessFilePath("2023-07-18 11")))
+                .isTrue();
+
+        table.deleteTag("2023-07-18 11");
+        assertThat(tagManager.get("2023-07-18 11")).isEmpty();
+        assertThat(fileIO.exists(successFileTagCallback.tagSuccessFilePath("2023-07-18 11")))
+                .isFalse();
+    }
+
+    /** With the option disabled, no callback is registered and no success file is written. */
+    @Test
+    public void testWithoutSuccessFile() throws IOException {
+        Options options = new Options();
+        options.set(CoreOptions.TAG_CREATE_SUCCESS_FILE, false);
+        options.set(TAG_AUTOMATIC_CREATION, CoreOptions.TagCreationMode.WATERMARK);
+        options.set(TAG_CREATION_PERIOD, CoreOptions.TagCreationPeriod.HOURLY);
+        options.set(TAG_NUM_RETAINED_MAX, 3);
+        FileStoreTable table = this.table.copy(options.toMap());
+        FileIO fileIO = table.fileIO();
+        TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        FileStore<?> fileStore = table.store();
+        TagManager tagManager = fileStore.newTagManager();
+        List<TagCallback> tagCallbacks = fileStore.createTagCallbacks();
+        assertThat(tagCallbacks).doesNotHaveAnyElementsOfTypes(SuccessFileTagCallback.class);
+
+        commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00")));
+        assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 11");
+        // Same location SuccessFileTagCallback writes to, as asserted in testWithSuccessFile.
+        assertThat(fileIO.exists(successFilePath(tagManager, "2023-07-18 11"))).isFalse();
+
+        table.deleteTag("2023-07-18 11");
+        assertThat(tagManager.get("2023-07-18 11")).isEmpty();
+        assertThat(fileIO.exists(successFilePath(tagManager, "2023-07-18 11"))).isFalse();
+    }
+
+    /**
+     * Computes where {@link SuccessFileTagCallback} stores the success file of {@code tagName},
+     * independently of any callback instance.
+     */
+    private static Path successFilePath(TagManager tagManager, String tagName) {
+        return new Path(
+                new Path(tagManager.tagDirectory(), "tag-success-file"), tagName + "_SUCCESS");
+    }
+}