This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f9720d23c [core] Introduce commit callback for tables (#1492)
f9720d23c is described below

commit f9720d23cbd1cbd230345feee2203df2c4373685
Author: tsreaper <[email protected]>
AuthorDate: Wed Jul 5 16:29:59 2023 +0800

    [core] Introduce commit callback for tables (#1492)
---
 .../shortcodes/generated/core_configuration.html   |  12 ++
 .../main/java/org/apache/paimon/CoreOptions.java   |  58 +++++++
 .../paimon/table/AbstractFileStoreTable.java       |   1 +
 .../apache/paimon/table/sink/CommitCallback.java   |  39 +++++
 .../apache/paimon/table/sink/TableCommitImpl.java  |  20 ++-
 .../apache/paimon/table/sink/TableCommitTest.java  | 168 +++++++++++++++++++++
 6 files changed, 296 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 0c15e6c5e..6cb820349 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -56,6 +56,18 @@ under the License.
             <td>Boolean</td>
             <td>Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.</td>
         </tr>
+        <tr>
+            <td><h5>commit.callback.#.param</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Parameter string for the constructor of class #. Callback class should parse the parameter by itself.</td>
+        </tr>
+        <tr>
+            <td><h5>commit.callbacks</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>A list of commit callback classes to be called after a successful commit. Class names are connected with comma (example: com.test.CallbackA,com.sample.CallbackB).</td>
+        </tr>
         <tr>
             <td><h5>commit.force-compact</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 010d6e169..0a1db378f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -30,7 +30,9 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.options.description.DescribedEnum;
 import org.apache.paimon.options.description.Description;
 import org.apache.paimon.options.description.InlineElement;
+import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
 import java.io.Serializable;
@@ -726,6 +728,23 @@ public class CoreOptions implements Serializable {
                                                             + STATS_MODE_SUFFIX))
                                     .build());
 
+    public static final ConfigOption<String> COMMIT_CALLBACKS =
+            key("commit.callbacks")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            "A list of commit callback classes to be called after a successful commit. "
+                                    + "Class names are connected with comma "
+                                    + "(example: com.test.CallbackA,com.sample.CallbackB).");
+
+    public static final ConfigOption<String> COMMIT_CALLBACK_PARAM =
+            key("commit.callback.#.param")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Parameter string for the constructor of class #. "
+                                    + "Callback class should parse the parameter by itself.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -1054,6 +1073,45 @@ public class CoreOptions implements Serializable {
         return options.get(CONSUMER_EXPIRATION_TIME);
     }
 
