fapaul commented on a change in pull request #113:
URL: https://github.com/apache/bahir-flink/pull/113#discussion_r597836925



##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkCommittableSerializer;
+import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkGlobalCommittableSerializer;
+import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkWriterStateSerializer;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriterState;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Apache Pinot sink that stores objects from upstream Flink tasks in an Apache Pinot table. The sink
+ * can be operated in {@code RuntimeExecutionMode.STREAMING} or {@code RuntimeExecutionMode.BATCH}
+ * mode, but make sure to enable checkpointing when using streaming mode.
+ *
+ * <p>We advise you to use the provided {@link PinotSink.Builder} to build and configure the
+ * PinotSink. All communication with the Pinot cluster's table is managed via the Pinot
+ * controller. Thus you need to provide its host and port as well as the target Pinot table.
+ * The {@link TableConfig} and {@link Schema} are automatically retrieved via the Pinot controller API
+ * and therefore do not need to be provided.
+ *
+ * <p>Whenever an element is received by the sink, it gets stored in a {@link PinotWriterSegment}. A
+ * {@link PinotWriterSegment} represents exactly one segment that will be pushed to the Pinot
+ * cluster later on. Its size is determined by the customizable {@code maxRowsPerSegment} parameter.
+ * Please note that the maximum segment size that can be handled by this sink is limited by the
+ * lower bound of memory available at each subtask.
+ * Each subtask holds a list of {@link PinotWriterSegment}s, of which at most one is active. An
+ * active {@link PinotWriterSegment} is capable of accepting at least one more element. When a
+ * {@link PinotWriterSegment} switches from active to inactive, it flushes its
+ * {@code maxRowsPerSegment} elements to disk. The data file is stored in the local filesystem's
+ * temporary directory and contains serialized elements. We use the {@link JsonSerializer} to
+ * serialize elements to JSON.
+ *
+ * <p>On checkpointing, all {@link PinotWriterSegment}s that are not in progress are transformed into
+ * committables. As the data files need to be shared across nodes, the sink requires access to a
+ * shared filesystem. We use the {@link FileSystemAdapter} for that purpose.
+ * A {@link FileSystemAdapter} is capable of copying a file from the local to the shared filesystem
+ * and vice-versa. A {@link PinotSinkCommittable} contains a reference to a data file on the shared
+ * filesystem as well as the minimum and maximum timestamp contained in the data file. A timestamp -
+ * usually the event time - is extracted from each received element via the {@link EventTimeExtractor}.
+ * The timestamps are later on required to follow the guideline for naming Pinot segments.
+ * The state of a possibly existing in-progress {@link PinotWriterSegment} is saved in the snapshot
+ * taken on checkpointing. This ensures that its not yet committed elements are not lost when
+ * recovering from failures.
+ *
+ * <p>We use the {@link PinotSinkGlobalCommitter} to collect all created
+ * {@link PinotSinkCommittable}s, create segments from the referenced data files and finally push them
+ * to the Pinot table. To this end, the minimum and maximum timestamp across all
+ * {@link PinotSinkCommittable}s is determined. The segment names are then generated using the
+ * {@link PinotSinkSegmentNameGenerator}, which gets the minimum and maximum timestamp as input.
+ * The segment generation starts with downloading the referenced data file from the shared filesystem
+ * using the provided {@link FileSystemAdapter}. Once this has completed, we use Pinot's
+ * {@link SegmentIndexCreationDriver} to generate the final segment. Each segment is thereby stored
+ * in a temporary directory on the local filesystem. Next, the segment is uploaded to the Pinot
+ * controller using Pinot's {@link UploadSegmentCommand}.
+ *
+ * <p>To ensure that possible failures are handled accordingly, each segment name is checked for
+ * existence within the Pinot cluster before uploading a segment. In case a segment name already
+ * exists, i.e. if the last commit failed partially with some segments already uploaded, the
+ * existing segment is deleted first. When the elements since the last checkpoint are replayed, the
+ * minimum and maximum timestamp of all received elements will be the same. Thus the same set of
+ * segment names is generated and we can delete previous segments by checking for segment name
+ * presence. Note: The {@link PinotSinkSegmentNameGenerator} must be deterministic. We also provide
+ * a {@link SimpleSegmentNameGenerator}, a simple segment name generator that is suitable for most
+ * users.
+ *
+ * <p>Please note that we use the {@link GlobalCommitter} to ensure consistent segment naming. This
+ * comes with performance limitations, as a {@link GlobalCommitter} always runs at a parallelism of 1,
+ * which results in a clear bottleneck at the {@link PinotSinkGlobalCommitter} that does all the
+ * computationally intensive work (i.e. generating and uploading segments). In order to overcome this
+ * issue, we introduce a custom multithreading approach within the {@link PinotSinkGlobalCommitter}
+ * to parallelize the segment creation and upload process.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public class PinotSink<IN> implements Sink<IN, PinotSinkCommittable, PinotSinkWriterState, PinotSinkGlobalCommittable> {
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final int maxRowsPerSegment;
+    private final String tempDirPrefix;
+    private final JsonSerializer<IN> jsonSerializer;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final FileSystemAdapter fsAdapter;
+    private final EventTimeExtractor<IN> eventTimeExtractor;
+    private final int numCommitThreads;
+
+    /**
+     * @param pinotControllerHost  Host of the Pinot controller
+     * @param pinotControllerPort  Port of the Pinot controller
+     * @param tableName            Target table's name
+     * @param maxRowsPerSegment    Maximum number of rows to be stored within a Pinot segment
+     * @param tempDirPrefix        Prefix for temp directories used
+     * @param jsonSerializer       Serializer used to convert elements to JSON
+     * @param eventTimeExtractor   Defines the way event times are extracted from received objects
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param fsAdapter            Filesystem adapter used to share data files across nodes
+     * @param numCommitThreads     Number of threads used in the {@link PinotSinkGlobalCommitter} for committing segments
+     */
+    public PinotSink(String pinotControllerHost, String pinotControllerPort, String tableName,

Review comment:
       You should **not** offer both a public constructor and a builder. I'd recommend making the constructor private.
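
   For illustration, a rough sketch of the suggested shape (field list shortened to three of the sink's parameters; the remaining ones would follow the same pattern):
   ```java
   public class PinotSink<IN> {
       private final String pinotControllerHost;
       private final String pinotControllerPort;
       private final String tableName;

       // Private: the Builder below is the only public way to construct the sink.
       private PinotSink(String pinotControllerHost, String pinotControllerPort, String tableName) {
           this.pinotControllerHost = pinotControllerHost;
           this.pinotControllerPort = pinotControllerPort;
           this.tableName = tableName;
       }

       public static class Builder<IN> {
           private final String host;
           private final String port;
           private final String tableName;

           public Builder(String host, String port, String tableName) {
               this.host = host;
               this.port = port;
               this.tableName = tableName;
           }

           public PinotSink<IN> build() {
               return new PinotSink<>(host, port, tableName);
           }
       }
   }
   ```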

##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerHttpClient.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.*;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helpers to interact with the Pinot controller via its public API.
+ */
+@Internal
+public class PinotControllerHttpClient implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotControllerHttpClient.class);
+    protected final String controllerHostPort;
+    protected final CloseableHttpClient httpClient;

Review comment:
       private

##########
File path: flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.client.ResultSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.AssertionFailedError;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * E2e tests for Pinot Sink using BATCH and STREAMING execution mode
+ */
+public class PinotSinkTest extends PinotTestBase {
+
+    /**
+     * Tests the BATCH execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBatchSink() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(2);
+
+        List<SingleColumnTableRow> data = getTestData(12);
+        this.setupDataStream(env, data);
+
+        // Run
+        executeOnMiniCluster(env.getStreamGraph().getJobGraph());
+
+        checkForDataInPinotWithRetry(data, 20);
+    }
+
+    /**
+     * Tests the STREAMING execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testStreamingSink() throws Exception {
+        final Configuration conf = new Configuration();
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(conf);
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setParallelism(2);
+        env.enableCheckpointing(50);
+
+        List<SingleColumnTableRow> data = getTestData(12);
+        this.setupDataStream(env, data);
+
+        // Run
+        executeOnMiniCluster(env.getStreamGraph().getJobGraph());
+
+        // We only expect the first 100 elements to be already committed to Pinot.
+        // The remaining ones would follow once we increase the input data size.
+        // The stream execution stops once the last input tuple was sent to the sink.
+        checkForDataInPinotWithRetry(data, 20);
+    }
+
+    /**
+     * Generates a small test dataset consisting of {@link SingleColumnTableRow}s.
+     *
+     * @return List of SingleColumnTableRow
+     */
+    private List<SingleColumnTableRow> getTestData(int numItems) {
+        return IntStream.range(1, numItems + 1)
+                .mapToObj(num -> "ColValue" + num)
+                .map(col1 -> new SingleColumnTableRow(col1, System.currentTimeMillis()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Executes a given JobGraph on a MiniCluster.
+     *
+     * @param jobGraph JobGraph to execute
+     * @throws Exception
+     */
+    private void executeOnMiniCluster(JobGraph jobGraph) throws Exception {
+        final Configuration config = new Configuration();
+        config.setString(RestOptions.BIND_PORT, "18081-19000");
+        final MiniClusterConfiguration cfg =

Review comment:
       I think it makes more sense to set up the mini cluster with the help of `MiniClusterWithClientResource` and make it a `@ClassRule`. This way the cluster is only instantiated once, which drastically speeds up the tests.
       https://github.com/apache/flink/blob/fd8bae505281c443d4a74b97a6666845744921e7/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java#L95
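
   A sketch of what that could look like (JUnit 4 rule from `flink-test-utils`; since the test class currently uses JUnit 5, it would either need the vintage engine or a migration):
   ```java
   import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
   import org.apache.flink.test.util.MiniClusterWithClientResource;
   import org.junit.ClassRule;

   public class PinotSinkTest extends PinotTestBase {

       // Shared across all tests in this class; started once instead of per job.
       @ClassRule
       public static final MiniClusterWithClientResource MINI_CLUSTER =
               new MiniClusterWithClientResource(
                       new MiniClusterResourceConfiguration.Builder()
                               .setNumberTaskManagers(1)
                               .setNumberSlotsPerTaskManager(4)
                               .build());
   }
   ```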

##########
File path: flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.client.ResultSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.AssertionFailedError;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * E2e tests for Pinot Sink using BATCH and STREAMING execution mode
+ */
+public class PinotSinkTest extends PinotTestBase {
+
+    /**
+     * Tests the BATCH execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBatchSink() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(2);
+
+        List<SingleColumnTableRow> data = getTestData(12);
+        this.setupDataStream(env, data);
+
+        // Run
+        executeOnMiniCluster(env.getStreamGraph().getJobGraph());
+
+        checkForDataInPinotWithRetry(data, 20);
+    }
+
+    /**
+     * Tests the STREAMING execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testStreamingSink() throws Exception {
+        final Configuration conf = new Configuration();
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(conf);
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setParallelism(2);
+        env.enableCheckpointing(50);
+
+        List<SingleColumnTableRow> data = getTestData(12);
+        this.setupDataStream(env, data);
+
+        // Run
+        executeOnMiniCluster(env.getStreamGraph().getJobGraph());
+
+        // We only expect the first 100 elements to be already committed to Pinot.
+        // The remaining ones would follow once we increase the input data size.
+        // The stream execution stops once the last input tuple was sent to the sink.
+        checkForDataInPinotWithRetry(data, 20);
+    }
+
+    /**
+     * Generates a small test dataset consisting of {@link SingleColumnTableRow}s.
+     *
+     * @return List of SingleColumnTableRow
+     */
+    private List<SingleColumnTableRow> getTestData(int numItems) {
+        return IntStream.range(1, numItems + 1)
+                .mapToObj(num -> "ColValue" + num)
+                .map(col1 -> new SingleColumnTableRow(col1, System.currentTimeMillis()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Executes a given JobGraph on a MiniCluster.
+     *
+     * @param jobGraph JobGraph to execute
+     * @throws Exception
+     */
+    private void executeOnMiniCluster(JobGraph jobGraph) throws Exception {
+        final Configuration config = new Configuration();
+        config.setString(RestOptions.BIND_PORT, "18081-19000");
+        final MiniClusterConfiguration cfg =
+                new MiniClusterConfiguration.Builder()
+                        .setNumTaskManagers(1)
+                        .setNumSlotsPerTaskManager(4)
+                        .setConfiguration(config)
+                        .build();
+
+        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
+            miniCluster.start();
+            miniCluster.executeJobBlocking(jobGraph);
+        }
+    }
+
+    /**
+     * Sets up a DataStream using the provided execution environment and the provided input data.
+     *
+     * @param env  stream execution environment
+     * @param data Input data
+     */
+    private void setupDataStream(StreamExecutionEnvironment env, List<SingleColumnTableRow> data) {
+        // Create test stream
+        DataStream<SingleColumnTableRow> theData =
+                env.fromCollection(data)
+                        .name("Test input");
+
+        String tempDirPrefix = "flink-pinot-connector-test";
+        PinotSinkSegmentNameGenerator segmentNameGenerator = new SimpleSegmentNameGenerator(TABLE_NAME, "flink-connector");
+        FileSystemAdapter fsAdapter = new LocalFileSystemAdapter(tempDirPrefix);
+        JsonSerializer<SingleColumnTableRow> jsonSerializer = new SingleColumnTableRowSerializer();
+
+        EventTimeExtractor<SingleColumnTableRow> eventTimeExtractor = new SingleColumnTableRowEventTimeExtractor();
+
+        PinotSink<SingleColumnTableRow> sink = new PinotSink.Builder<SingleColumnTableRow>(getPinotHost(), getPinotControllerPort(), TABLE_NAME)
+                .withMaxRowsPerSegment(5)
+                .withTempDirectoryPrefix(tempDirPrefix)
+                .withJsonSerializer(jsonSerializer)
+                .withEventTimeExtractor(eventTimeExtractor)
+                .withSegmentNameGenerator(segmentNameGenerator)
+                .withFileSystemAdapter(fsAdapter)
+                .withNumCommitThreads(2)
+                .build();
+
+        // Sink into Pinot
+        theData.sinkTo(sink).name("Pinot sink");
+    }
+
+    /**
+     * As Pinot might take some time to index the recently pushed segments, we might need to retry
+     * the {@link #checkForDataInPinot} method multiple times. This method provides a simple wrapper
+     * using a linear retry backoff delay.
+     *
+     * @param data                  Data to expect in the Pinot table
+     * @param retryTimeoutInSeconds Maximum duration in seconds to wait for the data to arrive
+     * @throws InterruptedException
+     */
+    private void checkForDataInPinotWithRetry(List<SingleColumnTableRow> data, int retryTimeoutInSeconds) throws InterruptedException {
+        long endTime = System.currentTimeMillis() + 1000L * retryTimeoutInSeconds;
+        // Use max 10 retries with linear retry backoff delay
+        long retryDelay = 1000L / 10 * retryTimeoutInSeconds;
+        do {
+            try {
+                checkForDataInPinot(data);
+                // In case of no error, we can skip further retries
+                return;
+            } catch (AssertionFailedError | PinotControllerApiException e) {

Review comment:
       Do you still need to allow the `AssertionFailedError`? I would assume that once the job is finished, all data is in Pinot.
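
   If so, the retry wrapper could narrow its catch to the controller exception only. A sketch, assuming the rest of the loop stays as quoted above (the quoted diff cuts off before the loop's tail, so the final attempt below is illustrative):
   ```java
   private void checkForDataInPinotWithRetry(List<SingleColumnTableRow> data, int retryTimeoutInSeconds)
           throws InterruptedException {
       long endTime = System.currentTimeMillis() + 1000L * retryTimeoutInSeconds;
       long retryDelay = 1000L / 10 * retryTimeoutInSeconds;
       do {
           try {
               checkForDataInPinot(data);
               return;
           } catch (PinotControllerApiException e) {
               // Retry only while Pinot is still indexing; an AssertionFailedError
               // would now fail the test immediately.
               Thread.sleep(retryDelay);
           }
       } while (System.currentTimeMillis() < endTime);
       checkForDataInPinot(data); // final attempt, letting any failure propagate
   }
   ```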

##########
File path: flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.util.TestLogger;
+import org.apache.pinot.spi.config.table.*;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.Duration;
+
+/**
+ * Base class for PinotSink e2e tests
+ */
+@Testcontainers
+public class PinotTestBase extends TestLogger {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(PinotTestBase.class);
+
+    private static final String DOCKER_IMAGE_NAME = "apachepinot/pinot:0.6.0";
+    private static final Integer PINOT_INTERNAL_BROKER_PORT = 8000;
+    private static final Integer PINOT_INTERNAL_CONTROLLER_PORT = 9000;
+
+    protected static final TableConfig TABLE_CONFIG = PinotTableConfig.getTableConfig();
+    protected static final String TABLE_NAME = TABLE_CONFIG.getTableName();
+    protected static final Schema TABLE_SCHEMA = PinotTableConfig.getTableSchema();
+    protected static PinotTestHelper pinotHelper;
+
+    /**
+     * Creates the Pinot testcontainer. We delay the start of tests until Pinot has started all
+     * internal components. This is identified through a log statement.
+     */
+    @Container
+    public GenericContainer<?> pinot = new GenericContainer<>(DockerImageName.parse(DOCKER_IMAGE_NAME))

Review comment:
       You should try to make this variable `static` so that it is shared between tests; otherwise every test starts a new Pinot container.
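
   Sketch of the change (with the Testcontainers JUnit 5 extension, a `static` `@Container` field is started once per test class instead of once per test method; the existing exposed ports and log-based wait strategy would stay as they are now):
   ```java
   // Started once for all tests in the class because the field is static.
   @Container
   protected static final GenericContainer<?> PINOT =
           new GenericContainer<>(DockerImageName.parse(DOCKER_IMAGE_NAME))
                   // keep the existing wait strategy here
                   .withExposedPorts(PINOT_INTERNAL_BROKER_PORT, PINOT_INTERNAL_CONTROLLER_PORT);
   ```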

##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link PinotWriterSegment} represents exactly one segment that can be found in the Pinot
+ * cluster once the commit has been completed.
+ *
+ * @param <IN> Type of incoming elements
+ */
+@Internal
+public class PinotWriterSegment<IN> implements Serializable {
+
+    private final int maxRowsPerSegment;
+    private final JsonSerializer<IN> jsonSerializer;
+    private final FileSystemAdapter fsAdapter;
+
+    private boolean acceptsElements = true;
+
+    private final List<String> serializedElements;
+    private String dataPathOnSharedFS;
+    private long minTimestamp = Long.MAX_VALUE;
+    private long maxTimestamp = Long.MIN_VALUE;
+
+    /**
+     * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+     * @param jsonSerializer    Serializer used to convert elements to JSON
+     * @param fsAdapter         Filesystem adapter used to share data files across nodes
+     */
+    protected PinotWriterSegment(int maxRowsPerSegment, JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {

Review comment:
       ```suggestion
      PinotWriterSegment(int maxRowsPerSegment, JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
   ```
   package-private?

##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import com.google.common.collect.Iterables;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Accepts incoming elements and creates {@link PinotSinkCommittable}s out of them on request.
+ *
+ * @param <IN> Type of incoming elements
+ */
+@Internal
+public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable, PinotSinkWriterState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotSinkWriter.class);
+
+    private final int maxRowsPerSegment;
+    private final EventTimeExtractor<IN> eventTimeExtractor;
+    private final JsonSerializer<IN> jsonSerializer;
+
+    private final List<PinotWriterSegment<IN>> activeSegments;
+    private final FileSystemAdapter fsAdapter;
+
+    private final int subtaskId;
+
+    /**
+     * @param subtaskId          Subtask id provided by Flink
+     * @param maxRowsPerSegment  Maximum number of rows to be stored within a Pinot segment
+     * @param eventTimeExtractor Defines the way event times are extracted from received objects
+     * @param jsonSerializer     Serializer used to convert elements to JSON
+     * @param fsAdapter          Filesystem adapter used to share data files across nodes
+     */
+    public PinotSinkWriter(int subtaskId, int maxRowsPerSegment,
+                           EventTimeExtractor<IN> eventTimeExtractor,
+                           JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
+        this.subtaskId = subtaskId;
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
+        this.jsonSerializer = checkNotNull(jsonSerializer);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.activeSegments = new ArrayList<>();
+    }
+
+    /**
+     * Takes elements from an upstream task and writes them into a {@link PinotWriterSegment}.
+     *
+     * @param element Object from upstream task
+     * @param context SinkWriter context
+     * @throws IOException
+     */
+    @Override
+    public void write(IN element, Context context) throws IOException {
+        final PinotWriterSegment<IN> inProgressSegment = getOrCreateInProgressSegment();
+        inProgressSegment.write(element, eventTimeExtractor.getEventTime(element, context));
+    }
+
+    /**
+     * Creates {@link PinotSinkCommittable}s from elements previously received via {@link #write}.
+     * If flush is set, all {@link PinotWriterSegment}s are transformed into
+     * {@link PinotSinkCommittable}s. If flush is not set, only currently non-active
+     * {@link PinotWriterSegment}s are transformed into {@link PinotSinkCommittable}s.
+     * To convert a {@link PinotWriterSegment} into a {@link PinotSinkCommittable}, the data gets
+     * written to the shared filesystem. Moreover, minimum and maximum timestamps are identified.
+     * Finally, all {@link PinotWriterSegment}s transformed into {@link PinotSinkCommittable}s are
+     * removed from {@link #activeSegments}.
+     *
+     * @param flush Flush all currently known elements into the {@link PinotSinkCommittable}s
+     * @return List of {@link PinotSinkCommittable} to process in {@link org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter}
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkCommittable> prepareCommit(boolean flush) throws IOException {
+        // Identify segments to commit. If the flush argument is set, all segments shall be committed.
+        // Otherwise, take only those PinotWriterSegments that do not accept any more elements.
+        List<PinotWriterSegment<IN>> segmentsToCommit = activeSegments.stream()
+                .filter(s -> flush || !s.acceptsElements())
+                .collect(Collectors.toList());
+        LOG.debug("Identified {} segments to commit [subtaskId={}]", segmentsToCommit.size(), subtaskId);
+
+        LOG.debug("Creating committables... [subtaskId={}]", subtaskId);
+        List<PinotSinkCommittable> committables = new ArrayList<>();
+        for (final PinotWriterSegment<IN> segment : segmentsToCommit) {
+            committables.add(segment.prepareCommit());
+        }
+        LOG.debug("Created {} committables [subtaskId={}]", 
committables.size(), subtaskId);
+
+        // Remove all PinotWriterSegments that will be emitted within the committables.
+        activeSegments.removeAll(segmentsToCommit);
+        return committables;
+    }
+
+    /**
+     * Gets the {@link PinotWriterSegment} still accepting elements or creates a new one.
+     *
+     * @return {@link PinotWriterSegment} accepting at least one more element
+     */
+    private PinotWriterSegment<IN> getOrCreateInProgressSegment() {
+        final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
+        if (latestSegment == null || !latestSegment.acceptsElements()) {
+            final PinotWriterSegment<IN> inProgressSegment = new PinotWriterSegment<>(maxRowsPerSegment, jsonSerializer, fsAdapter);
+            activeSegments.add(inProgressSegment);
+            return inProgressSegment;
+        }
+        return latestSegment;
+    }
+
+    /**
+     * Snapshots the latest PinotWriterSegment (if one exists), so that the contained (and not yet
+     * committed) elements can be recovered later on in case of a failure.
+     *
+     * @return A list containing at most one PinotSinkWriterState
+     */
+    @Override
+    public List<PinotSinkWriterState> snapshotState() {
+        final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
+        if (latestSegment == null || !latestSegment.acceptsElements()) {
+            return new ArrayList<>();
+        }

Review comment:
       Looks solid 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

