This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f9720d23c [core] Introduce commit callback for tables (#1492)
f9720d23c is described below
commit f9720d23cbd1cbd230345feee2203df2c4373685
Author: tsreaper <[email protected]>
AuthorDate: Wed Jul 5 16:29:59 2023 +0800
[core] Introduce commit callback for tables (#1492)
---
.../shortcodes/generated/core_configuration.html | 12 ++
.../main/java/org/apache/paimon/CoreOptions.java | 58 +++++++
.../paimon/table/AbstractFileStoreTable.java | 1 +
.../apache/paimon/table/sink/CommitCallback.java | 39 +++++
.../apache/paimon/table/sink/TableCommitImpl.java | 20 ++-
.../apache/paimon/table/sink/TableCommitTest.java | 168 +++++++++++++++++++++
6 files changed, 296 insertions(+), 2 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 0c15e6c5e..6cb820349 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -56,6 +56,18 @@ under the License.
<td>Boolean</td>
<td>Whether to generate -U, +U changelog for the same record. This
configuration is only valid for the changelog-producer is lookup or
full-compaction.</td>
</tr>
+ <tr>
+ <td><h5>commit.callback.#.param</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Parameter string for the constructor of class #. Callback
class should parse the parameter by itself.</td>
+ </tr>
+ <tr>
+ <td><h5>commit.callbacks</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>A list of commit callback classes to be called after a
successful commit. Class names are connected with comma (example:
com.test.CallbackA,com.sample.CallbackB).</td>
+ </tr>
<tr>
<td><h5>commit.force-compact</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 010d6e169..0a1db378f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -30,7 +30,9 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.Description;
import org.apache.paimon.options.description.InlineElement;
+import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import java.io.Serializable;
@@ -726,6 +728,23 @@ public class CoreOptions implements Serializable {
+
STATS_MODE_SUFFIX))
.build());
+ // Parsed by commitCallbacks(): the value is split on ',', each entry is trimmed and empty
+ // entries are skipped, so the default "" configures no callbacks.
+ public static final ConfigOption<String> COMMIT_CALLBACKS =
+ key("commit.callbacks")
+ .stringType()
+ .defaultValue("")
+ .withDescription(
+ "A list of commit callback classes to be called
after a successful commit. "
+ + "Class names are connected with comma "
+ + "(example:
com.test.CallbackA,com.sample.CallbackB).");
+
+ // Key template: '#' is replaced by a fully qualified class name listed in COMMIT_CALLBACKS
+ // (e.g. commit.callback.com.test.CallbackA.param). When set, the value is passed to that
+ // class's String constructor; otherwise the class is created with its no-argument constructor.
+ public static final ConfigOption<String> COMMIT_CALLBACK_PARAM =
+ key("commit.callback.#.param")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Parameter string for the constructor of class #. "
+ + "Callback class should parse the
parameter by itself.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -1054,6 +1073,45 @@ public class CoreOptions implements Serializable {
return options.get(CONSUMER_EXPIRATION_TIME);
}
+    /**
+     * Instantiates the commit callbacks listed in {@link #COMMIT_CALLBACKS}.
+     *
+     * <p>Entries are comma-separated class names; surrounding whitespace is trimmed and empty
+     * entries are skipped. Each class must implement {@link CommitCallback}. If the option
+     * {@code commit.callback.<className>.param} is set, the class is constructed through its
+     * public {@code (String)} constructor with that value; otherwise its public no-argument
+     * constructor is used.
+     *
+     * <p>If any callback cannot be created, the callbacks created so far are closed before the
+     * failure is rethrown, so a partially built list never leaks resources.
+     *
+     * @return the callbacks in configuration order; empty if none are configured
+     * @throws IllegalArgumentException if a listed class does not implement {@link CommitCallback}
+     * @throws RuntimeException if a listed class cannot be loaded or instantiated
+     */
+    public List<CommitCallback> commitCallbacks() {
+        List<CommitCallback> result = new ArrayList<>();
+        try {
+            for (String className : options.get(COMMIT_CALLBACKS).split(",")) {
+                className = className.trim();
+                if (className.isEmpty()) {
+                    continue;
+                }
+                result.add(createCommitCallback(className));
+            }
+        } catch (RuntimeException | Error e) {
+            // The caller never receives the partially built list, so release the callbacks that
+            // were already constructed (they are AutoCloseable) before propagating the failure.
+            for (CommitCallback callback : result) {
+                try {
+                    callback.close();
+                } catch (Exception closeException) {
+                    e.addSuppressed(closeException);
+                }
+            }
+            throw e;
+        }
+        return result;
+    }
+
+    /** Loads, validates and instantiates a single commit callback class by its name. */
+    private CommitCallback createCommitCallback(String className) {
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        if (classLoader == null) {
+            // With a null loader Class.forName only searches the bootstrap class path, which
+            // never contains user callback classes; fall back to the loader of this class.
+            classLoader = CoreOptions.class.getClassLoader();
+        }
+
+        Class<?> clazz;
+        try {
+            clazz = Class.forName(className, true, classLoader);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Commit callback class " + className + " is not found", e);
+        }
+        Preconditions.checkArgument(
+                CommitCallback.class.isAssignableFrom(clazz),
+                "Class " + clazz.getName() + " must implement " + CommitCallback.class.getName());
+
+        String param = options.get(COMMIT_CALLBACK_PARAM.key().replace("#", className));
+        try {
+            if (param == null) {
+                // Constructor#newInstance replaces the deprecated Class#newInstance, which could
+                // rethrow checked constructor exceptions without wrapping them.
+                return (CommitCallback) clazz.getConstructor().newInstance();
+            }
+            return (CommitCallback) clazz.getConstructor(String.class).newInstance(param);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to initialize commit callback "
+                            + className
+                            + (param == null ? "" : " with param " + param),
+                    e);
+        }
+    }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 78eceeb98..a769d6586 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -230,6 +230,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
public TableCommitImpl newCommit(String commitUser) {
return new TableCommitImpl(
store().newCommit(commitUser),
+ coreOptions().commitCallbacks(),
coreOptions().writeOnly() ? null : store().newExpire(),
coreOptions().writeOnly() ? null :
store().newPartitionExpire(commitUser),
lockFactory.create(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
new file mode 100644
index 000000000..2d6fbc8e8
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.manifest.ManifestCommittable;
+
+import java.util.List;
+
+/**
+ * This callback will be called after a list of {@link ManifestCommittable}s is committed.
+ *
+ * <p>As a usage example, we can extract which partitions were created from the {@link
+ * ManifestCommittable}s and add these partitions to the Hive metastore.
+ *
+ * <p>Implementations are instantiated reflectively from the comma-separated class names in the
+ * {@code commit.callbacks} table option. If {@code commit.callback.<className>.param} is set, the
+ * public constructor taking a single {@code String} is invoked with that value; otherwise the
+ * no-argument constructor is used. Callbacks are closed when the owning table commit is closed.
+ *
+ * <p>NOTE: To guarantee that this callback is called, it may be called more than once for the
+ * same committables if a failure occurs right after the commit. Please make sure that your
+ * implementation is idempotent, that is, calling it multiple times still has the desired effect.
+ */
+public interface CommitCallback extends AutoCloseable {
+
+ /**
+ * Called after the given committables have been committed. Exceptions thrown here are not
+ * caught by the table commit and propagate to its caller.
+ *
+ * @param committables the committables that were just committed; may be empty
+ */
+ void call(List<ManifestCommittable> committables);
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index a25823a0d..669359feb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -24,6 +24,7 @@ import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.utils.IOUtils;
import javax.annotation.Nullable;
@@ -46,6 +47,7 @@ import static org.apache.paimon.utils.Preconditions.checkState;
public class TableCommitImpl implements InnerTableCommit {
private final FileStoreCommit commit;
+ private final List<CommitCallback> commitCallbacks;
@Nullable private final FileStoreExpire expire;
@Nullable private final PartitionExpire partitionExpire;
private final Lock lock;
@@ -59,6 +61,7 @@ public class TableCommitImpl implements InnerTableCommit {
public TableCommitImpl(
FileStoreCommit commit,
+ List<CommitCallback> commitCallbacks,
@Nullable FileStoreExpire expire,
@Nullable PartitionExpire partitionExpire,
Lock lock,
@@ -73,6 +76,7 @@ public class TableCommitImpl implements InnerTableCommit {
}
this.commit = commit;
+ this.commitCallbacks = commitCallbacks;
this.expire = expire;
this.partitionExpire = partitionExpire;
this.lock = lock;
@@ -156,6 +160,8 @@ public class TableCommitImpl implements InnerTableCommit {
commit.overwrite(overwritePartition, committable,
Collections.emptyMap());
expire(committable.identifier());
}
+
+ commitCallbacks.forEach(c -> c.call(committables));
}
public int filterAndCommitMultiple(List<ManifestCommittable> committables)
{
@@ -164,6 +170,15 @@ public class TableCommitImpl implements InnerTableCommit {
committables.stream()
.map(ManifestCommittable::identifier)
.collect(Collectors.toSet()));
+
+ // commitCallback may fail after the snapshot file is successfully
created,
+ // so we have to try all of them again
+ List<ManifestCommittable> succeededCommittables =
+ committables.stream()
+ .filter(c ->
!retryIdentifiers.contains(c.identifier()))
+ .collect(Collectors.toList());
+ commitCallbacks.forEach(c -> c.call(succeededCommittables));
+
List<ManifestCommittable> retryCommittables =
committables.stream()
.filter(c -> retryIdentifiers.contains(c.identifier()))
@@ -193,9 +208,10 @@ public class TableCommitImpl implements InnerTableCommit {
@Override
public void close() throws Exception {
- if (lock != null) {
- lock.close();
+ // Close the user-configured commit callbacks first, then the lock, each through
+ // IOUtils.closeQuietly (presumably suppresses close failures so one failing callback
+ // cannot keep the lock open — confirm against IOUtils).
+ for (CommitCallback commitCallback : commitCallbacks) {
+ IOUtils.closeQuietly(commitCallback);
}
+ // NOTE(review): the previous `lock != null` guard was removed; this relies on
+ // IOUtils.closeQuietly accepting a null argument — confirm.
+ IOUtils.closeQuietly(lock);
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
new file mode 100644
index 000000000..00f91e37a
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ExceptionUtils;
+import org.apache.paimon.utils.FailingFileIO;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TableCommit}, covering how {@link CommitCallback}s are invoked. */
+public class TableCommitTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ // Identifiers reported by TestCommitCallback, keyed by test id. The callback is created
+ // reflectively from table options (its class name, plus the test id as its param), so the
+ // test cannot hand it a reference; results are shared through this static map instead.
+ private static final Map<String, Set<Long>> commitCallbackResult = new
ConcurrentHashMap<>();
+
+ /**
+ * Commits {@code numIdentifiers} identifiers while the file system injects failures, retrying
+ * until the commit succeeds, and checks that every identifier reached the commit callback.
+ */
+ @Test
+ public void testCommitCallbackWithFailure() throws Exception {
+ int numIdentifiers = 30;
+ String testId = UUID.randomUUID().toString();
+ commitCallbackResult.put(testId, new HashSet<>());
+
+ try {
+ testCommitCallbackWithFailureImpl(numIdentifiers, testId);
+ } finally {
+ commitCallbackResult.remove(testId);
+ }
+ }
+
+ /** Body of {@link #testCommitCallbackWithFailure()}; results are recorded under testId. */
+ private void testCommitCallbackWithFailureImpl(int numIdentifiers, String
testId)
+ throws Exception {
+ String failingName = UUID.randomUUID().toString();
+ // no failure when creating table and writing data
+ FailingFileIO.reset(failingName, 0, 1);
+
+ String path = FailingFileIO.getFailingPath(failingName,
tempDir.toString());
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"k", "v"});
+
+ Options conf = new Options();
+ conf.set(CoreOptions.PATH, path);
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ // Register TestCommitCallback and pass the test id as its constructor parameter.
+ conf.set(CoreOptions.COMMIT_CALLBACKS,
TestCommitCallback.class.getName());
+ conf.set(
+ CoreOptions.COMMIT_CALLBACK_PARAM
+ .key()
+ .replace("#", TestCommitCallback.class.getName()),
+ testId);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new
Path(path)),
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ conf.toMap(),
+ ""));
+
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ new FailingFileIO(), new Path(path), tableSchema,
Lock.emptyFactory());
+
+ String commitUser = UUID.randomUUID().toString();
+ StreamTableWrite write = table.newWrite(commitUser);
+ Map<Long, List<CommitMessage>> commitMessages = new HashMap<>();
+ for (int i = 0; i < numIdentifiers; i++) {
+ write.write(GenericRow.of(i, i * 1000L));
+ commitMessages.put((long) i, write.prepareCommit(true, i));
+ }
+ write.close();
+
+ StreamTableCommit commit = table.newCommit(commitUser);
+ // enable failure when committing
+ FailingFileIO.reset(failingName, 3, 1000);
+ // Retry until one filterAndCommit call succeeds. Injected failures and commit conflicts
+ // are expected and swallowed below; any other throwable is rethrown.
+ while (true) {
+ try {
+ commit.filterAndCommit(commitMessages);
+ break;
+ } catch (Throwable t) {
+ // artificial exception is intended
+ Optional<FailingFileIO.ArtificialException>
artificialException =
+ ExceptionUtils.findThrowable(t,
FailingFileIO.ArtificialException.class);
+ // this test emulates an extremely slow commit procedure,
+ // so conflicts may occur due to back pressuring
+ Optional<Throwable> conflictException =
+ ExceptionUtils.findThrowableWithMessage(
+ t, "Conflicts during commits are normal");
+ if (artificialException.isPresent() ||
conflictException.isPresent()) {
+ continue;
+ }
+ throw t;
+ }
+ }
+ commit.close();
+
+ // Compared as a set, so repeated callback invocations for the same identifier (allowed by
+ // the CommitCallback contract) do not fail the test.
+ assertThat(commitCallbackResult.get(testId))
+ .isEqualTo(LongStream.range(0,
numIdentifiers).boxed().collect(Collectors.toSet()));
+ }
+
+ /** {@link CommitCallback} for test. */
+ public static class TestCommitCallback implements CommitCallback {
+
+ private final String testId;
+
+ /** Called reflectively with the value of this callback's param option, i.e. the test id. */
+ public TestCommitCallback(String testId) {
+ this.testId = testId;
+ }
+
+ @Override
+ public void call(List<ManifestCommittable> committables) {
+ committables.forEach(c ->
commitCallbackResult.get(testId).add(c.identifier()));
+ }
+
+ @Override
+ public void close() throws Exception {}
+ }
+}