fapaul commented on a change in pull request #113:
URL: https://github.com/apache/bahir-flink/pull/113#discussion_r598197430



##########
File path: 
flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import 
org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import 
org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import 
org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.flink.util.Preconditions;
+import org.apache.pinot.client.PinotClientException;
+import org.apache.pinot.client.ResultSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.AssertionFailedError;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * E2e tests for Pinot Sink using BATCH and STREAMING execution mode
+ */
+public class PinotSinkTest extends PinotTestBase {
+
+    // JVM-wide flag so that the failing mapper throws only once per test; reset in setUp().
+    private static final AtomicBoolean hasFailedOnce = new AtomicBoolean(false);
+    // Keeps the streaming generator source alive until the test releases it; recreated in setUp().
+    private static CountDownLatch latch;
+
+    // Segment size, in rows, used by the tests when configuring the sink.
+    private static final int MAX_ROWS_PER_SEGMENT = 5;
+    // Checkpointing interval (milliseconds) for STREAMING tests; the failing mapper sleeps a multiple of it.
+    private static final long STREAMING_CHECKPOINTING_INTERVAL = 50;
+    // NOTE(review): presumably the upper bound, in seconds, for waiting on the expected rows in Pinot — confirm.
+    private static final int DATA_CHECKING_TIMEOUT_SECONDS = 60;
+
+    /**
+     * Prepares the shared static test state before every test, after the base class setup ran.
+     *
+     * @throws IOException if the base class setup fails
+     */
+    @BeforeEach
+    @Override
+    public void setUp() throws IOException {
+        super.setUp();
+
+        // A fresh latch per test: the streaming generator source stays up until the test counts it down.
+        latch = new CountDownLatch(1);
+
+        // Allow the failing mapper of a failure-recovery test to throw again.
+        hasFailedOnce.set(false);
+    }
+
+    /**
+     * Runs the {@link PinotSink} in BATCH execution mode and verifies that every input
+     * value eventually shows up in Pinot.
+     *
+     * @throws Exception if the job or the data check fails
+     */
+    @Test
+    public void testBatchSink() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(2);
+
+        final List<String> expectedValues = getRawTestData(12);
+        setupSink(setupBatchDataSource(env, expectedValues));
+
+        // Blocks until the bounded job has finished.
+        env.execute();
+
+        checkForDataInPinotWithRetry(expectedValues);
+    }
+
+    /**
+     * Verifies that the {@link PinotSink} recovers from a task failure in BATCH execution
+     * mode: the mapper throws once (at its 8th element), the job is restarted, and all input
+     * values must still reach Pinot.
+     *
+     * @throws Exception if the job or the data check fails
+     */
+    @Test
+    public void testFailureRecoveryInBatchingSink() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        // One restart attempt after a 10 ms delay; the mapper is set up to fail only once.
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10));
+        env.setParallelism(2);
+
+        final List<String> expectedValues = getRawTestData(12);
+        final DataStream<SingleColumnTableRow> failingStream =
+                setupFailingMapper(setupBatchDataSource(env, expectedValues), 8);
+        setupSink(failingStream);
+
+        // Blocks until the bounded job (including its restart) has finished.
+        env.execute();
+
+        checkForDataInPinotWithRetry(expectedValues);
+    }
+
+    /**
+     * Runs the {@link PinotSink} in STREAMING execution mode and verifies that every
+     * generated value eventually shows up in Pinot.
+     *
+     * <p>The job is started asynchronously and the generator source stays up until
+     * {@code latch} is counted down. The count-down happens in a {@code finally} block so
+     * that a failing data check does not leave the source (and therefore the job) running.
+     *
+     * @throws Exception if submitting the job or the data check fails
+     */
+    @Test
+    public void testStreamingSink() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(2);
+        env.enableCheckpointing(STREAMING_CHECKPOINTING_INTERVAL);
+
+        List<String> rawData = getRawTestData(20);
+        DataStream<SingleColumnTableRow> dataStream = setupStreamingDataSource(env, rawData);
+        setupSink(dataStream);
+
+        // Start execution of job
+        env.executeAsync();
+
+        try {
+            // Check for data in Pinot
+            checkForDataInPinotWithRetry(rawData);
+        } finally {
+            // Generator source can now shut down, even if the data check failed
+            latch.countDown();
+        }
+    }
+
+    /**
+     * Verifies that the {@link PinotSink} recovers from a task failure in STREAMING
+     * execution mode and that all generated values still reach Pinot.
+     *
+     * <p>No restart strategy is set explicitly. NOTE(review): the test presumably relies on
+     * the default restart strategy Flink applies to jobs with checkpointing enabled — confirm.
+     *
+     * <p>The generator source is released in a {@code finally} block so that a failing data
+     * check does not leave the job running.
+     *
+     * @throws Exception if submitting the job or the data check fails
+     */
+    @Test
+    public void testFailureRecoveryInStreamingSink() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setParallelism(1);
+        env.enableCheckpointing(STREAMING_CHECKPOINTING_INTERVAL);
+
+        List<String> rawData = getRawTestData(20);
+        DataStream<SingleColumnTableRow> dataStream = setupStreamingDataSource(env, rawData);
+        // With a segment size of MAX_ROWS_PER_SEGMENT = 5 elements and a parallelism of 1,
+        // the failure is raised on the 12th element, i.e. after the first 2 segments
+        // (10 elements) were filled and, given the mapper's sleep before throwing,
+        // presumably committed to Pinot.
+        dataStream = setupFailingMapper(dataStream, 12);
+        setupSink(dataStream);
+
+        // Start execution of job
+        env.executeAsync();
+
+        try {
+            // Check for data in Pinot
+            checkForDataInPinotWithRetry(rawData);
+        } finally {
+            // Generator source can now shut down, even if the data check failed
+            latch.countDown();
+        }
+    }
+
+    /**
+     * Generates the raw column values {@code "ColValue1"} through {@code "ColValue<numItems>"}
+     * used as test input.
+     *
+     * @param numItems number of values to generate; a value {@code <= 0} yields an empty list
+     * @return list of raw column values in ascending order
+     */
+    private List<String> getRawTestData(int numItems) {
+        return IntStream.rangeClosed(1, numItems)
+                .mapToObj(num -> "ColValue" + num)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Creates the generator source used by the STREAMING tests and names it "Test input".
+     *
+     * @param env           stream execution environment the source is registered with
+     * @param rawDataValues column values the source emits
+     * @return data stream of {@link SingleColumnTableRow}s produced by the source
+     */
+    private DataStream<SingleColumnTableRow> setupStreamingDataSource(StreamExecutionEnvironment env, List<String> rawDataValues) {
+        // NOTE(review): the meaning of the second constructor argument (10) is defined by SimpleStreamingSource — confirm.
+        return env.addSource(new SimpleStreamingSource(rawDataValues, 10)).name("Test input");
+    }
+
+    /**
+     * Setup the data source for BATCH tests.
+     *
+     * @param env           Stream execution environment
+     * @param rawDataValues Data values to send
+     * @return resulting data stream
+     */
+    private DataStream<SingleColumnTableRow> 
setupBatchDataSource(StreamExecutionEnvironment env, List<String> 
rawDataValues) {
+        return env.fromCollection(rawDataValues)
+                .map(value -> new SingleColumnTableRow(value, 
System.currentTimeMillis()))
+                .name("Test input");
+    }
+
+    /**
+     * Setup a mapper that fails when processing the nth element with n = 
failOnceAtNthElement.
+     *
+     * @param dataStream           Input data stream
+     * @param failOnceAtNthElement Number of elements to process before 
raising the exception
+     * @return resulting data stream
+     */
+    private DataStream<SingleColumnTableRow> 
setupFailingMapper(DataStream<SingleColumnTableRow> dataStream, int 
failOnceAtNthElement) {
+        AtomicInteger messageCounter = new AtomicInteger(0);
+
+        return dataStream.map(element -> {
+            if (!hasFailedOnce.get() && messageCounter.incrementAndGet() == 
failOnceAtNthElement) {
+                hasFailedOnce.set(true);
+                // Wait more than STREAMING_CHECKPOINTING_INTERVAL to ensure
+                // that at least one checkpoint was created before raising the 
exception.
+                Thread.sleep(4 * STREAMING_CHECKPOINTING_INTERVAL);

Review comment:
       Maybe you can find a way to remove this sleep as well :) One idea would 
be to make your map function implement `CheckpointedFunction`, which allows you 
to receive a notification when a checkpoint is taken.
   
   I also do not really understand why it is important that at least one 
checkpoint has been taken before the failure is raised.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to