fapaul commented on a change in pull request #113:
URL: https://github.com/apache/bahir-flink/pull/113#discussion_r597836925
##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkCommittableSerializer;
+import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkGlobalCommittableSerializer;
+import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkWriterStateSerializer;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriterState;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Apache Pinot sink that stores objects from upstream Flink tasks in an Apache Pinot table. The
+ * sink can be operated in {@code RuntimeExecutionMode.STREAMING} or
+ * {@code RuntimeExecutionMode.BATCH} mode. Make sure to enable checkpointing when using
+ * streaming mode.
+ *
+ * <p>We advise you to use the provided {@link PinotSink.Builder} to build and configure the
+ * PinotSink. All the communication with the Pinot cluster's table is managed via the Pinot
+ * controller.
+ * Thus, you need to provide its host and port as well as the target Pinot table.
+ * The {@link TableConfig} and {@link Schema} are automatically retrieved via the Pinot controller
+ * API and therefore do not need to be provided.
+ *
+ * <p>Whenever an element is received by the sink, it gets stored in a {@link PinotWriterSegment}. A
+ * {@link PinotWriterSegment} represents exactly one segment that will be pushed to the Pinot
+ * cluster later on. Its size is determined by the customizable {@code maxRowsPerSegment} parameter.
+ * Please note that the maximum segment size that can be handled by this sink is limited by the
+ * lower bound of memory available at each subTask.
+ * Each subTask holds a list of {@link PinotWriterSegment}s, of which at most one is active. An
+ * active {@link PinotWriterSegment} is capable of accepting at least one more element. If a
+ * {@link PinotWriterSegment} switches from active to inactive, it flushes its
+ * {@code maxRowsPerSegment} elements to disk. The data file is stored in the local filesystem's
+ * temporary directory and contains serialized elements. We use the {@link JsonSerializer} to
+ * serialize elements to JSON.
+ *
+ * <p>On checkpointing, all {@link PinotWriterSegment}s that are not in progress are transformed
+ * into committables. As the data files need to be shared across nodes, the sink requires access
+ * to a shared filesystem. We use the {@link FileSystemAdapter} for that purpose.
+ * A {@link FileSystemAdapter} is capable of copying a file from the local to the shared filesystem
+ * and vice-versa. A {@link PinotSinkCommittable} contains a reference to a data file on the shared
+ * filesystem as well as the minimum and maximum timestamp contained in the data file. A timestamp -
+ * usually the event time - is extracted from each received element via {@link EventTimeExtractor}.
+ * The timestamps are later required to follow the guidelines for naming Pinot segments.
+ * The state of an in-progress {@link PinotWriterSegment}, if one exists, is saved in the snapshot
+ * taken on checkpointing. This ensures that the at-most-once delivery guarantee can be fulfilled
+ * when recovering from failures.
+ *
+ * <p>We use the {@link PinotSinkGlobalCommitter} to collect all created
+ * {@link PinotSinkCommittable}s, create segments from the referenced data files and finally push
+ * them to the Pinot table. To this end, the minimum and maximum timestamps across all
+ * {@link PinotSinkCommittable}s are determined. The segment names are then generated using the
+ * {@link PinotSinkSegmentNameGenerator}, which gets the minimum and maximum timestamp as input.
+ * The segment generation starts with downloading the referenced data file from the shared
+ * filesystem using the provided {@link FileSystemAdapter}. Once this has completed, we use Pinot's
+ * {@link SegmentIndexCreationDriver} to generate the final segment. Each segment is thereby stored
+ * in a temporary directory on the local filesystem. Next, the segment is uploaded to the Pinot
+ * controller using Pinot's {@link UploadSegmentCommand}.
+ *
+ * <p>To ensure that possible failures are handled accordingly, each segment name is checked for
+ * existence within the Pinot cluster before uploading a segment. In case a segment name already
+ * exists, i.e. if the last commit partially failed with some segments already uploaded, the
+ * existing segment is deleted first.
+ * When the elements since the last checkpoint are replayed, the minimum and maximum timestamp of
+ * all received elements will be the same. Thus, the same set of segment names is generated and we
+ * can delete previous segments by checking for segment name presence. Note: the
+ * {@link PinotSinkSegmentNameGenerator} must be deterministic. We also provide a
+ * {@link SimpleSegmentNameGenerator}, a simple segment name generator that is suitable for most
+ * users.
+ *
+ * <p>Please note that we use the {@link GlobalCommitter} to ensure consistent segment naming. This
+ * comes with performance limitations, as a {@link GlobalCommitter} always runs at a parallelism of
+ * 1, which results in a clear bottleneck at the {@link PinotSinkGlobalCommitter} that does all the
+ * computationally intensive work (i.e. generating and uploading segments). To overcome this issue,
+ * we introduce a custom multithreading approach within the {@link PinotSinkGlobalCommitter} to
+ * parallelize the segment creation and upload process.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public class PinotSink<IN> implements Sink<IN, PinotSinkCommittable, PinotSinkWriterState, PinotSinkGlobalCommittable> {
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final int maxRowsPerSegment;
+    private final String tempDirPrefix;
+    private final JsonSerializer<IN> jsonSerializer;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final FileSystemAdapter fsAdapter;
+    private final EventTimeExtractor<IN> eventTimeExtractor;
+    private final int numCommitThreads;
+
+    /**
+     * @param pinotControllerHost Host of the Pinot controller
+     * @param pinotControllerPort Port of the Pinot controller
+     * @param tableName Target table's name
+     * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+     * @param tempDirPrefix Prefix for temp directories used
+     * @param jsonSerializer Serializer used to convert elements to JSON
+     * @param eventTimeExtractor Defines the way event times are extracted from received objects
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param fsAdapter Filesystem adapter used to save files for sharing files across nodes
+     * @param numCommitThreads Number of threads used in the {@link PinotSinkGlobalCommitter} for committing segments
+     */
+    public PinotSink(String pinotControllerHost, String pinotControllerPort, String tableName,

Review comment:
   You should **not** offer both a public constructor and a builder. I'd recommend making the constructor private.
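For illustration, a rough sketch of the suggested shape (hypothetical and trimmed to two parameters; the `Sink` interface methods and the remaining fields are elided):

```java
public class PinotSink<IN> {

    private final String pinotControllerHost;
    private final String tableName;

    // Private constructor: the Builder below becomes the only public entry point.
    private PinotSink(String pinotControllerHost, String tableName) {
        this.pinotControllerHost = pinotControllerHost;
        this.tableName = tableName;
    }

    public static class Builder<IN> {
        private final String pinotControllerHost;
        private final String tableName;

        public Builder(String pinotControllerHost, String tableName) {
            this.pinotControllerHost = pinotControllerHost;
            this.tableName = tableName;
        }

        public PinotSink<IN> build() {
            return new PinotSink<>(pinotControllerHost, tableName);
        }
    }
}
```

Callers would then always go through `new PinotSink.Builder<>(...).build()`, as the tests below already do.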
##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerHttpClient.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.*;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helpers to interact with the Pinot controller via its public API.
+ */
+@Internal
+public class PinotControllerHttpClient implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotControllerHttpClient.class);
+    protected final String controllerHostPort;
+    protected final CloseableHttpClient httpClient;

Review comment:
   These fields can be `private`.
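If written as a GitHub suggestion (assuming `httpClient` is not accessed from outside this class; `controllerHostPort` above could get the same treatment):

```suggestion
    private final CloseableHttpClient httpClient;
```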
##########
File path: flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.client.ResultSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.AssertionFailedError;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * E2E tests for the Pinot sink using BATCH and STREAMING execution modes
+ */
+public class PinotSinkTest extends PinotTestBase {
+
+    /**
+     * Tests the BATCH execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBatchSink() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(2);
+
+        List<SingleColumnTableRow> data = getTestData(12);
+        this.setupDataStream(env, data);
+
+        // Run
+        executeOnMiniCluster(env.getStreamGraph().getJobGraph());
+
+        checkForDataInPinotWithRetry(data, 20);
+    }
+
+    /**
+     * Tests the STREAMING execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testStreamingSink() throws Exception {
+        final Configuration conf = new Configuration();
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(conf);
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setParallelism(2);
+        env.enableCheckpointing(50);
+
+        List<SingleColumnTableRow> data = getTestData(12);
+        this.setupDataStream(env, data);
+
+        // Run
+        executeOnMiniCluster(env.getStreamGraph().getJobGraph());
+
+        // We only expect the first 100 elements to have already been committed to Pinot.
+        // The remaining ones would follow once we increase the input data size.
+        // The stream execution stops once the last input tuple has been sent to the sink.
+        checkForDataInPinotWithRetry(data, 20);
+    }
+
+    /**
+     * Generates a small test dataset consisting of {@link SingleColumnTableRow}s.
+     *
+     * @return List of SingleColumnTableRow
+     */
+    private List<SingleColumnTableRow> getTestData(int numItems) {
+        return IntStream.range(1, numItems + 1)
+                .mapToObj(num -> "ColValue" + num)
+                .map(col1 -> new SingleColumnTableRow(col1, System.currentTimeMillis()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Executes a given JobGraph on a MiniCluster.
+     *
+     * @param jobGraph JobGraph to execute
+     * @throws Exception
+     */
+    private void executeOnMiniCluster(JobGraph jobGraph) throws Exception {
+        final Configuration config = new Configuration();
+        config.setString(RestOptions.BIND_PORT, "18081-19000");
+        final MiniClusterConfiguration cfg =

Review comment:
   I think it makes more sense to specify the mini cluster with the help of `MiniClusterWithClientResource` and make it a `@ClassRule`. This way, the cluster is only instantiated once, which drastically speeds up the tests.
   https://github.com/apache/flink/blob/fd8bae505281c443d4a74b97a6666845744921e7/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java#L95
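A rough sketch of what that could look like, modeled on the linked `LocalExecutorITCase` (note that `@ClassRule` is JUnit 4; since this test class uses JUnit Jupiter, this would need the corresponding extension or a partial migration):

```java
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

public class PinotSinkTest extends PinotTestBase {

    // Started once for the whole test class instead of once per executed job.
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(4)
                            .build());
}
```

The jobs would then be submitted against the shared cluster instead of spinning up a fresh `MiniCluster` inside `executeOnMiniCluster`.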
##########
File path: flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.client.ResultSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.AssertionFailedError;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * E2E tests for the Pinot sink using BATCH and STREAMING execution modes
+ */
+public class PinotSinkTest extends PinotTestBase {
+
+    /**
+     * Tests the BATCH execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBatchSink() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(2);
+
+        List<SingleColumnTableRow> data = getTestData(12);
+        this.setupDataStream(env, data);
+
+        // Run
+        executeOnMiniCluster(env.getStreamGraph().getJobGraph());
+
+        checkForDataInPinotWithRetry(data, 20);
+    }
+
+    /**
+     * Tests the STREAMING execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testStreamingSink() throws Exception {
+        final Configuration conf = new Configuration();
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(conf);
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setParallelism(2);
+        env.enableCheckpointing(50);
+
+        List<SingleColumnTableRow> data = getTestData(12);
+        this.setupDataStream(env, data);
+
+        // Run
+        executeOnMiniCluster(env.getStreamGraph().getJobGraph());
+
+        // We only expect the first 100 elements to have already been committed to Pinot.
+        // The remaining ones would follow once we increase the input data size.
+        // The stream execution stops once the last input tuple has been sent to the sink.
+        checkForDataInPinotWithRetry(data, 20);
+    }
+
+    /**
+     * Generates a small test dataset consisting of {@link SingleColumnTableRow}s.
+     *
+     * @return List of SingleColumnTableRow
+     */
+    private List<SingleColumnTableRow> getTestData(int numItems) {
+        return IntStream.range(1, numItems + 1)
+                .mapToObj(num -> "ColValue" + num)
+                .map(col1 -> new SingleColumnTableRow(col1, System.currentTimeMillis()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Executes a given JobGraph on a MiniCluster.
+     *
+     * @param jobGraph JobGraph to execute
+     * @throws Exception
+     */
+    private void executeOnMiniCluster(JobGraph jobGraph) throws Exception {
+        final Configuration config = new Configuration();
+        config.setString(RestOptions.BIND_PORT, "18081-19000");
+        final MiniClusterConfiguration cfg =
+                new MiniClusterConfiguration.Builder()
+                        .setNumTaskManagers(1)
+                        .setNumSlotsPerTaskManager(4)
+                        .setConfiguration(config)
+                        .build();
+
+        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
+            miniCluster.start();
+            miniCluster.executeJobBlocking(jobGraph);
+        }
+    }
+
+    /**
+     * Sets up a DataStream using the provided execution environment and the provided input data.
+     *
+     * @param env stream execution environment
+     * @param data Input data
+     */
+    private void setupDataStream(StreamExecutionEnvironment env, List<SingleColumnTableRow> data) {
+        // Create test stream
+        DataStream<SingleColumnTableRow> theData =
+                env.fromCollection(data)
+                        .name("Test input");
+
+        String tempDirPrefix = "flink-pinot-connector-test";
+        PinotSinkSegmentNameGenerator segmentNameGenerator = new SimpleSegmentNameGenerator(TABLE_NAME, "flink-connector");
+        FileSystemAdapter fsAdapter = new LocalFileSystemAdapter(tempDirPrefix);
+        JsonSerializer<SingleColumnTableRow> jsonSerializer = new SingleColumnTableRowSerializer();
+
+        EventTimeExtractor<SingleColumnTableRow> eventTimeExtractor = new SingleColumnTableRowEventTimeExtractor();
+
+        PinotSink<SingleColumnTableRow> sink = new PinotSink.Builder<SingleColumnTableRow>(getPinotHost(), getPinotControllerPort(), TABLE_NAME)
+                .withMaxRowsPerSegment(5)
+                .withTempDirectoryPrefix(tempDirPrefix)
+                .withJsonSerializer(jsonSerializer)
+                .withEventTimeExtractor(eventTimeExtractor)
+                .withSegmentNameGenerator(segmentNameGenerator)
+                .withFileSystemAdapter(fsAdapter)
+                .withNumCommitThreads(2)
+                .build();
+
+        // Sink into Pinot
+        theData.sinkTo(sink).name("Pinot sink");
+    }
+
+    /**
+     * As Pinot might take some time to index the recently pushed segments, we might need to retry
+     * the {@link #checkForDataInPinot} method multiple times. This method provides a simple wrapper
+     * using a linear retry backoff delay.
+     *
+     * @param data Data to expect in the Pinot table
+     * @param retryTimeoutInSeconds Maximum duration in seconds to wait for the data to arrive
+     * @throws InterruptedException
+     */
+    private void checkForDataInPinotWithRetry(List<SingleColumnTableRow> data, int retryTimeoutInSeconds) throws InterruptedException {
+        long endTime = System.currentTimeMillis() + 1000L * retryTimeoutInSeconds;
+        // Use max 10 retries with linear retry backoff delay
+        long retryDelay = 1000L / 10 * retryTimeoutInSeconds;
+        do {
+            try {
+                checkForDataInPinot(data);
+                // In case of no error, we can skip further retries
+                return;
+            } catch (AssertionFailedError | PinotControllerApiException e) {

Review comment:
   Do you still need to allow the `AssertionFailedError`? I would assume that once the job has finished, all data is in Pinot.
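If the assertion retries do turn out to be unnecessary, the catch clause could be narrowed accordingly (a sketch, keeping only the controller-API retry path):

```suggestion
            } catch (PinotControllerApiException e) {
```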
##########
File path: flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.util.TestLogger;
+import org.apache.pinot.spi.config.table.*;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.Duration;
+
+/**
+ * Base class for PinotSink E2E tests
+ */
+@Testcontainers
+public class PinotTestBase extends TestLogger {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(PinotTestBase.class);
+
+    private static final String DOCKER_IMAGE_NAME = "apachepinot/pinot:0.6.0";
+    private static final Integer PINOT_INTERNAL_BROKER_PORT = 8000;
+    private static final Integer PINOT_INTERNAL_CONTROLLER_PORT = 9000;
+
+    protected static final TableConfig TABLE_CONFIG = PinotTableConfig.getTableConfig();
+    protected static final String TABLE_NAME = TABLE_CONFIG.getTableName();
+    protected static final Schema TABLE_SCHEMA = PinotTableConfig.getTableSchema();
+    protected static PinotTestHelper pinotHelper;
+
+    /**
+     * Creates the Pinot testcontainer. We delay the start of tests until Pinot has started all
+     * internal components. This is identified through a log statement.
+     */
+    @Container
+    public GenericContainer<?> pinot = new GenericContainer<>(DockerImageName.parse(DOCKER_IMAGE_NAME))

Review comment:
   You should try to make this variable `static` so that it is shared between tests; otherwise, every test starts a new Pinot container.
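With testcontainers' JUnit Jupiter support, a static `@Container` field is started once and shared between the test methods of a class instead of being restarted per test, i.e. roughly:

```suggestion
    @Container
    public static GenericContainer<?> pinot = new GenericContainer<>(DockerImageName.parse(DOCKER_IMAGE_NAME))
```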
##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link PinotWriterSegment} represents exactly one segment that can be found in the Pinot
+ * cluster once the commit has been completed.
+ *
+ * @param <IN> Type of incoming elements
+ */
+@Internal
+public class PinotWriterSegment<IN> implements Serializable {
+
+    private final int maxRowsPerSegment;
+    private final JsonSerializer<IN> jsonSerializer;
+    private final FileSystemAdapter fsAdapter;
+
+    private boolean acceptsElements = true;
+
+    private final List<String> serializedElements;
+    private String dataPathOnSharedFS;
+    private long minTimestamp = Long.MAX_VALUE;
+    private long maxTimestamp = Long.MIN_VALUE;
+
+    /**
+     * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+     * @param jsonSerializer Serializer used to convert elements to JSON
+     * @param fsAdapter Filesystem adapter used to save files for sharing files across nodes
+     */
+    protected PinotWriterSegment(int maxRowsPerSegment, JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {

Review comment:
   ```suggestion
       PinotWriterSegment(int maxRowsPerSegment, JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
   ```
   Package-private?

##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import com.google.common.collect.Iterables;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Accepts incoming elements and creates {@link PinotSinkCommittable}s out of them on request.
+ *
+ * @param <IN> Type of incoming elements
+ */
+@Internal
+public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable, PinotSinkWriterState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotSinkWriter.class);
+
+    private final int maxRowsPerSegment;
+    private final EventTimeExtractor<IN> eventTimeExtractor;
+    private final JsonSerializer<IN> jsonSerializer;
+
+    private final List<PinotWriterSegment<IN>> activeSegments;
+    private final FileSystemAdapter fsAdapter;
+
+    private final int subtaskId;
+
+    /**
+     * @param subtaskId Subtask id provided by Flink
+     * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+     * @param eventTimeExtractor Defines the way event times are extracted from received objects
+     * @param jsonSerializer Serializer used to convert elements to JSON
+     * @param fsAdapter Filesystem adapter used to save files for sharing files across nodes
+     */
+    public PinotSinkWriter(int subtaskId, int maxRowsPerSegment,
+                           EventTimeExtractor<IN> eventTimeExtractor,
+                           JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
+        this.subtaskId = subtaskId;
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
+        this.jsonSerializer = checkNotNull(jsonSerializer);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.activeSegments = new ArrayList<>();
+    }
+
+    /**
+     * Takes elements from upstream tasks and writes them into a {@link PinotWriterSegment}.
+     *
+     * @param element Object from upstream task
+     * @param context SinkWriter context
+     * @throws IOException
+     */
+    @Override
+    public void write(IN element, Context context) throws IOException {
+        final PinotWriterSegment<IN> inProgressSegment = getOrCreateInProgressSegment();
+        inProgressSegment.write(element, eventTimeExtractor.getEventTime(element, context));
+    }
+
+    /**
+     * Creates {@link PinotSinkCommittable}s from elements previously received via {@link #write}.
+     * If flush is set, all {@link PinotWriterSegment}s are transformed into
+     * {@link PinotSinkCommittable}s. If flush is not set, only currently non-active
+     * {@link PinotWriterSegment}s are transformed into {@link PinotSinkCommittable}s.
+     * To convert a {@link PinotWriterSegment} into a {@link PinotSinkCommittable}, the data gets
+     * written to the shared filesystem. Moreover, minimum and maximum timestamps are identified.
+     * Finally, all {@link PinotWriterSegment}s transformed into {@link PinotSinkCommittable}s are
+     * removed from {@link #activeSegments}.
+     *
+     * @param flush Flush all currently known elements into the {@link PinotSinkCommittable}s
+     * @return List of {@link PinotSinkCommittable} to process in {@link org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter}
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkCommittable> prepareCommit(boolean flush) throws IOException {
+        // Identify segments to commit. If the flush argument is set, all segments shall be committed.
+        // Otherwise, take only those PinotWriterSegments that do not accept any more elements.
+        List<PinotWriterSegment<IN>> segmentsToCommit = activeSegments.stream()
+                .filter(s -> flush || !s.acceptsElements())
+                .collect(Collectors.toList());
+        LOG.debug("Identified {} segments to commit [subtaskId={}]", segmentsToCommit.size(), subtaskId);
+
+        LOG.debug("Creating committables... [subtaskId={}]", subtaskId);
+        List<PinotSinkCommittable> committables = new ArrayList<>();
+        for (final PinotWriterSegment<IN> segment : segmentsToCommit) {
+            committables.add(segment.prepareCommit());
+        }
+        LOG.debug("Created {} committables [subtaskId={}]", committables.size(), subtaskId);
+
+        // Remove all PinotWriterSegments that will be emitted within the committables.
+        activeSegments.removeAll(segmentsToCommit);
+        return committables;
+    }
+
+    /**
+     * Gets the {@link PinotWriterSegment} still accepting elements or creates a new one.
+     *
+     * @return {@link PinotWriterSegment} accepting at least one more element
+     */
+    private PinotWriterSegment<IN> getOrCreateInProgressSegment() {
+        final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
+        if (latestSegment == null || !latestSegment.acceptsElements()) {
+            final PinotWriterSegment<IN> inProgressSegment = new PinotWriterSegment<>(maxRowsPerSegment, jsonSerializer, fsAdapter);
+            activeSegments.add(inProgressSegment);
+            return inProgressSegment;
+        }
+        return latestSegment;
+    }
+
+    /**
+     * Snapshots the latest PinotWriterSegment (if present), so that the contained (and not yet
+     * committed) elements can be recovered later on in case of a failure.
+     *
+     * @return A list containing at most one PinotSinkWriterState
+     */
+    @Override
+    public List<PinotSinkWriterState> snapshotState() {
+        final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
+        if (latestSegment == null || !latestSegment.acceptsElements()) {
+            return new ArrayList<>();
+        }

Review comment:
   Looks solid 👍

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org