This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fe04a6e9d0f8be33cb9801b4e021c0cd7d0a5b29 Author: Arvid Heise <[email protected]> AuthorDate: Sun Apr 13 09:38:22 2025 +0200 [FLINK-37605][runtime] Remove obsolete sink tests With the removal of SinkV1, all adapter tests have also been testing V2. We can remove the adapter tests and simplify test hierarchy. --- .../operators/sink/CommitterOperatorTestBase.java | 359 ------------- .../sink/SinkV2CommitterOperatorTest.java | 335 ++++++++++++- .../sink/SinkV2SinkWriterOperatorTest.java | 509 ++++++++++++++++++- .../operators/sink/SinkWriterOperatorTestBase.java | 558 --------------------- .../sink/WithAdapterCommitterOperatorTest.java | 70 --- .../sink/WithAdapterSinkWriterOperatorTest.java | 135 ----- 6 files changed, 828 insertions(+), 1138 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java deleted file mode 100644 index 5fdb36a3953..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.sink; - -import org.apache.flink.api.connector.sink2.SupportsCommitter; -import org.apache.flink.configuration.SinkOptions; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; -import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; -import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; -import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; - -import org.assertj.core.api.AbstractThrowableAssert; -import org.assertj.core.api.ListAssert; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import java.util.function.IntSupplier; - -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; -import static org.assertj.core.api.Assertions.as; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatCode; - -abstract class CommitterOperatorTestBase { - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testEmitCommittables(boolean withPostCommitTopology) throws Exception { - SinkAndCounters sinkAndCounters; - if (withPostCommitTopology) { - // Insert global committer to simulate post commit topology - sinkAndCounters = sinkWithPostCommit(); - } else { - sinkAndCounters = sinkWithoutPostCommit(); - } - final OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = - new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true)); - testHarness.open(); - - final CommittableSummary<String> committableSummary = - new CommittableSummary<>(1, 1, 1L, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage<String> committableWithLineage = - new CommittableWithLineage<>("1", 1L, 1); - testHarness.processElement(new StreamRecord<>(committableWithLineage)); - - // Trigger commit - testHarness.notifyOfCompletedCheckpoint(1); - - assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1); - if (withPostCommitTopology) { - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(2); - records.element(0, as(committableSummary())) - .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) - .hasOverallCommittables(committableSummary.getNumberOfCommittables()); - records.element(1, as(committableWithLineage())) - .isEqualTo(committableWithLineage.withSubtaskId(0)); - } else { - assertThat(testHarness.getOutput()).isEmpty(); - } - testHarness.close(); - } - - @Test - void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = createTestHarness(sinkAndCounters.sink, false, true); - testHarness.open(); - testHarness.setProcessingTime(0); - - // Only send first committable - final CommittableSummary<String> committableSummary = - new CommittableSummary<>(1, 1, 1L, 2, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 1L, 1); - testHarness.processElement(new StreamRecord<>(first)); - - assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)) - .hasMessageContaining("Trying to commit incomplete batch of committables"); - - assertThat(testHarness.getOutput()).isEmpty(); - assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero(); - - final CommittableWithLineage<String> second = new CommittableWithLineage<>("2", 1L, 1); - testHarness.processElement(new StreamRecord<>(second)); - - assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)).doesNotThrowAnyException(); - - assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(3); - records.element(0, as(committableSummary())) - .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) - .hasOverallCommittables(committableSummary.getNumberOfCommittables()); - records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); - records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); - testHarness.close(); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception { - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode); - testHarness.open(); - - final CommittableSummary<String> committableSummary = - new CommittableSummary<>(1, 2, EOI, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableSummary<String> committableSummary2 = - new CommittableSummary<>(2, 2, EOI, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - - final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", EOI, 1); - testHarness.processElement(new StreamRecord<>(first)); - final CommittableWithLineage<String> second = new CommittableWithLineage<>("1", EOI, 2); - testHarness.processElement(new StreamRecord<>(second)); - - testHarness.endInput(); - if (!isBatchMode) { - assertThat(testHarness.getOutput()).isEmpty(); - // notify final checkpoint complete - testHarness.notifyOfCompletedCheckpoint(1); - } - - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(3); - records.element(0, as(committableSummary())) - .hasFailedCommittables(0) - .hasOverallCommittables(2); - records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); - records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); - testHarness.close(); - } - - @Test - void testStateRestore() throws Exception { - - final int originalSubtaskId = 0; - final int subtaskIdAfterRecovery = 9; - - final OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = - createTestHarness( - sinkWithPostCommitWithRetry().sink, - false, - true, - 1, - 1, - originalSubtaskId); - testHarness.open(); - - // We cannot test a different checkpoint thant 0 because when using the OperatorTestHarness - // for recovery the lastCompleted checkpoint is always reset to 0. - long checkpointId = 0L; - - final CommittableSummary<String> committableSummary = - new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage<String> first = - new CommittableWithLineage<>("1", checkpointId, originalSubtaskId); - testHarness.processElement(new StreamRecord<>(first)); - - // another committable for the same checkpointId but from different subtask. - final CommittableSummary<String> committableSummary2 = - new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - final CommittableWithLineage<String> second = - new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1); - testHarness.processElement(new StreamRecord<>(second)); - - final OperatorSubtaskState snapshot = testHarness.snapshot(checkpointId, 2L); - assertThat(testHarness.getOutput()).isEmpty(); - testHarness.close(); - - // create new testHarness but with different parallelism level and subtaskId that original - // one. - // we will make sure that new subtaskId was used during committable recovery. - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - restored = - createTestHarness( - sinkAndCounters.sink, false, true, 10, 10, subtaskIdAfterRecovery); - - restored.initializeState(snapshot); - restored.open(); - - // Previous committables are immediately committed if possible - assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); - ListAssert<CommittableMessage<String>> records = - assertThat(restored.extractOutputValues()).hasSize(3); - CommittableSummaryAssert<Object> objectCommittableSummaryAssert = - records.element(0, as(committableSummary())) - .hasCheckpointId(checkpointId) - .hasFailedCommittables(0) - .hasSubtaskId(subtaskIdAfterRecovery); - objectCommittableSummaryAssert.hasOverallCommittables(2); - - // Expect the same checkpointId that the original snapshot was made with. - records.element(1, as(committableWithLineage())) - .hasCheckpointId(checkpointId) - .hasSubtaskId(subtaskIdAfterRecovery) - .hasCommittable(first.getCommittable()); - records.element(2, as(committableWithLineage())) - .hasCheckpointId(checkpointId) - .hasSubtaskId(subtaskIdAfterRecovery) - .hasCommittable(second.getCommittable()); - restored.close(); - } - - @ParameterizedTest - @ValueSource(ints = {0, 1}) - void testNumberOfRetries(int numRetries) throws Exception { - try (OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = - createTestHarness( - sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0)) { - testHarness - .getStreamConfig() - .getConfiguration() - .set(SinkOptions.COMMITTER_RETRIES, numRetries); - testHarness.open(); - - long ckdId = 1L; - testHarness.processElement( - new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 1, 0))); - testHarness.processElement( - new StreamRecord<>(new CommittableWithLineage<>("1", ckdId, 0))); - AbstractThrowableAssert<?, ? extends Throwable> throwableAssert = - assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(ckdId)); - if (numRetries == 0) { - throwableAssert.hasMessageContaining("Failed to commit 1 committables"); - } else { - throwableAssert.doesNotThrowAnyException(); - } - } - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { - final SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - - try (OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = - new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>( - sinkAndCounters.sink, false, isCheckpointingEnabled))) { - testHarness.open(); - - final CommittableSummary<String> committableSummary = - new CommittableSummary<>(1, 1, 1L, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage<String> committableWithLineage = - new CommittableWithLineage<>("1", 1L, 1); - testHarness.processElement(new StreamRecord<>(committableWithLineage)); - - testHarness.endInput(); - - // If checkpointing enabled endInput does not emit anything because a final checkpoint - // follows - if (isCheckpointingEnabled) { - testHarness.notifyOfCompletedCheckpoint(1); - } - - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(2); - CommittableSummaryAssert<Object> objectCommittableSummaryAssert = - records.element(0, as(committableSummary())).hasCheckpointId(1L); - objectCommittableSummaryAssert.hasOverallCommittables(1); - records.element(1, as(committableWithLineage())) - .isEqualTo(committableWithLineage.withSubtaskId(0)); - - // Future emission calls should change the output - testHarness.notifyOfCompletedCheckpoint(2); - testHarness.endInput(); - - assertThat(testHarness.getOutput()).hasSize(2); - } - } - - private OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - createTestHarness( - SupportsCommitter<String> sink, - boolean isBatchMode, - boolean isCheckpointingEnabled) - throws Exception { - return new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled)); - } - - private OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - createTestHarness( - SupportsCommitter<String> sink, - boolean isBatchMode, - boolean isCheckpointingEnabled, - int maxParallelism, - int parallelism, - int subtaskId) - throws Exception { - return new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled), - maxParallelism, - parallelism, - subtaskId); - } - - abstract SinkAndCounters sinkWithPostCommit(); - - abstract SinkAndCounters sinkWithPostCommitWithRetry(); - - abstract SinkAndCounters sinkWithoutPostCommit(); - - static class SinkAndCounters { - SupportsCommitter<String> sink; - IntSupplier commitCounter; - - public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier commitCounter) { - this.sink = sink; - this.commitCounter = commitCounter; - } - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java index 0112b5cf862..4c7291dd44c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java @@ -19,11 +19,32 @@ package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.api.connector.sink2.SupportsCommitter; +import org.apache.flink.configuration.SinkOptions; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import org.assertj.core.api.AbstractThrowableAssert; +import org.assertj.core.api.ListAssert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.Collection; +import java.util.function.IntSupplier; + +import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; +import static org.assertj.core.api.Assertions.as; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; -class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { - @Override +class SinkV2CommitterOperatorTest { SinkAndCounters sinkWithPostCommit() { ForwardingCommitter committer = new ForwardingCommitter(); return new SinkAndCounters( @@ -35,9 +56,8 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { () -> committer.successfulCommits); } - @Override SinkAndCounters sinkWithPostCommitWithRetry() { - return new CommitterOperatorTestBase.SinkAndCounters( + return new SinkAndCounters( (SupportsCommitter<String>) TestSinkV2.newBuilder() .setCommitter(new TestSinkV2.RetryOnceCommitter()) @@ -46,7 +66,6 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { () -> 0); } - @Override SinkAndCounters sinkWithoutPostCommit() { ForwardingCommitter committer = new ForwardingCommitter(); return new SinkAndCounters( @@ -58,6 +77,302 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { () -> committer.successfulCommits); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testEmitCommittables(boolean withPostCommitTopology) throws Exception { + SinkAndCounters sinkAndCounters; + if (withPostCommitTopology) { + // Insert global committer to simulate post commit topology + sinkAndCounters = sinkWithPostCommit(); + } else { + sinkAndCounters = sinkWithoutPostCommit(); + } + final OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true)); + testHarness.open(); + + final CommittableSummary<String> committableSummary = + new CommittableSummary<>(1, 1, 1L, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage<String> committableWithLineage = + new CommittableWithLineage<>("1", 1L, 1); + testHarness.processElement(new StreamRecord<>(committableWithLineage)); + + // Trigger commit + testHarness.notifyOfCompletedCheckpoint(1); + + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1); + if (withPostCommitTopology) { + ListAssert<CommittableMessage<String>> records = + assertThat(testHarness.extractOutputValues()).hasSize(2); + records.element(0, as(committableSummary())) + .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) + .hasOverallCommittables(committableSummary.getNumberOfCommittables()); + records.element(1, as(committableWithLineage())) + .isEqualTo(committableWithLineage.withSubtaskId(0)); + } else { + assertThat(testHarness.getOutput()).isEmpty(); + } + testHarness.close(); + } + + @Test + void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { + SinkAndCounters sinkAndCounters = sinkWithPostCommit(); + final OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = createTestHarness(sinkAndCounters.sink, false, true); + testHarness.open(); + testHarness.setProcessingTime(0); + + // Only send first committable + final CommittableSummary<String> committableSummary = + new CommittableSummary<>(1, 1, 1L, 2, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 1L, 1); + testHarness.processElement(new StreamRecord<>(first)); + + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)) + .hasMessageContaining("Trying to commit incomplete batch of committables"); + + assertThat(testHarness.getOutput()).isEmpty(); + assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero(); + + final CommittableWithLineage<String> second = new CommittableWithLineage<>("2", 1L, 1); + testHarness.processElement(new StreamRecord<>(second)); + + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)).doesNotThrowAnyException(); + + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); + ListAssert<CommittableMessage<String>> records = + assertThat(testHarness.extractOutputValues()).hasSize(3); + records.element(0, as(committableSummary())) + .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) + .hasOverallCommittables(committableSummary.getNumberOfCommittables()); + records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); + records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); + testHarness.close(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception { + SinkAndCounters sinkAndCounters = sinkWithPostCommit(); + final OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode); + testHarness.open(); + + final CommittableSummary<String> committableSummary = + new CommittableSummary<>(1, 2, EOI, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableSummary<String> committableSummary2 = + new CommittableSummary<>(2, 2, EOI, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary2)); + + final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", EOI, 1); + testHarness.processElement(new StreamRecord<>(first)); + final CommittableWithLineage<String> second = new CommittableWithLineage<>("1", EOI, 2); + testHarness.processElement(new StreamRecord<>(second)); + + testHarness.endInput(); + if (!isBatchMode) { + assertThat(testHarness.getOutput()).isEmpty(); + // notify final checkpoint complete + testHarness.notifyOfCompletedCheckpoint(1); + } + + ListAssert<CommittableMessage<String>> records = + assertThat(testHarness.extractOutputValues()).hasSize(3); + records.element(0, as(committableSummary())) + .hasFailedCommittables(0) + .hasOverallCommittables(2); + records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); + records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); + testHarness.close(); + } + + @Test + void testStateRestore() throws Exception { + + final int originalSubtaskId = 0; + final int subtaskIdAfterRecovery = 9; + + final OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = + createTestHarness( + sinkWithPostCommitWithRetry().sink, + false, + true, + 1, + 1, + originalSubtaskId); + testHarness.open(); + + // We cannot test a different checkpoint thant 0 because when using the OperatorTestHarness + // for recovery the lastCompleted checkpoint is always reset to 0. + long checkpointId = 0L; + + final CommittableSummary<String> committableSummary = + new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage<String> first = + new CommittableWithLineage<>("1", checkpointId, originalSubtaskId); + testHarness.processElement(new StreamRecord<>(first)); + + // another committable for the same checkpointId but from different subtask. + final CommittableSummary<String> committableSummary2 = + new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary2)); + final CommittableWithLineage<String> second = + new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1); + testHarness.processElement(new StreamRecord<>(second)); + + final OperatorSubtaskState snapshot = testHarness.snapshot(checkpointId, 2L); + assertThat(testHarness.getOutput()).isEmpty(); + testHarness.close(); + + // create new testHarness but with different parallelism level and subtaskId that original + // one. + // we will make sure that new subtaskId was used during committable recovery. + SinkAndCounters sinkAndCounters = sinkWithPostCommit(); + final OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + restored = + createTestHarness( + sinkAndCounters.sink, false, true, 10, 10, subtaskIdAfterRecovery); + + restored.initializeState(snapshot); + restored.open(); + + // Previous committables are immediately committed if possible + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); + ListAssert<CommittableMessage<String>> records = + assertThat(restored.extractOutputValues()).hasSize(3); + CommittableSummaryAssert<Object> objectCommittableSummaryAssert = + records.element(0, as(committableSummary())) + .hasCheckpointId(checkpointId) + .hasFailedCommittables(0) + .hasSubtaskId(subtaskIdAfterRecovery); + objectCommittableSummaryAssert.hasOverallCommittables(2); + + // Expect the same checkpointId that the original snapshot was made with. + records.element(1, as(committableWithLineage())) + .hasCheckpointId(checkpointId) + .hasSubtaskId(subtaskIdAfterRecovery) + .hasCommittable(first.getCommittable()); + records.element(2, as(committableWithLineage())) + .hasCheckpointId(checkpointId) + .hasSubtaskId(subtaskIdAfterRecovery) + .hasCommittable(second.getCommittable()); + restored.close(); + } + + @ParameterizedTest + @ValueSource(ints = {0, 1}) + void testNumberOfRetries(int numRetries) throws Exception { + try (OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = + createTestHarness( + sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0)) { + testHarness + .getStreamConfig() + .getConfiguration() + .set(SinkOptions.COMMITTER_RETRIES, numRetries); + testHarness.open(); + + long ckdId = 1L; + testHarness.processElement( + new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 1, 0))); + testHarness.processElement( + new StreamRecord<>(new CommittableWithLineage<>("1", ckdId, 0))); + AbstractThrowableAssert<?, ? extends Throwable> throwableAssert = + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(ckdId)); + if (numRetries == 0) { + throwableAssert.hasMessageContaining("Failed to commit 1 committables"); + } else { + throwableAssert.doesNotThrowAnyException(); + } + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { + final SinkAndCounters sinkAndCounters = sinkWithPostCommit(); + + try (OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>( + sinkAndCounters.sink, false, isCheckpointingEnabled))) { + testHarness.open(); + + final CommittableSummary<String> committableSummary = + new CommittableSummary<>(1, 1, 1L, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage<String> committableWithLineage = + new CommittableWithLineage<>("1", 1L, 1); + testHarness.processElement(new StreamRecord<>(committableWithLineage)); + + testHarness.endInput(); + + // If checkpointing enabled endInput does not emit anything because a final checkpoint + // follows + if (isCheckpointingEnabled) { + testHarness.notifyOfCompletedCheckpoint(1); + } + + ListAssert<CommittableMessage<String>> records = + assertThat(testHarness.extractOutputValues()).hasSize(2); + CommittableSummaryAssert<Object> objectCommittableSummaryAssert = + records.element(0, as(committableSummary())).hasCheckpointId(1L); + objectCommittableSummaryAssert.hasOverallCommittables(1); + records.element(1, as(committableWithLineage())) + .isEqualTo(committableWithLineage.withSubtaskId(0)); + + // Future emission calls should change the output + testHarness.notifyOfCompletedCheckpoint(2); + testHarness.endInput(); + + assertThat(testHarness.getOutput()).hasSize(2); + } + } + + private OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + createTestHarness( + SupportsCommitter<String> sink, + boolean isBatchMode, + boolean isCheckpointingEnabled) + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled)); + } + + private OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + createTestHarness( + SupportsCommitter<String> sink, + boolean isBatchMode, + boolean isCheckpointingEnabled, + int maxParallelism, + int parallelism, + int subtaskId) + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled), + maxParallelism, + parallelism, + subtaskId); + } + private static class ForwardingCommitter extends TestSinkV2.DefaultCommitter { private int successfulCommits = 0; @@ -69,4 +384,14 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { @Override public void close() throws Exception {} } + + static class SinkAndCounters { + SupportsCommitter<String> sink; + IntSupplier commitCounter; + + public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier commitCounter) { + this.sink = sink; + this.commitCounter = commitCounter; + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java index ae0616e8b97..4cac3953d4f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java @@ -18,26 +18,119 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert; +import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableList; +import org.assertj.core.api.ListAssert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID; +import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; +import static org.assertj.core.api.Assertions.as; +import static org.assertj.core.api.Assertions.assertThat; + +class SinkV2SinkWriterOperatorTest { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testLoadPreviousSinkState(boolean stateful) throws Exception { + // 1. Build previous sink state + final List<String> previousSinkInputs = + Arrays.asList( + "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", + "cost", "prompt"); + + InspectableSink sink = sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); + int expectedState = 5; + final OneInputStreamOperatorTestHarness<String, String> previousSink = + new OneInputStreamOperatorTestHarness<>( + new CompatibleStateSinkOperator<>( + TestSinkV2.WRITER_SERIALIZER, expectedState), + StringSerializer.INSTANCE); + + OperatorSubtaskState previousSinkState = + TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs); + + // 2. Load previous sink state and verify state + Sink<Integer> sink3 = sink.getSink(); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> + compatibleWriterOperator = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink3)); + + // load the state from previous sink + compatibleWriterOperator.initializeState(previousSinkState); + assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); + + // 3. do another snapshot and check if this also can be restored without compabitible state + // name + compatibleWriterOperator.prepareSnapshotPreBarrier(1L); + OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 1L); + + compatibleWriterOperator.close(); + + // 4. Restore the sink without previous sink's state + InspectableSink sink2 = + sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> + restoredSinkOperator = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink2.getSink())); + + restoredSinkOperator.initializeState(snapshot); + assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); + + restoredSinkOperator.close(); + } -class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { - @Override InspectableSink sinkWithoutCommitter() { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultSinkWriter<>(); return new InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build()); } - @Override InspectableSink sinkWithCommitter() { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultCommittingSinkWriter<>(); @@ -48,7 +141,6 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { .build()); } - @Override InspectableSink sinkWithTimeBasedWriter() { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TimeBasedBufferingSinkWriter(); return new InspectableSink( @@ -58,7 +150,6 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { .build()); } - @Override InspectableSink sinkWithState(boolean withState, String stateName) { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultStatefulSinkWriter<>(); @@ -76,6 +167,308 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { return new InspectableSink(builder.build()); } + @Test + void testNotEmitCommittablesWithoutCommitter() throws Exception { + InspectableSink sink = sinkWithoutCommitter(); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink.getSink())); + testHarness.open(); + testHarness.processElement(1, 1); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + assertThat(sink.getRecordsOfCurrentCheckpoint()) + .containsOnly("(1,1," + Long.MIN_VALUE + ")"); + + testHarness.prepareSnapshotPreBarrier(1); + assertThat(testHarness.extractOutputValues()).isEmpty(); + // Elements are flushed + assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); + testHarness.close(); + } + + @Test + void testWatermarkPropagatedToSinkWriter() throws Exception { + final long initialTime = 0; + + InspectableSink sink = sinkWithoutCommitter(); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink.getSink())); + testHarness.open(); + + testHarness.processWatermark(initialTime); + testHarness.processWatermark(initialTime + 1); + + assertThat(testHarness.getOutput()) + .containsExactly( + new org.apache.flink.streaming.api.watermark.Watermark(initialTime), + new org.apache.flink.streaming.api.watermark.Watermark(initialTime + 1)); + assertThat(sink.getWatermarks()) + .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1)); + testHarness.close(); + } + + @Test + void testTimeBasedBufferingSinkWriter() throws Exception { + final long initialTime = 0; + + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink())); + + testHarness.open(); + + testHarness.setProcessingTime(0L); + + testHarness.processElement(1, initialTime + 1); + testHarness.processElement(2, initialTime + 2); + + testHarness.prepareSnapshotPreBarrier(1L); + + // Expect empty committableSummary + assertBasicOutput(testHarness.extractOutputValues(), 0, 1L); + + testHarness.getProcessingTimeService().setCurrentTime(2001); + + testHarness.prepareSnapshotPreBarrier(2L); + + assertBasicOutput( + testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()), + 2, + 2L); + testHarness.close(); + } + + @Test + void testEmitOnFlushWithCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink())); + + testHarness.open(); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processElement(1, 1); + testHarness.processElement(2, 2); + + // flush + testHarness.prepareSnapshotPreBarrier(1); + + assertBasicOutput(testHarness.extractOutputValues(), 2, 1L); + testHarness.close(); + } + + @Test + void testEmitOnEndOfInputInBatchMode() throws Exception { + final SinkWriterOperatorFactory<Integer, Integer> writerOperatorFactory = + new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(writerOperatorFactory); + + testHarness.open(); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processElement(1, 1); + testHarness.endInput(); + assertBasicOutput(testHarness.extractOutputValues(), 1, EOI); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testStateRestore(boolean stateful) throws Exception { + + final long initialTime = 0; + + final InspectableSink sink = sinkWithState(stateful, null); + Sink<Integer> sink2 = sink.getSink(); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink2)); + + testHarness.open(); + + testHarness.processWatermark(initialTime); + testHarness.processElement(1, initialTime + 1); + testHarness.processElement(2, initialTime + 2); + + testHarness.prepareSnapshotPreBarrier(1L); + OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L); + + assertThat(sink.getRecordCountFromState()).isEqualTo(2); + assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L); + + testHarness.close(); + + final InspectableSink restoredSink = sinkWithState(stateful, null); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> + restoredTestHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(restoredSink.getSink())); + + restoredTestHarness.initializeState(snapshot); + restoredTestHarness.open(); + + // check that the previous state is correctly restored + assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful ? 2 : 0); + + restoredTestHarness.close(); + } + + @Test + void testRestoreCommitterState() throws Exception { + final List<String> committables = Arrays.asList("state1", "state2"); + + InspectableSink sink = sinkWithCommitter(); + final OneInputStreamOperatorTestHarness<String, String> committer = + new OneInputStreamOperatorTestHarness<>( + new TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER), + StringSerializer.INSTANCE); + + final OperatorSubtaskState committerState = + TestHarnessUtil.buildSubtaskState(committer, committables); + + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink.getSink())); + + testHarness.initializeState(committerState); + + testHarness.open(); + + testHarness.prepareSnapshotPreBarrier(2); + + final ListAssert<CommittableMessage<Integer>> records = + assertThat(testHarness.extractOutputValues()).hasSize(4); + + records.element(0, as(committableSummary())) + .hasCheckpointId(INITIAL_CHECKPOINT_ID) + .hasOverallCommittables(committables.size()); + records.<CommittableWithLineageAssert<String>>element(1, as(committableWithLineage())) + .hasCommittable(committables.get(0)) + .hasCheckpointId(INITIAL_CHECKPOINT_ID) + .hasSubtaskId(0); + records.<CommittableWithLineageAssert<String>>element(2, as(committableWithLineage())) + .hasCommittable(committables.get(1)) + .hasCheckpointId(INITIAL_CHECKPOINT_ID) + .hasSubtaskId(0); + records.element(3, as(committableSummary())).hasCheckpointId(2L).hasOverallCommittables(0); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { + InspectableSink sink = sinkWithCommitter(); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<String>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink.getSink())); + testHarness.open(); + testHarness.processElement(1, 1); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + final String record = "(1,1," + Long.MIN_VALUE + ")"; + assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record); + + testHarness.endInput(); + + if (isCheckpointingEnabled) { + testHarness.prepareSnapshotPreBarrier(1); + } + + List<String> committables = Collections.singletonList(record); + + ListAssert<CommittableMessage<String>> records = + assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1); + records.element(0, as(committableSummary())).hasOverallCommittables(committables.size()); + + records.filteredOn(message -> message instanceof CommittableWithLineage) + .map(message -> ((CommittableWithLineage<String>) message).getCommittable()) + .containsExactlyInAnyOrderElementsOf(committables); + assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); + + testHarness.close(); + } + + @Test + void testInitContext() throws Exception { + final AtomicReference<WriterInitContext> initContext = new AtomicReference<>(); + final Sink<String> sink = + context -> { + initContext.set(context); + return null; + }; + + final int subtaskId = 1; + final int parallelism = 10; + final TypeSerializer<String> typeSerializer = StringSerializer.INSTANCE; + final JobID jobID = new JobID(); + + final MockEnvironment environment = + MockEnvironment.builder() + .setSubtaskIndex(subtaskId) + .setParallelism(parallelism) + .setMaxParallelism(parallelism) + .setJobID(jobID) + .setExecutionConfig(new ExecutionConfig().enableObjectReuse()) + .build(); + + final OneInputStreamOperatorTestHarness<String, CommittableMessage<String>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink), typeSerializer, environment); + testHarness.open(); + + assertThat(initContext.get().getUserCodeClassLoader()).isNotNull(); + assertThat(initContext.get().getMailboxExecutor()).isNotNull(); + assertThat(initContext.get().getProcessingTimeService()).isNotNull(); + assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()).isEqualTo(subtaskId); + assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks()) + .isEqualTo(parallelism); + assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero(); + assertThat(initContext.get().metricGroup()).isNotNull(); + assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent(); + assertThat(initContext.get().isObjectReuseEnabled()).isTrue(); + assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer); + assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID); + + testHarness.close(); + } + + private static void assertContextsEqual( + WriterInitContext initContext, WriterInitContext original) { + assertThat(initContext.getUserCodeClassLoader().asClassLoader()) + .isEqualTo(original.getUserCodeClassLoader().asClassLoader()); + assertThat(initContext.getMailboxExecutor()).isEqualTo(original.getMailboxExecutor()); + assertThat(initContext.getProcessingTimeService()) + .isEqualTo(original.getProcessingTimeService()); + assertThat(initContext.getTaskInfo().getIndexOfThisSubtask()) + .isEqualTo(original.getTaskInfo().getIndexOfThisSubtask()); + assertThat(initContext.getTaskInfo().getNumberOfParallelSubtasks()) + .isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks()); + assertThat(initContext.getTaskInfo().getAttemptNumber()) + .isEqualTo(original.getTaskInfo().getAttemptNumber()); + assertThat(initContext.metricGroup()).isEqualTo(original.metricGroup()); + assertThat(initContext.getRestoredCheckpointId()) + .isEqualTo(original.getRestoredCheckpointId()); + assertThat(initContext.isObjectReuseEnabled()).isEqualTo(original.isObjectReuseEnabled()); + assertThat(initContext.createInputSerializer()).isEqualTo(original.createInputSerializer()); + assertThat(initContext.getJobInfo().getJobId()).isEqualTo(original.getJobInfo().getJobId()); + assertThat(initContext.metadataConsumer()).isEqualTo(original.metadataConsumer()); + } + + private static void assertBasicOutput( + List<CommittableMessage<Integer>> output, int numberOfCommittables, long checkpointId) { + ListAssert<CommittableMessage<Integer>> records = + assertThat(output).hasSize(numberOfCommittables + 1); + CommittableSummaryAssert<Object> objectCommittableSummaryAssert = + records.element(0, as(committableSummary())) + .hasOverallCommittables(numberOfCommittables); + records.filteredOn(r -> r instanceof CommittableWithLineage) + .allSatisfy( + cl -> + SinkV2Assertions.assertThat((CommittableWithLineage<?>) cl) + .hasCheckpointId(checkpointId) + .hasSubtaskId(0)); + } + private static class TimeBasedBufferingSinkWriter extends TestSinkV2.DefaultCommittingSinkWriter<Integer> implements ProcessingTimeService.ProcessingTimeCallback { @@ -131,30 +524,124 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { } } - static class InspectableSink extends AbstractInspectableSink<TestSinkV2<Integer>> { + static class InspectableSink { + private final TestSinkV2<Integer> sink; + InspectableSink(TestSinkV2<Integer> sink) { - super(sink); + this.sink = sink; + } + + public TestSinkV2<Integer> getSink() { + return sink; } - @Override public long getLastCheckpointId() { return getSink().getWriter().lastCheckpointId; } - @Override public List<String> getRecordsOfCurrentCheckpoint() { return getSink().getWriter().elements; } - @Override public List<Watermark> getWatermarks() { return getSink().getWriter().watermarks; } - @Override public int getRecordCountFromState() { return ((TestSinkV2.DefaultStatefulSinkWriter<?>) getSink().getWriter()) .getRecordCount(); } } + + private static class TestCommitterOperator extends AbstractStreamOperator<String> + implements OneInputStreamOperator<String, String> { + + private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = + new ListStateDescriptor<>( + "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE); + private ListState<List<String>> committerState; + private final List<String> buffer = new ArrayList<>(); + private final SimpleVersionedSerializer<String> serializer; + + public TestCommitterOperator(SimpleVersionedSerializer<String> serializer) { + this.serializer = serializer; + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + committerState = + new SimpleVersionedListState<>( + context.getOperatorStateStore() + .getListState(STREAMING_COMMITTER_RAW_STATES_DESC), + new TestingCommittableSerializer(serializer)); + } + + @Override + public void processElement(StreamRecord<String> element) throws Exception { + buffer.add(element.getValue()); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + committerState.add(buffer); + } + } + + /** Writes state to test whether the sink can read from alternative state names. */ + private static class CompatibleStateSinkOperator<T> extends AbstractStreamOperator<String> + implements OneInputStreamOperator<String, String> { + + static final String SINK_STATE_NAME = "compatible_sink_state"; + + static final ListStateDescriptor<byte[]> SINK_STATE_DESC = + new ListStateDescriptor<>(SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE); + ListState<T> sinkState; + private final SimpleVersionedSerializer<T> serializer; + private final T initialState; + + public CompatibleStateSinkOperator( + SimpleVersionedSerializer<T> serializer, T initialState) { + this.serializer = serializer; + this.initialState = initialState; + } + + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + sinkState = + new SimpleVersionedListState<>( + context.getOperatorStateStore().getListState(SINK_STATE_DESC), + serializer); + if (!context.isRestored()) { + sinkState.add(initialState); + } + } + + @Override + public void processElement(StreamRecord<String> element) { + // do nothing + } + } + + private static class TestingCommittableSerializer + extends SinkV1WriterCommittableSerializer<String> { + + private final SimpleVersionedSerializer<String> committableSerializer; + + public TestingCommittableSerializer( + SimpleVersionedSerializer<String> committableSerializer) { + super(committableSerializer); + this.committableSerializer = committableSerializer; + } + + @Override + public byte[] serialize(List<String> obj) throws IOException { + final DataOutputSerializer out = new DataOutputSerializer(256); + out.writeInt(SinkV1CommittableDeserializer.MAGIC_NUMBER); + SimpleVersionedSerialization.writeVersionAndSerializeList( + committableSerializer, obj, out); + return out.getCopyOfBuffer(); + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java deleted file mode 100644 index 6bde86216b9..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java +++ /dev/null @@ -1,558 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.sink; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.WriterInitContext; -import org.apache.flink.core.io.SimpleVersionedSerialization; -import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.core.memory.DataOutputSerializer; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; -import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; -import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; -import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert; -import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.TestHarnessUtil; - -import org.assertj.core.api.ListAssert; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; -import static org.assertj.core.api.Assertions.as; -import static org.assertj.core.api.Assertions.assertThat; - -abstract class SinkWriterOperatorTestBase { - - @Test - void testNotEmitCommittablesWithoutCommitter() throws Exception { - InspectableSink sink = sinkWithoutCommitter(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - testHarness.open(); - testHarness.processElement(1, 1); - - assertThat(testHarness.extractOutputValues()).isEmpty(); - assertThat(sink.getRecordsOfCurrentCheckpoint()) - .containsOnly("(1,1," + Long.MIN_VALUE + ")"); - - testHarness.prepareSnapshotPreBarrier(1); - assertThat(testHarness.extractOutputValues()).isEmpty(); - // Elements are flushed - assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); - testHarness.close(); - } - - @Test - void testWatermarkPropagatedToSinkWriter() throws Exception { - final long initialTime = 0; - - InspectableSink sink = sinkWithoutCommitter(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - testHarness.open(); - - testHarness.processWatermark(initialTime); - testHarness.processWatermark(initialTime + 1); - - assertThat(testHarness.getOutput()) - .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1)); - assertThat(sink.getWatermarks()) - .containsExactly( - new org.apache.flink.api.common.eventtime.Watermark(initialTime), - new org.apache.flink.api.common.eventtime.Watermark(initialTime + 1)); - testHarness.close(); - } - - @Test - void testTimeBasedBufferingSinkWriter() throws Exception { - final long initialTime = 0; - - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink())); - - testHarness.open(); - - testHarness.setProcessingTime(0L); - - testHarness.processElement(1, initialTime + 1); - testHarness.processElement(2, initialTime + 2); - - testHarness.prepareSnapshotPreBarrier(1L); - - // Expect empty committableSummary - assertBasicOutput(testHarness.extractOutputValues(), 0, 1L); - - testHarness.getProcessingTimeService().setCurrentTime(2001); - - testHarness.prepareSnapshotPreBarrier(2L); - - assertBasicOutput( - testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()), - 2, - 2L); - testHarness.close(); - } - - @Test - void testEmitOnFlushWithCommitter() throws Exception { - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink())); - - testHarness.open(); - assertThat(testHarness.extractOutputValues()).isEmpty(); - - testHarness.processElement(1, 1); - testHarness.processElement(2, 2); - - // flush - testHarness.prepareSnapshotPreBarrier(1); - - assertBasicOutput(testHarness.extractOutputValues(), 2, 1L); - testHarness.close(); - } - - @Test - void testEmitOnEndOfInputInBatchMode() throws Exception { - final SinkWriterOperatorFactory<Integer, Integer> writerOperatorFactory = - new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>(writerOperatorFactory); - - testHarness.open(); - assertThat(testHarness.extractOutputValues()).isEmpty(); - - testHarness.processElement(1, 1); - testHarness.endInput(); - assertBasicOutput(testHarness.extractOutputValues(), 1, EOI); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testStateRestore(boolean stateful) throws Exception { - - final long initialTime = 0; - - final InspectableSink sink = sinkWithState(stateful, null); - Sink<Integer> sink2 = sink.getSink(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink2)); - - testHarness.open(); - - testHarness.processWatermark(initialTime); - testHarness.processElement(1, initialTime + 1); - testHarness.processElement(2, initialTime + 2); - - testHarness.prepareSnapshotPreBarrier(1L); - OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L); - - assertThat(sink.getRecordCountFromState()).isEqualTo(2); - assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L); - - testHarness.close(); - - final InspectableSink restoredSink = sinkWithState(stateful, null); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> - restoredTestHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(restoredSink.getSink())); - - restoredTestHarness.initializeState(snapshot); - restoredTestHarness.open(); - - // check that the previous state is correctly restored - assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful ? 2 : 0); - - restoredTestHarness.close(); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testLoadPreviousSinkState(boolean stateful) throws Exception { - // 1. Build previous sink state - final List<String> previousSinkInputs = - Arrays.asList( - "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", - "cost", "prompt"); - - InspectableSink sink = sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); - int expectedState = 5; - final OneInputStreamOperatorTestHarness<String, String> previousSink = - new OneInputStreamOperatorTestHarness<>( - new CompatibleStateSinkOperator<>( - TestSinkV2.WRITER_SERIALIZER, expectedState), - StringSerializer.INSTANCE); - - OperatorSubtaskState previousSinkState = - TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs); - - // 2. Load previous sink state and verify state - Sink<Integer> sink3 = sink.getSink(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> - compatibleWriterOperator = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink3)); - - // load the state from previous sink - compatibleWriterOperator.initializeState(previousSinkState); - assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); - - // 3. do another snapshot and check if this also can be restored without compabitible state - // name - compatibleWriterOperator.prepareSnapshotPreBarrier(1L); - OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 1L); - - compatibleWriterOperator.close(); - - // 4. Restore the sink without previous sink's state - InspectableSink sink2 = - sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> - restoredSinkOperator = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink2.getSink())); - - restoredSinkOperator.initializeState(snapshot); - assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); - - restoredSinkOperator.close(); - } - - @Test - void testRestoreCommitterState() throws Exception { - final List<String> committables = Arrays.asList("state1", "state2"); - - InspectableSink sink = sinkWithCommitter(); - final OneInputStreamOperatorTestHarness<String, String> committer = - new OneInputStreamOperatorTestHarness<>( - new TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER), - StringSerializer.INSTANCE); - - final OperatorSubtaskState committerState = - TestHarnessUtil.buildSubtaskState(committer, committables); - - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - - testHarness.initializeState(committerState); - - testHarness.open(); - - testHarness.prepareSnapshotPreBarrier(2); - - final ListAssert<CommittableMessage<Integer>> records = - assertThat(testHarness.extractOutputValues()).hasSize(4); - - records.element(0, as(committableSummary())) - .hasCheckpointId(INITIAL_CHECKPOINT_ID) - .hasOverallCommittables(committables.size()); - records.<CommittableWithLineageAssert<String>>element(1, as(committableWithLineage())) - .hasCommittable(committables.get(0)) - .hasCheckpointId(INITIAL_CHECKPOINT_ID) - .hasSubtaskId(0); - records.<CommittableWithLineageAssert<String>>element(2, as(committableWithLineage())) - .hasCommittable(committables.get(1)) - .hasCheckpointId(INITIAL_CHECKPOINT_ID) - .hasSubtaskId(0); - records.element(3, as(committableSummary())).hasCheckpointId(2L).hasOverallCommittables(0); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { - InspectableSink sink = sinkWithCommitter(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<String>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - testHarness.open(); - testHarness.processElement(1, 1); - - assertThat(testHarness.extractOutputValues()).isEmpty(); - final String record = "(1,1," + Long.MIN_VALUE + ")"; - assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record); - - testHarness.endInput(); - - if (isCheckpointingEnabled) { - testHarness.prepareSnapshotPreBarrier(1); - } - - List<String> committables = Collections.singletonList(record); - - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1); - records.element(0, as(committableSummary())).hasOverallCommittables(committables.size()); - - records.filteredOn(message -> message instanceof CommittableWithLineage) - .map(message -> ((CommittableWithLineage<String>) message).getCommittable()) - .containsExactlyInAnyOrderElementsOf(committables); - assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); - - testHarness.close(); - } - - @Test - void testInitContext() throws Exception { - final AtomicReference<org.apache.flink.api.connector.sink2.WriterInitContext> initContext = - new AtomicReference<>(); - final org.apache.flink.api.connector.sink2.Sink<String> sink = - context -> { - initContext.set(context); - return null; - }; - - final int subtaskId = 1; - final int parallelism = 10; - final TypeSerializer<String> typeSerializer = StringSerializer.INSTANCE; - final JobID jobID = new JobID(); - - final MockEnvironment environment = - MockEnvironment.builder() - .setSubtaskIndex(subtaskId) - .setParallelism(parallelism) - .setMaxParallelism(parallelism) - .setJobID(jobID) - .setExecutionConfig(new ExecutionConfig().enableObjectReuse()) - .build(); - - final OneInputStreamOperatorTestHarness<String, CommittableMessage<String>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink), typeSerializer, environment); - testHarness.open(); - - assertThat(initContext.get().getUserCodeClassLoader()).isNotNull(); - assertThat(initContext.get().getMailboxExecutor()).isNotNull(); - assertThat(initContext.get().getProcessingTimeService()).isNotNull(); - assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()).isEqualTo(subtaskId); - assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks()) - .isEqualTo(parallelism); - assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero(); - assertThat(initContext.get().metricGroup()).isNotNull(); - assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent(); - assertThat(initContext.get().isObjectReuseEnabled()).isTrue(); - assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer); - assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID); - - testHarness.close(); - } - - private static void assertContextsEqual( - WriterInitContext initContext, WriterInitContext original) { - assertThat(initContext.getUserCodeClassLoader().asClassLoader()) - .isEqualTo(original.getUserCodeClassLoader().asClassLoader()); - assertThat(initContext.getMailboxExecutor()).isEqualTo(original.getMailboxExecutor()); - assertThat(initContext.getProcessingTimeService()) - .isEqualTo(original.getProcessingTimeService()); - assertThat(initContext.getTaskInfo().getIndexOfThisSubtask()) - .isEqualTo(original.getTaskInfo().getIndexOfThisSubtask()); - assertThat(initContext.getTaskInfo().getNumberOfParallelSubtasks()) - .isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks()); - assertThat(initContext.getTaskInfo().getAttemptNumber()) - .isEqualTo(original.getTaskInfo().getAttemptNumber()); - assertThat(initContext.metricGroup()).isEqualTo(original.metricGroup()); - assertThat(initContext.getRestoredCheckpointId()) - .isEqualTo(original.getRestoredCheckpointId()); - assertThat(initContext.isObjectReuseEnabled()).isEqualTo(original.isObjectReuseEnabled()); - assertThat(initContext.createInputSerializer()).isEqualTo(original.createInputSerializer()); - assertThat(initContext.getJobInfo().getJobId()).isEqualTo(original.getJobInfo().getJobId()); - assertThat(initContext.metadataConsumer()).isEqualTo(original.metadataConsumer()); - } - - private static void assertBasicOutput( - List<CommittableMessage<Integer>> output, int numberOfCommittables, long checkpointId) { - ListAssert<CommittableMessage<Integer>> records = - assertThat(output).hasSize(numberOfCommittables + 1); - CommittableSummaryAssert<Object> objectCommittableSummaryAssert = - records.element(0, as(committableSummary())) - .hasOverallCommittables(numberOfCommittables); - records.filteredOn(r -> r instanceof CommittableWithLineage) - .allSatisfy( - cl -> - SinkV2Assertions.assertThat((CommittableWithLineage<?>) cl) - .hasCheckpointId(checkpointId) - .hasSubtaskId(0)); - } - - private static class TestCommitterOperator extends AbstractStreamOperator<String> - implements OneInputStreamOperator<String, String> { - - private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = - new ListStateDescriptor<>( - "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE); - private ListState<List<String>> committerState; - private final List<String> buffer = new ArrayList<>(); - private final SimpleVersionedSerializer<String> serializer; - - public TestCommitterOperator(SimpleVersionedSerializer<String> serializer) { - this.serializer = serializer; - } - - @Override - public void initializeState(StateInitializationContext context) throws Exception { - super.initializeState(context); - committerState = - new SimpleVersionedListState<>( - context.getOperatorStateStore() - .getListState(STREAMING_COMMITTER_RAW_STATES_DESC), - new TestingCommittableSerializer(serializer)); - } - - @Override - public void processElement(StreamRecord<String> element) throws Exception { - buffer.add(element.getValue()); - } - - @Override - public void snapshotState(StateSnapshotContext context) throws Exception { - super.snapshotState(context); - committerState.add(buffer); - } - } - - /** Writes state to test whether the sink can read from alternative state names. */ - private static class CompatibleStateSinkOperator<T> extends AbstractStreamOperator<String> - implements OneInputStreamOperator<String, String> { - - static final String SINK_STATE_NAME = "compatible_sink_state"; - - static final ListStateDescriptor<byte[]> SINK_STATE_DESC = - new ListStateDescriptor<>(SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE); - ListState<T> sinkState; - private final SimpleVersionedSerializer<T> serializer; - private final T initialState; - - public CompatibleStateSinkOperator( - SimpleVersionedSerializer<T> serializer, T initialState) { - this.serializer = serializer; - this.initialState = initialState; - } - - public void initializeState(StateInitializationContext context) throws Exception { - super.initializeState(context); - sinkState = - new SimpleVersionedListState<>( - context.getOperatorStateStore().getListState(SINK_STATE_DESC), - serializer); - if (!context.isRestored()) { - sinkState.add(initialState); - } - } - - @Override - public void processElement(StreamRecord<String> element) { - // do nothing - } - } - - private static class TestingCommittableSerializer - extends SinkV1WriterCommittableSerializer<String> { - - private final SimpleVersionedSerializer<String> committableSerializer; - - public TestingCommittableSerializer( - SimpleVersionedSerializer<String> committableSerializer) { - super(committableSerializer); - this.committableSerializer = committableSerializer; - } - - @Override - public byte[] serialize(List<String> obj) throws IOException { - final DataOutputSerializer out = new DataOutputSerializer(256); - out.writeInt(SinkV1CommittableDeserializer.MAGIC_NUMBER); - SimpleVersionedSerialization.writeVersionAndSerializeList( - committableSerializer, obj, out); - return out.getCopyOfBuffer(); - } - } - - abstract InspectableSink sinkWithoutCommitter(); - - abstract InspectableSink sinkWithTimeBasedWriter(); - - abstract InspectableSink sinkWithState(boolean withState, String stateName); - - abstract InspectableSink sinkWithCommitter(); - - /** - * Basic abstraction to access the different flavors of sinks. Remove once the older interfaces - * are removed. - */ - interface InspectableSink { - long getLastCheckpointId(); - - List<String> getRecordsOfCurrentCheckpoint(); - - List<org.apache.flink.api.common.eventtime.Watermark> getWatermarks(); - - int getRecordCountFromState(); - - Sink<Integer> getSink(); - } - - abstract static class AbstractInspectableSink< - S extends org.apache.flink.api.connector.sink2.Sink<Integer>> - implements InspectableSink { - private final S sink; - - protected AbstractInspectableSink(S sink) { - this.sink = sink; - } - - @Override - public S getSink() { - return sink; - } - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java deleted file mode 100644 index bb7795e38d4..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.sink; - -import org.apache.flink.api.connector.sink2.SupportsCommitter; - -import java.util.Collection; - -class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase { - - @Override - SinkAndCounters sinkWithPostCommit() { - ForwardingCommitter committer = new ForwardingCommitter(); - return new SinkAndCounters( - (SupportsCommitter<String>) - TestSinkV2.<String>newBuilder() - .setCommitter(committer) - .setWithPostCommitTopology(true) - .build(), - () -> committer.successfulCommits); - } - - @Override - SinkAndCounters sinkWithPostCommitWithRetry() { - return new SinkAndCounters( - (SupportsCommitter<String>) - TestSinkV2.<String>newBuilder() - .setCommitter(new TestSinkV2.RetryOnceCommitter()) - .setWithPostCommitTopology(true) - .build(), - () -> 0); - } - - @Override - SinkAndCounters sinkWithoutPostCommit() { - ForwardingCommitter committer = new ForwardingCommitter(); - return new SinkAndCounters( - (SupportsCommitter<String>) - TestSinkV2.<String>newBuilder().setCommitter(committer).build(), - () -> committer.successfulCommits); - } - - private static class ForwardingCommitter extends TestSinkV2.DefaultCommitter { - private int successfulCommits = 0; - - @Override - public void commit(Collection<CommitRequest<String>> committables) { - successfulCommits += committables.size(); - } - - @Override - public void close() throws Exception {} - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java deleted file mode 100644 index b0bf67fe4d2..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.sink; - -import org.apache.flink.api.common.eventtime.Watermark; -import org.apache.flink.api.common.operators.ProcessingTimeService; -import org.apache.flink.api.connector.sink2.WriterInitContext; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.ArrayList; -import java.util.List; - -class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase { - @Override - InspectableSink sinkWithoutCommitter() { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultSinkWriter<>(); - return new InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build()); - } - - @Override - InspectableSink sinkWithCommitter() { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = - new TestSinkV2.DefaultCommittingSinkWriter<>(); - return new InspectableSink( - TestSinkV2.<Integer>newBuilder() - .setWriter(sinkWriter) - .setDefaultCommitter() - .build()); - } - - @Override - InspectableSink sinkWithTimeBasedWriter() { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TimeBasedBufferingSinkWriter(); - return new InspectableSink( - TestSinkV2.<Integer>newBuilder() - .setWriter(sinkWriter) - .setDefaultCommitter() - .build()); - } - - @Override - InspectableSink sinkWithState(boolean withState, String stateName) { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = - new TestSinkV2.DefaultStatefulSinkWriter<>(); - TestSinkV2.Builder<Integer> builder = - TestSinkV2.<Integer>newBuilder() - .setWriter(sinkWriter) - .setDefaultCommitter() - .setWithPostCommitTopology(true); - builder.setWriterState(withState); - if (stateName != null) { - builder.setCompatibleStateNames(stateName); - } - return new InspectableSink(builder.build()); - } - - private static class TimeBasedBufferingSinkWriter - extends TestSinkV2.DefaultStatefulSinkWriter<Integer> - implements ProcessingTimeService.ProcessingTimeCallback { - - private final List<String> cachedCommittables = new ArrayList<>(); - private ProcessingTimeService processingTimeService; - - @Override - public void write(Integer element, Context context) { - cachedCommittables.add( - Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString()); - } - - @Override - public void onProcessingTime(long time) { - elements.addAll(cachedCommittables); - cachedCommittables.clear(); - this.processingTimeService.registerTimer(time + 1000, this); - } - - @Override - public void init(WriterInitContext context) { - this.processingTimeService = context.getProcessingTimeService(); - this.processingTimeService.registerTimer(1000, this); - } - } - - static class InspectableSink - extends AbstractInspectableSink<org.apache.flink.api.connector.sink2.Sink<Integer>> { - private final TestSinkV2<Integer> sink; - - InspectableSink(TestSinkV2<Integer> sink) { - super(sink); - this.sink = sink; - } - - @Override - public long getLastCheckpointId() { - return sink.getWriter().lastCheckpointId; - } - - @Override - public List<String> getRecordsOfCurrentCheckpoint() { - return sink.getWriter().elements; - } - - @Override - public List<Watermark> getWatermarks() { - return sink.getWriter().watermarks; - } - - @Override - public int getRecordCountFromState() { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = sink.getWriter(); - if (sinkWriter instanceof TestSinkV2.DefaultStatefulSinkWriter) { - return ((TestSinkV2.DefaultStatefulSinkWriter<Integer>) sinkWriter) - .getRecordCount(); - } else { - return 0; - } - } - } -}