+    public List<CommitCallback> commitCallbacks() {
+        List<CommitCallback> result = new ArrayList<>();
+        for (String className : options.get(COMMIT_CALLBACKS).split(",")) {
+            className = className.trim();
+            if (className.length() == 0) {
+                continue;
+            }
+
+            Class<?> clazz;
+            try {
+                clazz =
+                        Class.forName(
+                                className, true, Thread.currentThread().getContextClassLoader());
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+            Preconditions.checkArgument(
+                    CommitCallback.class.isAssignableFrom(clazz),
+                    "Class " + clazz + " must implement " + CommitCallback.class);
+            String param = options.get(COMMIT_CALLBACK_PARAM.key().replace("#", className));
+
+            try {
+                if (param == null) {
+                    result.add((CommitCallback) clazz.newInstance());
+                } else {
+                    result.add(
+                            (CommitCallback) clazz.getConstructor(String.class).newInstance(param));
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        "Failed to initialize commit callback "
+                                + className
+                                + (param == null ? "" : " with param " + param),
+                        e);
+            }
+        }
+        return result;
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 78eceeb98..a769d6586 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -230,6 +230,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
     public TableCommitImpl newCommit(String commitUser) {
         return new TableCommitImpl(
                 store().newCommit(commitUser),
+                coreOptions().commitCallbacks(),
                 coreOptions().writeOnly() ? null : store().newExpire(),
                 coreOptions().writeOnly() ? null : store().newPartitionExpire(commitUser),
                 lockFactory.create(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
new file mode 100644
index 000000000..2d6fbc8e8
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.manifest.ManifestCommittable;
+
+import java.util.List;
+
+/**
+ * This callback will be called after a list of {@link ManifestCommittable}s is committed.
+ *
+ * <p>As a usage example, we can extract what partitions are created from the {@link
+ * ManifestCommittable}s and add these partitions into Hive metastore.
+ *
+ * <p>NOTE: To guarantee that this callback is called, if a failure occurs right after the commit,
+ * this callback might be called multiple times. Please make sure that your implementation is
+ * idempotent. That is, your callback can be called multiple times and still have the desired
+ * effect.
+ */
+public interface CommitCallback extends AutoCloseable {
+
+    void call(List<ManifestCommittable> committables);
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index a25823a0d..669359feb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -24,6 +24,7 @@ import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.FileStoreExpire;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.utils.IOUtils;
 
 import javax.annotation.Nullable;
 
@@ -46,6 +47,7 @@ import static org.apache.paimon.utils.Preconditions.checkState;
 public class TableCommitImpl implements InnerTableCommit {
 
     private final FileStoreCommit commit;
+    private final List<CommitCallback> commitCallbacks;
     @Nullable private final FileStoreExpire expire;
     @Nullable private final PartitionExpire partitionExpire;
     private final Lock lock;
@@ -59,6 +61,7 @@ public class TableCommitImpl implements InnerTableCommit {
 
     public TableCommitImpl(
             FileStoreCommit commit,
+            List<CommitCallback> commitCallbacks,
             @Nullable FileStoreExpire expire,
             @Nullable PartitionExpire partitionExpire,
             Lock lock,
@@ -73,6 +76,7 @@ public class TableCommitImpl implements InnerTableCommit {
         }
 
         this.commit = commit;
+        this.commitCallbacks = commitCallbacks;
         this.expire = expire;
         this.partitionExpire = partitionExpire;
         this.lock = lock;
@@ -156,6 +160,8 @@ public class TableCommitImpl implements InnerTableCommit {
             commit.overwrite(overwritePartition, committable, Collections.emptyMap());
             expire(committable.identifier());
         }
+
+        commitCallbacks.forEach(c -> c.call(committables));
     }
 
     public int filterAndCommitMultiple(List<ManifestCommittable> committables) {
@@ -164,6 +170,15 @@ public class TableCommitImpl implements InnerTableCommit {
                         committables.stream()
                                 .map(ManifestCommittable::identifier)
                                 .collect(Collectors.toSet()));
+
+        // commitCallback may fail after the snapshot file is successfully created,
+        // so we have to try all of them again
+        List<ManifestCommittable> succeededCommittables =
+                committables.stream()
+                        .filter(c -> !retryIdentifiers.contains(c.identifier()))
+                        .collect(Collectors.toList());
+        commitCallbacks.forEach(c -> c.call(succeededCommittables));
+
         List<ManifestCommittable> retryCommittables =
                 committables.stream()
                         .filter(c -> retryIdentifiers.contains(c.identifier()))
@@ -193,9 +208,10 @@ public class TableCommitImpl implements InnerTableCommit {
 
     @Override
     public void close() throws Exception {
-        if (lock != null) {
-            lock.close();
+        for (CommitCallback commitCallback : commitCallbacks) {
+            IOUtils.closeQuietly(commitCallback);
         }
+        IOUtils.closeQuietly(lock);
     }
 
     @Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
new file mode 100644
index 000000000..00f91e37a
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ExceptionUtils;
+import org.apache.paimon.utils.FailingFileIO;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TableCommit}. */
+public class TableCommitTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private static final Map<String, Set<Long>> commitCallbackResult = new ConcurrentHashMap<>();
+
+    @Test
+    public void testCommitCallbackWithFailure() throws Exception {
+        int numIdentifiers = 30;
+        String testId = UUID.randomUUID().toString();
+        commitCallbackResult.put(testId, new HashSet<>());
+
+        try {
+            testCommitCallbackWithFailureImpl(numIdentifiers, testId);
+        } finally {
+            commitCallbackResult.remove(testId);
+        }
+    }
+
+    private void testCommitCallbackWithFailureImpl(int numIdentifiers, String testId)
+            throws Exception {
+        String failingName = UUID.randomUUID().toString();
+        // no failure when creating table and writing data
+        FailingFileIO.reset(failingName, 0, 1);
+
+        String path = FailingFileIO.getFailingPath(failingName, tempDir.toString());
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+                        new String[] {"k", "v"});
+
+        Options conf = new Options();
+        conf.set(CoreOptions.PATH, path);
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        conf.set(CoreOptions.COMMIT_CALLBACKS, TestCommitCallback.class.getName());
+        conf.set(
+                CoreOptions.COMMIT_CALLBACK_PARAM
+                        .key()
+                        .replace("#", TestCommitCallback.class.getName()),
+                testId);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), new Path(path)),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("k"),
+                                conf.toMap(),
+                                ""));
+
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        new FailingFileIO(), new Path(path), tableSchema, Lock.emptyFactory());
+
+        String commitUser = UUID.randomUUID().toString();
+        StreamTableWrite write = table.newWrite(commitUser);
+        Map<Long, List<CommitMessage>> commitMessages = new HashMap<>();
+        for (int i = 0; i < numIdentifiers; i++) {
+            write.write(GenericRow.of(i, i * 1000L));
+            commitMessages.put((long) i, write.prepareCommit(true, i));
+        }
+        write.close();
+
+        StreamTableCommit commit = table.newCommit(commitUser);
+        // enable failure when committing
+        FailingFileIO.reset(failingName, 3, 1000);
+        while (true) {
+            try {
+                commit.filterAndCommit(commitMessages);
+                break;
+            } catch (Throwable t) {
+                // artificial exception is intended
+                Optional<FailingFileIO.ArtificialException> artificialException =
+                        ExceptionUtils.findThrowable(t, FailingFileIO.ArtificialException.class);
+                // this test emulates an extremely slow commit procedure,
+                // so conflicts may occur due to back pressuring
+                Optional<Throwable> conflictException =
+                        ExceptionUtils.findThrowableWithMessage(
+                                t, "Conflicts during commits are normal");
+                if (artificialException.isPresent() || conflictException.isPresent()) {
+                    continue;
+                }
+                throw t;
+            }
+        }
+        commit.close();
+
+        assertThat(commitCallbackResult.get(testId))
+                .isEqualTo(LongStream.range(0, numIdentifiers).boxed().collect(Collectors.toSet()));
+    }
+
+    /** {@link CommitCallback} for test. */
+    public static class TestCommitCallback implements CommitCallback {
+
+        private final String testId;
+
+        public TestCommitCallback(String testId) {
+            this.testId = testId;
+        }
+
+        @Override
+        public void call(List<ManifestCommittable> committables) {
+            committables.forEach(c -> commitCallbackResult.get(testId).add(c.identifier()));
+        }
+
+        @Override
+        public void close() throws Exception {}
+    }
+}

Reply via email to