This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e34696eff15ce07739183920fa0eb23e29b65000 Author: Arvid Heise <[email protected]> AuthorDate: Sun Apr 13 09:38:22 2025 +0200 [FLINK-37605][runtime] Remove obsolete sink tests With the removal of SinkV1, all adapter tests have also been testing V2. We can remove the adapter tests and simplify test hierarchy. (cherry picked from commit fe04a6e9d0f8be33cb9801b4e021c0cd7d0a5b29) --- .../operators/sink/CommitterOperatorTestBase.java | 359 ------------- .../sink/SinkV2CommitterOperatorTest.java | 335 ++++++++++++- .../sink/SinkV2SinkWriterOperatorTest.java | 509 ++++++++++++++++++- .../operators/sink/SinkWriterOperatorTestBase.java | 558 --------------------- .../sink/WithAdapterCommitterOperatorTest.java | 70 --- .../sink/WithAdapterSinkWriterOperatorTest.java | 135 ----- 6 files changed, 828 insertions(+), 1138 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java deleted file mode 100644 index 5fdb36a3953..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.sink; - -import org.apache.flink.api.connector.sink2.SupportsCommitter; -import org.apache.flink.configuration.SinkOptions; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; -import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; -import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; -import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; - -import org.assertj.core.api.AbstractThrowableAssert; -import org.assertj.core.api.ListAssert; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import java.util.function.IntSupplier; - -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; -import static org.assertj.core.api.Assertions.as; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatCode; - -abstract class CommitterOperatorTestBase { - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testEmitCommittables(boolean withPostCommitTopology) throws Exception { - SinkAndCounters sinkAndCounters; - if (withPostCommitTopology) { - // Insert global committer to simulate post commit topology - sinkAndCounters = sinkWithPostCommit(); - } else { - sinkAndCounters = sinkWithoutPostCommit(); - } - final 
OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = - new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true)); - testHarness.open(); - - final CommittableSummary<String> committableSummary = - new CommittableSummary<>(1, 1, 1L, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage<String> committableWithLineage = - new CommittableWithLineage<>("1", 1L, 1); - testHarness.processElement(new StreamRecord<>(committableWithLineage)); - - // Trigger commit - testHarness.notifyOfCompletedCheckpoint(1); - - assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1); - if (withPostCommitTopology) { - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(2); - records.element(0, as(committableSummary())) - .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) - .hasOverallCommittables(committableSummary.getNumberOfCommittables()); - records.element(1, as(committableWithLineage())) - .isEqualTo(committableWithLineage.withSubtaskId(0)); - } else { - assertThat(testHarness.getOutput()).isEmpty(); - } - testHarness.close(); - } - - @Test - void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = createTestHarness(sinkAndCounters.sink, false, true); - testHarness.open(); - testHarness.setProcessingTime(0); - - // Only send first committable - final CommittableSummary<String> committableSummary = - new CommittableSummary<>(1, 1, 1L, 2, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 1L, 1); - testHarness.processElement(new StreamRecord<>(first)); - - 
assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)) - .hasMessageContaining("Trying to commit incomplete batch of committables"); - - assertThat(testHarness.getOutput()).isEmpty(); - assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero(); - - final CommittableWithLineage<String> second = new CommittableWithLineage<>("2", 1L, 1); - testHarness.processElement(new StreamRecord<>(second)); - - assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)).doesNotThrowAnyException(); - - assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(3); - records.element(0, as(committableSummary())) - .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) - .hasOverallCommittables(committableSummary.getNumberOfCommittables()); - records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); - records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); - testHarness.close(); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception { - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode); - testHarness.open(); - - final CommittableSummary<String> committableSummary = - new CommittableSummary<>(1, 2, EOI, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableSummary<String> committableSummary2 = - new CommittableSummary<>(2, 2, EOI, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - - final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", EOI, 1); - testHarness.processElement(new StreamRecord<>(first)); - final 
CommittableWithLineage<String> second = new CommittableWithLineage<>("1", EOI, 2); - testHarness.processElement(new StreamRecord<>(second)); - - testHarness.endInput(); - if (!isBatchMode) { - assertThat(testHarness.getOutput()).isEmpty(); - // notify final checkpoint complete - testHarness.notifyOfCompletedCheckpoint(1); - } - - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(3); - records.element(0, as(committableSummary())) - .hasFailedCommittables(0) - .hasOverallCommittables(2); - records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); - records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); - testHarness.close(); - } - - @Test - void testStateRestore() throws Exception { - - final int originalSubtaskId = 0; - final int subtaskIdAfterRecovery = 9; - - final OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = - createTestHarness( - sinkWithPostCommitWithRetry().sink, - false, - true, - 1, - 1, - originalSubtaskId); - testHarness.open(); - - // We cannot test a different checkpoint thant 0 because when using the OperatorTestHarness - // for recovery the lastCompleted checkpoint is always reset to 0. - long checkpointId = 0L; - - final CommittableSummary<String> committableSummary = - new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage<String> first = - new CommittableWithLineage<>("1", checkpointId, originalSubtaskId); - testHarness.processElement(new StreamRecord<>(first)); - - // another committable for the same checkpointId but from different subtask. 
- final CommittableSummary<String> committableSummary2 = - new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - final CommittableWithLineage<String> second = - new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1); - testHarness.processElement(new StreamRecord<>(second)); - - final OperatorSubtaskState snapshot = testHarness.snapshot(checkpointId, 2L); - assertThat(testHarness.getOutput()).isEmpty(); - testHarness.close(); - - // create new testHarness but with different parallelism level and subtaskId that original - // one. - // we will make sure that new subtaskId was used during committable recovery. - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - restored = - createTestHarness( - sinkAndCounters.sink, false, true, 10, 10, subtaskIdAfterRecovery); - - restored.initializeState(snapshot); - restored.open(); - - // Previous committables are immediately committed if possible - assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); - ListAssert<CommittableMessage<String>> records = - assertThat(restored.extractOutputValues()).hasSize(3); - CommittableSummaryAssert<Object> objectCommittableSummaryAssert = - records.element(0, as(committableSummary())) - .hasCheckpointId(checkpointId) - .hasFailedCommittables(0) - .hasSubtaskId(subtaskIdAfterRecovery); - objectCommittableSummaryAssert.hasOverallCommittables(2); - - // Expect the same checkpointId that the original snapshot was made with. 
- records.element(1, as(committableWithLineage())) - .hasCheckpointId(checkpointId) - .hasSubtaskId(subtaskIdAfterRecovery) - .hasCommittable(first.getCommittable()); - records.element(2, as(committableWithLineage())) - .hasCheckpointId(checkpointId) - .hasSubtaskId(subtaskIdAfterRecovery) - .hasCommittable(second.getCommittable()); - restored.close(); - } - - @ParameterizedTest - @ValueSource(ints = {0, 1}) - void testNumberOfRetries(int numRetries) throws Exception { - try (OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = - createTestHarness( - sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0)) { - testHarness - .getStreamConfig() - .getConfiguration() - .set(SinkOptions.COMMITTER_RETRIES, numRetries); - testHarness.open(); - - long ckdId = 1L; - testHarness.processElement( - new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 1, 0))); - testHarness.processElement( - new StreamRecord<>(new CommittableWithLineage<>("1", ckdId, 0))); - AbstractThrowableAssert<?, ? 
extends Throwable> throwableAssert = - assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(ckdId)); - if (numRetries == 0) { - throwableAssert.hasMessageContaining("Failed to commit 1 committables"); - } else { - throwableAssert.doesNotThrowAnyException(); - } - } - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { - final SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - - try (OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = - new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>( - sinkAndCounters.sink, false, isCheckpointingEnabled))) { - testHarness.open(); - - final CommittableSummary<String> committableSummary = - new CommittableSummary<>(1, 1, 1L, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableWithLineage<String> committableWithLineage = - new CommittableWithLineage<>("1", 1L, 1); - testHarness.processElement(new StreamRecord<>(committableWithLineage)); - - testHarness.endInput(); - - // If checkpointing enabled endInput does not emit anything because a final checkpoint - // follows - if (isCheckpointingEnabled) { - testHarness.notifyOfCompletedCheckpoint(1); - } - - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(2); - CommittableSummaryAssert<Object> objectCommittableSummaryAssert = - records.element(0, as(committableSummary())).hasCheckpointId(1L); - objectCommittableSummaryAssert.hasOverallCommittables(1); - records.element(1, as(committableWithLineage())) - .isEqualTo(committableWithLineage.withSubtaskId(0)); - - // Future emission calls should change the output - testHarness.notifyOfCompletedCheckpoint(2); - testHarness.endInput(); - - assertThat(testHarness.getOutput()).hasSize(2); - } - } - - private OneInputStreamOperatorTestHarness< - 
CommittableMessage<String>, CommittableMessage<String>> - createTestHarness( - SupportsCommitter<String> sink, - boolean isBatchMode, - boolean isCheckpointingEnabled) - throws Exception { - return new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled)); - } - - private OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - createTestHarness( - SupportsCommitter<String> sink, - boolean isBatchMode, - boolean isCheckpointingEnabled, - int maxParallelism, - int parallelism, - int subtaskId) - throws Exception { - return new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled), - maxParallelism, - parallelism, - subtaskId); - } - - abstract SinkAndCounters sinkWithPostCommit(); - - abstract SinkAndCounters sinkWithPostCommitWithRetry(); - - abstract SinkAndCounters sinkWithoutPostCommit(); - - static class SinkAndCounters { - SupportsCommitter<String> sink; - IntSupplier commitCounter; - - public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier commitCounter) { - this.sink = sink; - this.commitCounter = commitCounter; - } - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java index 0112b5cf862..4c7291dd44c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java @@ -19,11 +19,32 @@ package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.api.connector.sink2.SupportsCommitter; +import org.apache.flink.configuration.SinkOptions; +import 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import org.assertj.core.api.AbstractThrowableAssert; +import org.assertj.core.api.ListAssert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.Collection; +import java.util.function.IntSupplier; + +import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; +import static org.assertj.core.api.Assertions.as; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; -class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { - @Override +class SinkV2CommitterOperatorTest { SinkAndCounters sinkWithPostCommit() { ForwardingCommitter committer = new ForwardingCommitter(); return new SinkAndCounters( @@ -35,9 +56,8 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { () -> committer.successfulCommits); } - @Override SinkAndCounters sinkWithPostCommitWithRetry() { - return new CommitterOperatorTestBase.SinkAndCounters( + return new SinkAndCounters( (SupportsCommitter<String>) TestSinkV2.newBuilder() .setCommitter(new TestSinkV2.RetryOnceCommitter()) @@ -46,7 +66,6 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { () -> 
0); } - @Override SinkAndCounters sinkWithoutPostCommit() { ForwardingCommitter committer = new ForwardingCommitter(); return new SinkAndCounters( @@ -58,6 +77,302 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { () -> committer.successfulCommits); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testEmitCommittables(boolean withPostCommitTopology) throws Exception { + SinkAndCounters sinkAndCounters; + if (withPostCommitTopology) { + // Insert global committer to simulate post commit topology + sinkAndCounters = sinkWithPostCommit(); + } else { + sinkAndCounters = sinkWithoutPostCommit(); + } + final OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true)); + testHarness.open(); + + final CommittableSummary<String> committableSummary = + new CommittableSummary<>(1, 1, 1L, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage<String> committableWithLineage = + new CommittableWithLineage<>("1", 1L, 1); + testHarness.processElement(new StreamRecord<>(committableWithLineage)); + + // Trigger commit + testHarness.notifyOfCompletedCheckpoint(1); + + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1); + if (withPostCommitTopology) { + ListAssert<CommittableMessage<String>> records = + assertThat(testHarness.extractOutputValues()).hasSize(2); + records.element(0, as(committableSummary())) + .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) + .hasOverallCommittables(committableSummary.getNumberOfCommittables()); + records.element(1, as(committableWithLineage())) + .isEqualTo(committableWithLineage.withSubtaskId(0)); + } else { + assertThat(testHarness.getOutput()).isEmpty(); + } + testHarness.close(); + } + + @Test + void ensureAllCommittablesArrivedBeforeCommitting() throws 
Exception { + SinkAndCounters sinkAndCounters = sinkWithPostCommit(); + final OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = createTestHarness(sinkAndCounters.sink, false, true); + testHarness.open(); + testHarness.setProcessingTime(0); + + // Only send first committable + final CommittableSummary<String> committableSummary = + new CommittableSummary<>(1, 1, 1L, 2, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 1L, 1); + testHarness.processElement(new StreamRecord<>(first)); + + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)) + .hasMessageContaining("Trying to commit incomplete batch of committables"); + + assertThat(testHarness.getOutput()).isEmpty(); + assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero(); + + final CommittableWithLineage<String> second = new CommittableWithLineage<>("2", 1L, 1); + testHarness.processElement(new StreamRecord<>(second)); + + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)).doesNotThrowAnyException(); + + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); + ListAssert<CommittableMessage<String>> records = + assertThat(testHarness.extractOutputValues()).hasSize(3); + records.element(0, as(committableSummary())) + .hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()) + .hasOverallCommittables(committableSummary.getNumberOfCommittables()); + records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); + records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); + testHarness.close(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception { + SinkAndCounters sinkAndCounters = sinkWithPostCommit(); + final OneInputStreamOperatorTestHarness< + 
CommittableMessage<String>, CommittableMessage<String>> + testHarness = createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode); + testHarness.open(); + + final CommittableSummary<String> committableSummary = + new CommittableSummary<>(1, 2, EOI, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableSummary<String> committableSummary2 = + new CommittableSummary<>(2, 2, EOI, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary2)); + + final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", EOI, 1); + testHarness.processElement(new StreamRecord<>(first)); + final CommittableWithLineage<String> second = new CommittableWithLineage<>("1", EOI, 2); + testHarness.processElement(new StreamRecord<>(second)); + + testHarness.endInput(); + if (!isBatchMode) { + assertThat(testHarness.getOutput()).isEmpty(); + // notify final checkpoint complete + testHarness.notifyOfCompletedCheckpoint(1); + } + + ListAssert<CommittableMessage<String>> records = + assertThat(testHarness.extractOutputValues()).hasSize(3); + records.element(0, as(committableSummary())) + .hasFailedCommittables(0) + .hasOverallCommittables(2); + records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); + records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); + testHarness.close(); + } + + @Test + void testStateRestore() throws Exception { + + final int originalSubtaskId = 0; + final int subtaskIdAfterRecovery = 9; + + final OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = + createTestHarness( + sinkWithPostCommitWithRetry().sink, + false, + true, + 1, + 1, + originalSubtaskId); + testHarness.open(); + + // We cannot test a different checkpoint thant 0 because when using the OperatorTestHarness + // for recovery the lastCompleted checkpoint is always reset to 0. 
+ long checkpointId = 0L; + + final CommittableSummary<String> committableSummary = + new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage<String> first = + new CommittableWithLineage<>("1", checkpointId, originalSubtaskId); + testHarness.processElement(new StreamRecord<>(first)); + + // another committable for the same checkpointId but from different subtask. + final CommittableSummary<String> committableSummary2 = + new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary2)); + final CommittableWithLineage<String> second = + new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1); + testHarness.processElement(new StreamRecord<>(second)); + + final OperatorSubtaskState snapshot = testHarness.snapshot(checkpointId, 2L); + assertThat(testHarness.getOutput()).isEmpty(); + testHarness.close(); + + // create new testHarness but with different parallelism level and subtaskId that original + // one. + // we will make sure that new subtaskId was used during committable recovery. 
+ SinkAndCounters sinkAndCounters = sinkWithPostCommit(); + final OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + restored = + createTestHarness( + sinkAndCounters.sink, false, true, 10, 10, subtaskIdAfterRecovery); + + restored.initializeState(snapshot); + restored.open(); + + // Previous committables are immediately committed if possible + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2); + ListAssert<CommittableMessage<String>> records = + assertThat(restored.extractOutputValues()).hasSize(3); + CommittableSummaryAssert<Object> objectCommittableSummaryAssert = + records.element(0, as(committableSummary())) + .hasCheckpointId(checkpointId) + .hasFailedCommittables(0) + .hasSubtaskId(subtaskIdAfterRecovery); + objectCommittableSummaryAssert.hasOverallCommittables(2); + + // Expect the same checkpointId that the original snapshot was made with. + records.element(1, as(committableWithLineage())) + .hasCheckpointId(checkpointId) + .hasSubtaskId(subtaskIdAfterRecovery) + .hasCommittable(first.getCommittable()); + records.element(2, as(committableWithLineage())) + .hasCheckpointId(checkpointId) + .hasSubtaskId(subtaskIdAfterRecovery) + .hasCommittable(second.getCommittable()); + restored.close(); + } + + @ParameterizedTest + @ValueSource(ints = {0, 1}) + void testNumberOfRetries(int numRetries) throws Exception { + try (OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = + createTestHarness( + sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0)) { + testHarness + .getStreamConfig() + .getConfiguration() + .set(SinkOptions.COMMITTER_RETRIES, numRetries); + testHarness.open(); + + long ckdId = 1L; + testHarness.processElement( + new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 1, 0))); + testHarness.processElement( + new StreamRecord<>(new CommittableWithLineage<>("1", ckdId, 0))); + AbstractThrowableAssert<?, ? 
extends Throwable> throwableAssert = + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(ckdId)); + if (numRetries == 0) { + throwableAssert.hasMessageContaining("Failed to commit 1 committables"); + } else { + throwableAssert.doesNotThrowAnyException(); + } + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { + final SinkAndCounters sinkAndCounters = sinkWithPostCommit(); + + try (OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>( + sinkAndCounters.sink, false, isCheckpointingEnabled))) { + testHarness.open(); + + final CommittableSummary<String> committableSummary = + new CommittableSummary<>(1, 1, 1L, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage<String> committableWithLineage = + new CommittableWithLineage<>("1", 1L, 1); + testHarness.processElement(new StreamRecord<>(committableWithLineage)); + + testHarness.endInput(); + + // If checkpointing enabled endInput does not emit anything because a final checkpoint + // follows + if (isCheckpointingEnabled) { + testHarness.notifyOfCompletedCheckpoint(1); + } + + ListAssert<CommittableMessage<String>> records = + assertThat(testHarness.extractOutputValues()).hasSize(2); + CommittableSummaryAssert<Object> objectCommittableSummaryAssert = + records.element(0, as(committableSummary())).hasCheckpointId(1L); + objectCommittableSummaryAssert.hasOverallCommittables(1); + records.element(1, as(committableWithLineage())) + .isEqualTo(committableWithLineage.withSubtaskId(0)); + + // Future emission calls should change the output + testHarness.notifyOfCompletedCheckpoint(2); + testHarness.endInput(); + + assertThat(testHarness.getOutput()).hasSize(2); + } + } + + private OneInputStreamOperatorTestHarness< + 
CommittableMessage<String>, CommittableMessage<String>> + createTestHarness( + SupportsCommitter<String> sink, + boolean isBatchMode, + boolean isCheckpointingEnabled) + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled)); + } + + private OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + createTestHarness( + SupportsCommitter<String> sink, + boolean isBatchMode, + boolean isCheckpointingEnabled, + int maxParallelism, + int parallelism, + int subtaskId) + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled), + maxParallelism, + parallelism, + subtaskId); + } + private static class ForwardingCommitter extends TestSinkV2.DefaultCommitter { private int successfulCommits = 0; @@ -69,4 +384,14 @@ class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase { @Override public void close() throws Exception {} } + + static class SinkAndCounters { + SupportsCommitter<String> sink; + IntSupplier commitCounter; + + public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier commitCounter) { + this.sink = sink; + this.commitCounter = commitCounter; + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java index 46974679e83..6f2c7f48de5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java @@ -18,26 +18,119 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.ExecutionConfig; +import 
org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert; +import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import 
org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList; +import org.assertj.core.api.ListAssert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID; +import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; +import static org.assertj.core.api.Assertions.as; +import static org.assertj.core.api.Assertions.assertThat; + +class SinkV2SinkWriterOperatorTest { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testLoadPreviousSinkState(boolean stateful) throws Exception { + // 1. Build previous sink state + final List<String> previousSinkInputs = + Arrays.asList( + "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", + "cost", "prompt"); + + InspectableSink sink = sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); + int expectedState = 5; + final OneInputStreamOperatorTestHarness<String, String> previousSink = + new OneInputStreamOperatorTestHarness<>( + new CompatibleStateSinkOperator<>( + TestSinkV2.WRITER_SERIALIZER, expectedState), + StringSerializer.INSTANCE); + + OperatorSubtaskState previousSinkState = + TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs); + + // 2. 
Load previous sink state and verify state +        Sink<Integer> sink3 = sink.getSink(); +        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> +                compatibleWriterOperator = +                        new OneInputStreamOperatorTestHarness<>( +                                new SinkWriterOperatorFactory<>(sink3)); + +        // load the state from previous sink +        compatibleWriterOperator.initializeState(previousSinkState); +        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); + +        // 3. do another snapshot and check if this also can be restored without compatible state +        // name +        compatibleWriterOperator.prepareSnapshotPreBarrier(1L); +        OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 1L); + +        compatibleWriterOperator.close(); + +        // 4. Restore the sink without previous sink's state +        InspectableSink sink2 = +                sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); +        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> +                restoredSinkOperator = +                        new OneInputStreamOperatorTestHarness<>( +                                new SinkWriterOperatorFactory<>(sink2.getSink())); + +        restoredSinkOperator.initializeState(snapshot); +        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? 
expectedState : 0); + + restoredSinkOperator.close(); + } -class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { - @Override InspectableSink sinkWithoutCommitter() { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultSinkWriter<>(); return new InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build()); } - @Override InspectableSink sinkWithCommitter() { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultCommittingSinkWriter<>(); @@ -48,7 +141,6 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { .build()); } - @Override InspectableSink sinkWithTimeBasedWriter() { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TimeBasedBufferingSinkWriter(); return new InspectableSink( @@ -58,7 +150,6 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { .build()); } - @Override InspectableSink sinkWithState(boolean withState, String stateName) { TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultStatefulSinkWriter<>(); @@ -76,6 +167,308 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { return new InspectableSink(builder.build()); } + @Test + void testNotEmitCommittablesWithoutCommitter() throws Exception { + InspectableSink sink = sinkWithoutCommitter(); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink.getSink())); + testHarness.open(); + testHarness.processElement(1, 1); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + assertThat(sink.getRecordsOfCurrentCheckpoint()) + .containsOnly("(1,1," + Long.MIN_VALUE + ")"); + + testHarness.prepareSnapshotPreBarrier(1); + assertThat(testHarness.extractOutputValues()).isEmpty(); + // Elements are flushed + assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); + testHarness.close(); + } + + @Test + void 
testWatermarkPropagatedToSinkWriter() throws Exception { + final long initialTime = 0; + + InspectableSink sink = sinkWithoutCommitter(); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink.getSink())); + testHarness.open(); + + testHarness.processWatermark(initialTime); + testHarness.processWatermark(initialTime + 1); + + assertThat(testHarness.getOutput()) + .containsExactly( + new org.apache.flink.streaming.api.watermark.Watermark(initialTime), + new org.apache.flink.streaming.api.watermark.Watermark(initialTime + 1)); + assertThat(sink.getWatermarks()) + .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1)); + testHarness.close(); + } + + @Test + void testTimeBasedBufferingSinkWriter() throws Exception { + final long initialTime = 0; + + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink())); + + testHarness.open(); + + testHarness.setProcessingTime(0L); + + testHarness.processElement(1, initialTime + 1); + testHarness.processElement(2, initialTime + 2); + + testHarness.prepareSnapshotPreBarrier(1L); + + // Expect empty committableSummary + assertBasicOutput(testHarness.extractOutputValues(), 0, 1L); + + testHarness.getProcessingTimeService().setCurrentTime(2001); + + testHarness.prepareSnapshotPreBarrier(2L); + + assertBasicOutput( + testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()), + 2, + 2L); + testHarness.close(); + } + + @Test + void testEmitOnFlushWithCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink())); + + testHarness.open(); + 
assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processElement(1, 1); + testHarness.processElement(2, 2); + + // flush + testHarness.prepareSnapshotPreBarrier(1); + + assertBasicOutput(testHarness.extractOutputValues(), 2, 1L); + testHarness.close(); + } + + @Test + void testEmitOnEndOfInputInBatchMode() throws Exception { + final SinkWriterOperatorFactory<Integer, Integer> writerOperatorFactory = + new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(writerOperatorFactory); + + testHarness.open(); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + testHarness.processElement(1, 1); + testHarness.endInput(); + assertBasicOutput(testHarness.extractOutputValues(), 1, EOI); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testStateRestore(boolean stateful) throws Exception { + + final long initialTime = 0; + + final InspectableSink sink = sinkWithState(stateful, null); + Sink<Integer> sink2 = sink.getSink(); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink2)); + + testHarness.open(); + + testHarness.processWatermark(initialTime); + testHarness.processElement(1, initialTime + 1); + testHarness.processElement(2, initialTime + 2); + + testHarness.prepareSnapshotPreBarrier(1L); + OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L); + + assertThat(sink.getRecordCountFromState()).isEqualTo(2); + assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 
1L : -1L); + + testHarness.close(); + + final InspectableSink restoredSink = sinkWithState(stateful, null); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> + restoredTestHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(restoredSink.getSink())); + + restoredTestHarness.initializeState(snapshot); + restoredTestHarness.open(); + + // check that the previous state is correctly restored + assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful ? 2 : 0); + + restoredTestHarness.close(); + } + + @Test + void testRestoreCommitterState() throws Exception { + final List<String> committables = Arrays.asList("state1", "state2"); + + InspectableSink sink = sinkWithCommitter(); + final OneInputStreamOperatorTestHarness<String, String> committer = + new OneInputStreamOperatorTestHarness<>( + new TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER), + StringSerializer.INSTANCE); + + final OperatorSubtaskState committerState = + TestHarnessUtil.buildSubtaskState(committer, committables); + + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink.getSink())); + + testHarness.initializeState(committerState); + + testHarness.open(); + + testHarness.prepareSnapshotPreBarrier(2); + + final ListAssert<CommittableMessage<Integer>> records = + assertThat(testHarness.extractOutputValues()).hasSize(4); + + records.element(0, as(committableSummary())) + .hasCheckpointId(INITIAL_CHECKPOINT_ID) + .hasOverallCommittables(committables.size()); + records.<CommittableWithLineageAssert<String>>element(1, as(committableWithLineage())) + .hasCommittable(committables.get(0)) + .hasCheckpointId(INITIAL_CHECKPOINT_ID) + .hasSubtaskId(0); + records.<CommittableWithLineageAssert<String>>element(2, as(committableWithLineage())) + .hasCommittable(committables.get(1)) + 
.hasCheckpointId(INITIAL_CHECKPOINT_ID) + .hasSubtaskId(0); + records.element(3, as(committableSummary())).hasCheckpointId(2L).hasOverallCommittables(0); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { + InspectableSink sink = sinkWithCommitter(); + final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<String>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink.getSink())); + testHarness.open(); + testHarness.processElement(1, 1); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + final String record = "(1,1," + Long.MIN_VALUE + ")"; + assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record); + + testHarness.endInput(); + + if (isCheckpointingEnabled) { + testHarness.prepareSnapshotPreBarrier(1); + } + + List<String> committables = Collections.singletonList(record); + + ListAssert<CommittableMessage<String>> records = + assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1); + records.element(0, as(committableSummary())).hasOverallCommittables(committables.size()); + + records.filteredOn(message -> message instanceof CommittableWithLineage) + .map(message -> ((CommittableWithLineage<String>) message).getCommittable()) + .containsExactlyInAnyOrderElementsOf(committables); + assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); + + testHarness.close(); + } + + @Test + void testInitContext() throws Exception { + final AtomicReference<WriterInitContext> initContext = new AtomicReference<>(); + final Sink<String> sink = + context -> { + initContext.set(context); + return null; + }; + + final int subtaskId = 1; + final int parallelism = 10; + final TypeSerializer<String> typeSerializer = StringSerializer.INSTANCE; + final JobID jobID = new JobID(); + + final MockEnvironment environment = + MockEnvironment.builder() + .setSubtaskIndex(subtaskId) + 
.setParallelism(parallelism) + .setMaxParallelism(parallelism) + .setJobID(jobID) + .setExecutionConfig(new ExecutionConfig().enableObjectReuse()) + .build(); + + final OneInputStreamOperatorTestHarness<String, CommittableMessage<String>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink), typeSerializer, environment); + testHarness.open(); + + assertThat(initContext.get().getUserCodeClassLoader()).isNotNull(); + assertThat(initContext.get().getMailboxExecutor()).isNotNull(); + assertThat(initContext.get().getProcessingTimeService()).isNotNull(); + assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()).isEqualTo(subtaskId); + assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks()) + .isEqualTo(parallelism); + assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero(); + assertThat(initContext.get().metricGroup()).isNotNull(); + assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent(); + assertThat(initContext.get().isObjectReuseEnabled()).isTrue(); + assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer); + assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID); + + testHarness.close(); + } + + private static void assertContextsEqual( + WriterInitContext initContext, WriterInitContext original) { + assertThat(initContext.getUserCodeClassLoader().asClassLoader()) + .isEqualTo(original.getUserCodeClassLoader().asClassLoader()); + assertThat(initContext.getMailboxExecutor()).isEqualTo(original.getMailboxExecutor()); + assertThat(initContext.getProcessingTimeService()) + .isEqualTo(original.getProcessingTimeService()); + assertThat(initContext.getTaskInfo().getIndexOfThisSubtask()) + .isEqualTo(original.getTaskInfo().getIndexOfThisSubtask()); + assertThat(initContext.getTaskInfo().getNumberOfParallelSubtasks()) + .isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks()); + 
assertThat(initContext.getTaskInfo().getAttemptNumber()) + .isEqualTo(original.getTaskInfo().getAttemptNumber()); + assertThat(initContext.metricGroup()).isEqualTo(original.metricGroup()); + assertThat(initContext.getRestoredCheckpointId()) + .isEqualTo(original.getRestoredCheckpointId()); + assertThat(initContext.isObjectReuseEnabled()).isEqualTo(original.isObjectReuseEnabled()); + assertThat(initContext.createInputSerializer()).isEqualTo(original.createInputSerializer()); + assertThat(initContext.getJobInfo().getJobId()).isEqualTo(original.getJobInfo().getJobId()); + assertThat(initContext.metadataConsumer()).isEqualTo(original.metadataConsumer()); + } + + private static void assertBasicOutput( + List<CommittableMessage<Integer>> output, int numberOfCommittables, long checkpointId) { + ListAssert<CommittableMessage<Integer>> records = + assertThat(output).hasSize(numberOfCommittables + 1); + CommittableSummaryAssert<Object> objectCommittableSummaryAssert = + records.element(0, as(committableSummary())) + .hasOverallCommittables(numberOfCommittables); + records.filteredOn(r -> r instanceof CommittableWithLineage) + .allSatisfy( + cl -> + SinkV2Assertions.assertThat((CommittableWithLineage<?>) cl) + .hasCheckpointId(checkpointId) + .hasSubtaskId(0)); + } + private static class TimeBasedBufferingSinkWriter extends TestSinkV2.DefaultCommittingSinkWriter<Integer> implements ProcessingTimeService.ProcessingTimeCallback { @@ -131,30 +524,124 @@ class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase { } } - static class InspectableSink extends AbstractInspectableSink<TestSinkV2<Integer>> { + static class InspectableSink { + private final TestSinkV2<Integer> sink; + InspectableSink(TestSinkV2<Integer> sink) { - super(sink); + this.sink = sink; + } + + public TestSinkV2<Integer> getSink() { + return sink; } - @Override public long getLastCheckpointId() { return getSink().getWriter().lastCheckpointId; } - @Override public List<String> 
getRecordsOfCurrentCheckpoint() { return getSink().getWriter().elements; } - @Override public List<Watermark> getWatermarks() { return getSink().getWriter().watermarks; } - @Override public int getRecordCountFromState() { return ((TestSinkV2.DefaultStatefulSinkWriter<?>) getSink().getWriter()) .getRecordCount(); } } + + private static class TestCommitterOperator extends AbstractStreamOperator<String> + implements OneInputStreamOperator<String, String> { + + private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = + new ListStateDescriptor<>( + "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE); + private ListState<List<String>> committerState; + private final List<String> buffer = new ArrayList<>(); + private final SimpleVersionedSerializer<String> serializer; + + public TestCommitterOperator(SimpleVersionedSerializer<String> serializer) { + this.serializer = serializer; + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + committerState = + new SimpleVersionedListState<>( + context.getOperatorStateStore() + .getListState(STREAMING_COMMITTER_RAW_STATES_DESC), + new TestingCommittableSerializer(serializer)); + } + + @Override + public void processElement(StreamRecord<String> element) throws Exception { + buffer.add(element.getValue()); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + committerState.add(buffer); + } + } + + /** Writes state to test whether the sink can read from alternative state names. 
*/ + private static class CompatibleStateSinkOperator<T> extends AbstractStreamOperator<String> + implements OneInputStreamOperator<String, String> { + + static final String SINK_STATE_NAME = "compatible_sink_state"; + + static final ListStateDescriptor<byte[]> SINK_STATE_DESC = + new ListStateDescriptor<>(SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE); + ListState<T> sinkState; + private final SimpleVersionedSerializer<T> serializer; + private final T initialState; + + public CompatibleStateSinkOperator( + SimpleVersionedSerializer<T> serializer, T initialState) { + this.serializer = serializer; + this.initialState = initialState; + } + + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + sinkState = + new SimpleVersionedListState<>( + context.getOperatorStateStore().getListState(SINK_STATE_DESC), + serializer); + if (!context.isRestored()) { + sinkState.add(initialState); + } + } + + @Override + public void processElement(StreamRecord<String> element) { + // do nothing + } + } + + private static class TestingCommittableSerializer + extends SinkV1WriterCommittableSerializer<String> { + + private final SimpleVersionedSerializer<String> committableSerializer; + + public TestingCommittableSerializer( + SimpleVersionedSerializer<String> committableSerializer) { + super(committableSerializer); + this.committableSerializer = committableSerializer; + } + + @Override + public byte[] serialize(List<String> obj) throws IOException { + final DataOutputSerializer out = new DataOutputSerializer(256); + out.writeInt(SinkV1CommittableDeserializer.MAGIC_NUMBER); + SimpleVersionedSerialization.writeVersionAndSerializeList( + committableSerializer, obj, out); + return out.getCopyOfBuffer(); + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java deleted file mode 100644 index 6bde86216b9..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java +++ /dev/null @@ -1,558 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.operators.sink; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.WriterInitContext; -import org.apache.flink.core.io.SimpleVersionedSerialization; -import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.core.memory.DataOutputSerializer; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; -import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert; -import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; -import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert; -import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import 
org.apache.flink.streaming.util.TestHarnessUtil; - -import org.assertj.core.api.ListAssert; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; -import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; -import static org.assertj.core.api.Assertions.as; -import static org.assertj.core.api.Assertions.assertThat; - -abstract class SinkWriterOperatorTestBase { - - @Test - void testNotEmitCommittablesWithoutCommitter() throws Exception { - InspectableSink sink = sinkWithoutCommitter(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - testHarness.open(); - testHarness.processElement(1, 1); - - assertThat(testHarness.extractOutputValues()).isEmpty(); - assertThat(sink.getRecordsOfCurrentCheckpoint()) - .containsOnly("(1,1," + Long.MIN_VALUE + ")"); - - testHarness.prepareSnapshotPreBarrier(1); - assertThat(testHarness.extractOutputValues()).isEmpty(); - // Elements are flushed - assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); - testHarness.close(); - } - - @Test - void testWatermarkPropagatedToSinkWriter() throws Exception { - final long initialTime = 0; - - InspectableSink sink = sinkWithoutCommitter(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> 
testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - testHarness.open(); - - testHarness.processWatermark(initialTime); - testHarness.processWatermark(initialTime + 1); - - assertThat(testHarness.getOutput()) - .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1)); - assertThat(sink.getWatermarks()) - .containsExactly( - new org.apache.flink.api.common.eventtime.Watermark(initialTime), - new org.apache.flink.api.common.eventtime.Watermark(initialTime + 1)); - testHarness.close(); - } - - @Test - void testTimeBasedBufferingSinkWriter() throws Exception { - final long initialTime = 0; - - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink())); - - testHarness.open(); - - testHarness.setProcessingTime(0L); - - testHarness.processElement(1, initialTime + 1); - testHarness.processElement(2, initialTime + 2); - - testHarness.prepareSnapshotPreBarrier(1L); - - // Expect empty committableSummary - assertBasicOutput(testHarness.extractOutputValues(), 0, 1L); - - testHarness.getProcessingTimeService().setCurrentTime(2001); - - testHarness.prepareSnapshotPreBarrier(2L); - - assertBasicOutput( - testHarness.extractOutputValues().stream().skip(1).collect(Collectors.toList()), - 2, - 2L); - testHarness.close(); - } - - @Test - void testEmitOnFlushWithCommitter() throws Exception { - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink())); - - testHarness.open(); - assertThat(testHarness.extractOutputValues()).isEmpty(); - - testHarness.processElement(1, 1); - testHarness.processElement(2, 2); - - // flush - testHarness.prepareSnapshotPreBarrier(1); - - assertBasicOutput(testHarness.extractOutputValues(), 2, 
1L); - testHarness.close(); - } - - @Test - void testEmitOnEndOfInputInBatchMode() throws Exception { - final SinkWriterOperatorFactory<Integer, Integer> writerOperatorFactory = - new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>(writerOperatorFactory); - - testHarness.open(); - assertThat(testHarness.extractOutputValues()).isEmpty(); - - testHarness.processElement(1, 1); - testHarness.endInput(); - assertBasicOutput(testHarness.extractOutputValues(), 1, EOI); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testStateRestore(boolean stateful) throws Exception { - - final long initialTime = 0; - - final InspectableSink sink = sinkWithState(stateful, null); - Sink<Integer> sink2 = sink.getSink(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>(new SinkWriterOperatorFactory<>(sink2)); - - testHarness.open(); - - testHarness.processWatermark(initialTime); - testHarness.processElement(1, initialTime + 1); - testHarness.processElement(2, initialTime + 2); - - testHarness.prepareSnapshotPreBarrier(1L); - OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L); - - assertThat(sink.getRecordCountFromState()).isEqualTo(2); - assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L); - - testHarness.close(); - - final InspectableSink restoredSink = sinkWithState(stateful, null); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> - restoredTestHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(restoredSink.getSink())); - - restoredTestHarness.initializeState(snapshot); - restoredTestHarness.open(); - - // check that the previous state is correctly restored - assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful ? 
2 : 0); - - restoredTestHarness.close(); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testLoadPreviousSinkState(boolean stateful) throws Exception { - // 1. Build previous sink state - final List<String> previousSinkInputs = - Arrays.asList( - "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", - "cost", "prompt"); - - InspectableSink sink = sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); - int expectedState = 5; - final OneInputStreamOperatorTestHarness<String, String> previousSink = - new OneInputStreamOperatorTestHarness<>( - new CompatibleStateSinkOperator<>( - TestSinkV2.WRITER_SERIALIZER, expectedState), - StringSerializer.INSTANCE); - - OperatorSubtaskState previousSinkState = - TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs); - - // 2. Load previous sink state and verify state - Sink<Integer> sink3 = sink.getSink(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> - compatibleWriterOperator = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink3)); - - // load the state from previous sink - compatibleWriterOperator.initializeState(previousSinkState); - assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); - - // 3. do another snapshot and check if this also can be restored without compabitible state - // name - compatibleWriterOperator.prepareSnapshotPreBarrier(1L); - OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 1L); - - compatibleWriterOperator.close(); - - // 4. 
Restore the sink without previous sink's state - InspectableSink sink2 = - sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> - restoredSinkOperator = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink2.getSink())); - - restoredSinkOperator.initializeState(snapshot); - assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? expectedState : 0); - - restoredSinkOperator.close(); - } - - @Test - void testRestoreCommitterState() throws Exception { - final List<String> committables = Arrays.asList("state1", "state2"); - - InspectableSink sink = sinkWithCommitter(); - final OneInputStreamOperatorTestHarness<String, String> committer = - new OneInputStreamOperatorTestHarness<>( - new TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER), - StringSerializer.INSTANCE); - - final OperatorSubtaskState committerState = - TestHarnessUtil.buildSubtaskState(committer, committables); - - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - - testHarness.initializeState(committerState); - - testHarness.open(); - - testHarness.prepareSnapshotPreBarrier(2); - - final ListAssert<CommittableMessage<Integer>> records = - assertThat(testHarness.extractOutputValues()).hasSize(4); - - records.element(0, as(committableSummary())) - .hasCheckpointId(INITIAL_CHECKPOINT_ID) - .hasOverallCommittables(committables.size()); - records.<CommittableWithLineageAssert<String>>element(1, as(committableWithLineage())) - .hasCommittable(committables.get(0)) - .hasCheckpointId(INITIAL_CHECKPOINT_ID) - .hasSubtaskId(0); - records.<CommittableWithLineageAssert<String>>element(2, as(committableWithLineage())) - .hasCommittable(committables.get(1)) - .hasCheckpointId(INITIAL_CHECKPOINT_ID) - .hasSubtaskId(0); - records.element(3, 
as(committableSummary())).hasCheckpointId(2L).hasOverallCommittables(0); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception { - InspectableSink sink = sinkWithCommitter(); - final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<String>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink.getSink())); - testHarness.open(); - testHarness.processElement(1, 1); - - assertThat(testHarness.extractOutputValues()).isEmpty(); - final String record = "(1,1," + Long.MIN_VALUE + ")"; - assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record); - - testHarness.endInput(); - - if (isCheckpointingEnabled) { - testHarness.prepareSnapshotPreBarrier(1); - } - - List<String> committables = Collections.singletonList(record); - - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(committables.size() + 1); - records.element(0, as(committableSummary())).hasOverallCommittables(committables.size()); - - records.filteredOn(message -> message instanceof CommittableWithLineage) - .map(message -> ((CommittableWithLineage<String>) message).getCommittable()) - .containsExactlyInAnyOrderElementsOf(committables); - assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty(); - - testHarness.close(); - } - - @Test - void testInitContext() throws Exception { - final AtomicReference<org.apache.flink.api.connector.sink2.WriterInitContext> initContext = - new AtomicReference<>(); - final org.apache.flink.api.connector.sink2.Sink<String> sink = - context -> { - initContext.set(context); - return null; - }; - - final int subtaskId = 1; - final int parallelism = 10; - final TypeSerializer<String> typeSerializer = StringSerializer.INSTANCE; - final JobID jobID = new JobID(); - - final MockEnvironment environment = - MockEnvironment.builder() - .setSubtaskIndex(subtaskId) - 
.setParallelism(parallelism) - .setMaxParallelism(parallelism) - .setJobID(jobID) - .setExecutionConfig(new ExecutionConfig().enableObjectReuse()) - .build(); - - final OneInputStreamOperatorTestHarness<String, CommittableMessage<String>> testHarness = - new OneInputStreamOperatorTestHarness<>( - new SinkWriterOperatorFactory<>(sink), typeSerializer, environment); - testHarness.open(); - - assertThat(initContext.get().getUserCodeClassLoader()).isNotNull(); - assertThat(initContext.get().getMailboxExecutor()).isNotNull(); - assertThat(initContext.get().getProcessingTimeService()).isNotNull(); - assertThat(initContext.get().getTaskInfo().getIndexOfThisSubtask()).isEqualTo(subtaskId); - assertThat(initContext.get().getTaskInfo().getNumberOfParallelSubtasks()) - .isEqualTo(parallelism); - assertThat(initContext.get().getTaskInfo().getAttemptNumber()).isZero(); - assertThat(initContext.get().metricGroup()).isNotNull(); - assertThat(initContext.get().getRestoredCheckpointId()).isNotPresent(); - assertThat(initContext.get().isObjectReuseEnabled()).isTrue(); - assertThat(initContext.get().createInputSerializer()).isEqualTo(typeSerializer); - assertThat(initContext.get().getJobInfo().getJobId()).isEqualTo(jobID); - - testHarness.close(); - } - - private static void assertContextsEqual( - WriterInitContext initContext, WriterInitContext original) { - assertThat(initContext.getUserCodeClassLoader().asClassLoader()) - .isEqualTo(original.getUserCodeClassLoader().asClassLoader()); - assertThat(initContext.getMailboxExecutor()).isEqualTo(original.getMailboxExecutor()); - assertThat(initContext.getProcessingTimeService()) - .isEqualTo(original.getProcessingTimeService()); - assertThat(initContext.getTaskInfo().getIndexOfThisSubtask()) - .isEqualTo(original.getTaskInfo().getIndexOfThisSubtask()); - assertThat(initContext.getTaskInfo().getNumberOfParallelSubtasks()) - .isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks()); - 
assertThat(initContext.getTaskInfo().getAttemptNumber()) - .isEqualTo(original.getTaskInfo().getAttemptNumber()); - assertThat(initContext.metricGroup()).isEqualTo(original.metricGroup()); - assertThat(initContext.getRestoredCheckpointId()) - .isEqualTo(original.getRestoredCheckpointId()); - assertThat(initContext.isObjectReuseEnabled()).isEqualTo(original.isObjectReuseEnabled()); - assertThat(initContext.createInputSerializer()).isEqualTo(original.createInputSerializer()); - assertThat(initContext.getJobInfo().getJobId()).isEqualTo(original.getJobInfo().getJobId()); - assertThat(initContext.metadataConsumer()).isEqualTo(original.metadataConsumer()); - } - - private static void assertBasicOutput( - List<CommittableMessage<Integer>> output, int numberOfCommittables, long checkpointId) { - ListAssert<CommittableMessage<Integer>> records = - assertThat(output).hasSize(numberOfCommittables + 1); - CommittableSummaryAssert<Object> objectCommittableSummaryAssert = - records.element(0, as(committableSummary())) - .hasOverallCommittables(numberOfCommittables); - records.filteredOn(r -> r instanceof CommittableWithLineage) - .allSatisfy( - cl -> - SinkV2Assertions.assertThat((CommittableWithLineage<?>) cl) - .hasCheckpointId(checkpointId) - .hasSubtaskId(0)); - } - - private static class TestCommitterOperator extends AbstractStreamOperator<String> - implements OneInputStreamOperator<String, String> { - - private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = - new ListStateDescriptor<>( - "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE); - private ListState<List<String>> committerState; - private final List<String> buffer = new ArrayList<>(); - private final SimpleVersionedSerializer<String> serializer; - - public TestCommitterOperator(SimpleVersionedSerializer<String> serializer) { - this.serializer = serializer; - } - - @Override - public void initializeState(StateInitializationContext context) throws Exception { - 
super.initializeState(context); - committerState = - new SimpleVersionedListState<>( - context.getOperatorStateStore() - .getListState(STREAMING_COMMITTER_RAW_STATES_DESC), - new TestingCommittableSerializer(serializer)); - } - - @Override - public void processElement(StreamRecord<String> element) throws Exception { - buffer.add(element.getValue()); - } - - @Override - public void snapshotState(StateSnapshotContext context) throws Exception { - super.snapshotState(context); - committerState.add(buffer); - } - } - - /** Writes state to test whether the sink can read from alternative state names. */ - private static class CompatibleStateSinkOperator<T> extends AbstractStreamOperator<String> - implements OneInputStreamOperator<String, String> { - - static final String SINK_STATE_NAME = "compatible_sink_state"; - - static final ListStateDescriptor<byte[]> SINK_STATE_DESC = - new ListStateDescriptor<>(SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE); - ListState<T> sinkState; - private final SimpleVersionedSerializer<T> serializer; - private final T initialState; - - public CompatibleStateSinkOperator( - SimpleVersionedSerializer<T> serializer, T initialState) { - this.serializer = serializer; - this.initialState = initialState; - } - - public void initializeState(StateInitializationContext context) throws Exception { - super.initializeState(context); - sinkState = - new SimpleVersionedListState<>( - context.getOperatorStateStore().getListState(SINK_STATE_DESC), - serializer); - if (!context.isRestored()) { - sinkState.add(initialState); - } - } - - @Override - public void processElement(StreamRecord<String> element) { - // do nothing - } - } - - private static class TestingCommittableSerializer - extends SinkV1WriterCommittableSerializer<String> { - - private final SimpleVersionedSerializer<String> committableSerializer; - - public TestingCommittableSerializer( - SimpleVersionedSerializer<String> committableSerializer) { - super(committableSerializer); - 
this.committableSerializer = committableSerializer; - } - - @Override - public byte[] serialize(List<String> obj) throws IOException { - final DataOutputSerializer out = new DataOutputSerializer(256); - out.writeInt(SinkV1CommittableDeserializer.MAGIC_NUMBER); - SimpleVersionedSerialization.writeVersionAndSerializeList( - committableSerializer, obj, out); - return out.getCopyOfBuffer(); - } - } - - abstract InspectableSink sinkWithoutCommitter(); - - abstract InspectableSink sinkWithTimeBasedWriter(); - - abstract InspectableSink sinkWithState(boolean withState, String stateName); - - abstract InspectableSink sinkWithCommitter(); - - /** - * Basic abstraction to access the different flavors of sinks. Remove once the older interfaces - * are removed. - */ - interface InspectableSink { - long getLastCheckpointId(); - - List<String> getRecordsOfCurrentCheckpoint(); - - List<org.apache.flink.api.common.eventtime.Watermark> getWatermarks(); - - int getRecordCountFromState(); - - Sink<Integer> getSink(); - } - - abstract static class AbstractInspectableSink< - S extends org.apache.flink.api.connector.sink2.Sink<Integer>> - implements InspectableSink { - private final S sink; - - protected AbstractInspectableSink(S sink) { - this.sink = sink; - } - - @Override - public S getSink() { - return sink; - } - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java deleted file mode 100644 index bb7795e38d4..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.sink; - -import org.apache.flink.api.connector.sink2.SupportsCommitter; - -import java.util.Collection; - -class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase { - - @Override - SinkAndCounters sinkWithPostCommit() { - ForwardingCommitter committer = new ForwardingCommitter(); - return new SinkAndCounters( - (SupportsCommitter<String>) - TestSinkV2.<String>newBuilder() - .setCommitter(committer) - .setWithPostCommitTopology(true) - .build(), - () -> committer.successfulCommits); - } - - @Override - SinkAndCounters sinkWithPostCommitWithRetry() { - return new SinkAndCounters( - (SupportsCommitter<String>) - TestSinkV2.<String>newBuilder() - .setCommitter(new TestSinkV2.RetryOnceCommitter()) - .setWithPostCommitTopology(true) - .build(), - () -> 0); - } - - @Override - SinkAndCounters sinkWithoutPostCommit() { - ForwardingCommitter committer = new ForwardingCommitter(); - return new SinkAndCounters( - (SupportsCommitter<String>) - TestSinkV2.<String>newBuilder().setCommitter(committer).build(), - () -> committer.successfulCommits); - } - - private static class ForwardingCommitter extends TestSinkV2.DefaultCommitter { - private int successfulCommits = 0; - - @Override - public void 
commit(Collection<CommitRequest<String>> committables) { - successfulCommits += committables.size(); - } - - @Override - public void close() throws Exception {} - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java deleted file mode 100644 index b0bf67fe4d2..00000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.operators.sink; - -import org.apache.flink.api.common.eventtime.Watermark; -import org.apache.flink.api.common.operators.ProcessingTimeService; -import org.apache.flink.api.connector.sink2.WriterInitContext; -import org.apache.flink.api.java.tuple.Tuple3; - -import java.util.ArrayList; -import java.util.List; - -class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase { - @Override - InspectableSink sinkWithoutCommitter() { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TestSinkV2.DefaultSinkWriter<>(); - return new InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build()); - } - - @Override - InspectableSink sinkWithCommitter() { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = - new TestSinkV2.DefaultCommittingSinkWriter<>(); - return new InspectableSink( - TestSinkV2.<Integer>newBuilder() - .setWriter(sinkWriter) - .setDefaultCommitter() - .build()); - } - - @Override - InspectableSink sinkWithTimeBasedWriter() { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new TimeBasedBufferingSinkWriter(); - return new InspectableSink( - TestSinkV2.<Integer>newBuilder() - .setWriter(sinkWriter) - .setDefaultCommitter() - .build()); - } - - @Override - InspectableSink sinkWithState(boolean withState, String stateName) { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = - new TestSinkV2.DefaultStatefulSinkWriter<>(); - TestSinkV2.Builder<Integer> builder = - TestSinkV2.<Integer>newBuilder() - .setWriter(sinkWriter) - .setDefaultCommitter() - .setWithPostCommitTopology(true); - builder.setWriterState(withState); - if (stateName != null) { - builder.setCompatibleStateNames(stateName); - } - return new InspectableSink(builder.build()); - } - - private static class TimeBasedBufferingSinkWriter - extends TestSinkV2.DefaultStatefulSinkWriter<Integer> - implements ProcessingTimeService.ProcessingTimeCallback { - - private final List<String> cachedCommittables = new 
ArrayList<>(); - private ProcessingTimeService processingTimeService; - - @Override - public void write(Integer element, Context context) { - cachedCommittables.add( - Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString()); - } - - @Override - public void onProcessingTime(long time) { - elements.addAll(cachedCommittables); - cachedCommittables.clear(); - this.processingTimeService.registerTimer(time + 1000, this); - } - - @Override - public void init(WriterInitContext context) { - this.processingTimeService = context.getProcessingTimeService(); - this.processingTimeService.registerTimer(1000, this); - } - } - - static class InspectableSink - extends AbstractInspectableSink<org.apache.flink.api.connector.sink2.Sink<Integer>> { - private final TestSinkV2<Integer> sink; - - InspectableSink(TestSinkV2<Integer> sink) { - super(sink); - this.sink = sink; - } - - @Override - public long getLastCheckpointId() { - return sink.getWriter().lastCheckpointId; - } - - @Override - public List<String> getRecordsOfCurrentCheckpoint() { - return sink.getWriter().elements; - } - - @Override - public List<Watermark> getWatermarks() { - return sink.getWriter().watermarks; - } - - @Override - public int getRecordCountFromState() { - TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = sink.getWriter(); - if (sinkWriter instanceof TestSinkV2.DefaultStatefulSinkWriter) { - return ((TestSinkV2.DefaultStatefulSinkWriter<Integer>) sinkWriter) - .getRecordCount(); - } else { - return 0; - } - } - } -}
