This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new d92a7f37 [FLINK-28605] Throw exception intentionally when new snapshots are committed during restore d92a7f37 is described below commit d92a7f37dc072a6d3178b28c2d6040667b9f96b1 Author: tsreaper <tsreape...@gmail.com> AuthorDate: Tue Jul 19 16:08:32 2022 +0800 [FLINK-28605] Throw exception intentionally when new snapshots are committed during restore This closes #225 --- .../store/connector/sink/CommitterOperator.java | 16 +- .../connector/sink/CommitterOperatorTest.java | 172 +++++++++++++++++++++ 2 files changed, 185 insertions(+), 3 deletions(-) diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java index 2ae924aa..fb7083ac 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java @@ -136,14 +136,24 @@ public class CommitterOperator extends AbstractStreamOperator<Committable> commit(true, restored); } - public void commit(boolean isRecover, List<ManifestCommittable> committables) throws Exception { + private void commit(boolean isRecover, List<ManifestCommittable> committables) + throws Exception { if (isRecover) { committables = committer.filterRecoveredCommittables(committables); + if (!committables.isEmpty()) { + committer.commit(committables); + throw new RuntimeException( + "This exception is intentionally thrown " + + "after committing the restored checkpoints. " + + "By restarting the job we hope that " + + "writers can start writing based on these new commits."); + } + } else { + committer.commit(committables); } - committer.commit(committables); } - public ManifestCommittable toCommittables(long checkpoint, List<Committable> inputs) + private ManifestCommittable toCommittables(long checkpoint, List<Committable> inputs) throws Exception { return committer.combine(checkpoint, inputs); } diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java new file mode 100644 index 00000000..c9ea2710 --- /dev/null +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector.sink; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer; +import org.apache.flink.table.store.file.schema.SchemaManager; +import org.apache.flink.table.store.file.schema.UpdateSchema; +import org.apache.flink.table.store.file.utils.RecordReader; +import org.apache.flink.table.store.file.utils.RecordReaderIterator; +import org.apache.flink.table.store.table.FileStoreTable; +import org.apache.flink.table.store.table.FileStoreTableFactory; +import org.apache.flink.table.store.table.sink.FileCommittable; +import org.apache.flink.table.store.table.sink.TableWrite; +import org.apache.flink.table.store.table.source.TableRead; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +/** Tests for {@link CommitterOperator}. */ +public class CommitterOperatorTest { + + private static final RowType ROW_TYPE = + RowType.of( + new LogicalType[] { + DataTypes.INT().getLogicalType(), DataTypes.BIGINT().getLogicalType() + }, + new String[] {"a", "b"}); + + @TempDir public java.nio.file.Path tempDir; + private Path tablePath; + + @BeforeEach + public void before() { + tablePath = new Path(tempDir.toString()); + } + + @Test + public void testFailIntentionallyAfterRestore() throws Exception { + FileStoreTable table = createFileStoreTable(); + + OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = + createTestHarness(table); + testHarness.open(); + + TableWrite write = table.newWrite(); + write.write(GenericRowData.of(1, 10L)); + write.write(GenericRowData.of(2, 20L)); + + long timestamp = 1; + for (FileCommittable committable : write.prepareCommit(false)) { + testHarness.processElement( + new Committable(Committable.Kind.FILE, committable), timestamp++); + } + // checkpoint is completed but not notified, so no snapshot is committed + OperatorSubtaskState snapshot = testHarness.snapshot(0, timestamp++); + assertThat(table.snapshotManager().latestSnapshotId()).isNull(); + + testHarness = createTestHarness(table); + try { + // commit snapshot from state, fail intentionally + testHarness.initializeState(snapshot); + testHarness.open(); + fail("Expecting intentional exception"); + } catch (Exception e) { + assertThat(e) + .hasMessageContaining( + "This exception is intentionally thrown " + + "after committing the restored checkpoints. " + + "By restarting the job we hope that " + + "writers can start writing based on these new commits."); + } + assertResults(table, "1, 10", "2, 20"); + + // snapshot is successfully committed, no failure is needed + testHarness = createTestHarness(table); + testHarness.initializeState(snapshot); + testHarness.open(); + assertResults(table, "1, 10", "2, 20"); + } + + private void assertResults(FileStoreTable table, String... expected) { + TableRead read = table.newRead(); + List<String> actual = new ArrayList<>(); + table.newScan() + .plan() + .splits + .forEach( + s -> { + try { + RecordReader<RowData> recordReader = read.createReader(s); + CloseableIterator<RowData> it = + new RecordReaderIterator<>(recordReader); + while (it.hasNext()) { + RowData row = it.next(); + actual.add(row.getInt(0) + ", " + row.getLong(1)); + } + it.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + Collections.sort(actual); + assertThat(actual).isEqualTo(Arrays.asList(expected)); + } + + private FileStoreTable createFileStoreTable() throws Exception { + Configuration conf = new Configuration(); + conf.set(CoreOptions.PATH, tablePath.toString()); + SchemaManager schemaManager = new SchemaManager(tablePath); + schemaManager.commitNewVersion( + new UpdateSchema( + ROW_TYPE, + Collections.emptyList(), + Collections.emptyList(), + conf.toMap(), + "")); + return FileStoreTableFactory.create(conf); + } + + private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness( + FileStoreTable table) throws Exception { + CommitterOperator operator = + new CommitterOperator( + true, + user -> new StoreCommitter(table.newCommit(user)), + ManifestCommittableSerializer::new); + TypeSerializer<Committable> serializer = + new CommittableTypeInfo().createSerializer(new ExecutionConfig()); + OneInputStreamOperatorTestHarness<Committable, Committable> harness = + new OneInputStreamOperatorTestHarness<>(operator, serializer); + harness.setup(serializer); + return harness; + } +}