This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a9e4e5289f [Backfill] flink based upsert table backfill support
(#13837)
a9e4e5289f is described below
commit a9e4e5289f6335c7c7010a69fbf6af754d8b1a6f
Author: rohit <[email protected]>
AuthorDate: Mon Sep 9 22:30:48 2024 +0530
[Backfill] flink based upsert table backfill support (#13837)
* flink based upsert table backfill support
* make UploadedRealtimeSegmentName mandatory only for upsert table backfill
* more integration tests, bug fix and review comments
---
pinot-connectors/pinot-flink-connector/README.md | 42 +++-
.../connector/flink/sink/FlinkSegmentWriter.java | 30 ++-
.../connector/flink/sink/PinotSinkFunction.java | 23 +-
.../sink/PinotSinkIntegrationTestUpsertTable.java | 240 +++++++++++++++++++++
.../src/test/resources/gameScores_csv.tar.gz | Bin 0 -> 266 bytes
.../src/test/resources/upsert_table_test.schema | 34 +++
.../pinot/segment/local/utils/IngestionUtils.java | 11 +
.../pinot/spi/ingestion/batch/BatchConfig.java | 14 +-
.../spi/ingestion/batch/BatchConfigProperties.java | 2 +
9 files changed, 384 insertions(+), 12 deletions(-)
diff --git a/pinot-connectors/pinot-flink-connector/README.md b/pinot-connectors/pinot-flink-connector/README.md
index 9a594b55c5..44013718e2 100644
--- a/pinot-connectors/pinot-flink-connector/README.md
+++ b/pinot-connectors/pinot-flink-connector/README.md
@@ -23,13 +23,18 @@
Flink connector to write data to Pinot directly. This is useful for backfilling or bootstrapping tables,
including the upsert tables. You can read more about the motivation and design in this [design proposal](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177045634).
-## Quick Start
+## Quick Start for Offline Table Backfill
```java
+// Set up the Flink environment and data source
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-execEnv.setParallelism(2);
-DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO).keyBy(r -> r.getField(0));
+execEnv.setParallelism(2); // optional
+DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO);
+
+// Create a ControllerRequestClient to fetch the Pinot schema and table config
+HttpClient httpClient = HttpClient.getInstance();
+ControllerRequestClient client = new ControllerRequestClient(
+    ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);
-PinotControllerClient client = new PinotControllerClient();
// fetch Pinot schema
Schema schema = PinotConnectionUtils.getSchema(client, "starbucksStores");
// fetch Pinot table config
@@ -39,9 +44,34 @@ srcDs.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO
execEnv.execute();
```
+## Quick Start for Realtime (Upsert) Table Backfill
+```java
+// Set up the Flink environment and data source
+StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+execEnv.setParallelism(2); // mandatory for upsert tables: must match the number of table partitions
+DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO);
+
+// Create a ControllerRequestClient to fetch the Pinot schema and table config
+HttpClient httpClient = HttpClient.getInstance();
+ControllerRequestClient client = new ControllerRequestClient(
+    ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);
+
+// fetch Pinot schema
+Schema schema = PinotConnectionUtils.getSchema(client, "starbucksStores");
+// fetch Pinot table config
+TableConfig tableConfig = PinotConnectionUtils.getTableConfig(client, "starbucksStores", "REALTIME");
+
+// create the Flink Pinot sink (for upsert tables, partition it the same way as the realtime stream, e.g. Kafka)
+srcDs.partitionCustom((Partitioner<Integer>) (key, partitions) -> key % partitions, r -> (Integer) r.getField("primaryKey"))
+    .addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema));
+execEnv.execute();
+```
+
For more examples, please see `src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java`
## Notes for backfilling upsert table
- - To correctly partition the output segments by the primary key, the Flink job *must* also include the partitionByKey operator before the Sink operator
- - The parallelism of the job *must* be set the same as the number of partitions of the Pinot table, so that the sink in each task executor can generate the segment of same partitions.
+ - To correctly partition the output segments by the primary key, the Flink job *must* include the partitionCustom operator, using the same stream partitioning logic as the upstream, before the sink operator.
+ - The parallelism of the job *must* be set to the number of partitions of the Pinot table (the same as the upstream), so that the sink in each task executor generates segments for the same partitions.
+ - The segment naming strategy is crucial for correct segment assignment in upsert table backfill; UploadedRealtimeSegmentNameGenerator is always used, even if another segment name generator type is specified in the BatchConfig.
 - It’s important to plan the resource usage to avoid capacity issues such as out of memory. In particular, the Pinot sink has an in-memory buffer of records, and it flushes when the threshold is reached. Currently, the threshold on the number of records is supported via the config `segmentFlushMaxNumRecords`. In the future, we could add other types of thresholds, such as the memory usage of the buffer.
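The partitioning rule in the notes above can be sketched without Flink at all: if the backfill job uses the same `key % numPartitions` function as the upstream stream's producer, every row sharing a primary key lands in the same partition, so upserts on that key are resolved within one segment partition. A minimal, dependency-free illustration (names are illustrative, not from the connector):

```java
import java.util.Arrays;
import java.util.List;

public class PartitionSketch {
  // Same modulo logic as the README's partitionCustom lambda:
  // (key, partitions) -> key % partitions
  static int partitionFor(int primaryKey, int numPartitions) {
    return primaryKey % numPartitions;
  }

  public static void main(String[] args) {
    int numPartitions = 2;
    // Primary keys mirroring the integration test data: 1, 2, 3, 2
    List<Integer> primaryKeys = Arrays.asList(1, 2, 3, 2);
    for (int pk : primaryKeys) {
      System.out.println("pk=" + pk + " -> partition " + partitionFor(pk, numPartitions));
    }
    // Both records with pk=2 map to the same partition, so the upsert
    // on that key is resolved within a single segment partition.
  }
}
```

The same deterministic function must be used on both the realtime ingestion side and the backfill side; otherwise backfilled segments would disagree with the live segments about where a given primary key lives.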
diff --git a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
index 743d222b35..f43bd2e67d 100644
--- a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
+++ b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
@@ -28,6 +28,7 @@ import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
@@ -69,6 +70,8 @@ public class FlinkSegmentWriter implements SegmentWriter {
   private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSegmentWriter.class);
   private static final FileFormat BUFFER_FILE_FORMAT = FileFormat.AVRO;
+  public static final String DEFAULT_UPLOADED_REALTIME_SEGMENT_PREFIX = "flink";
+
private final int _indexOfSubtask;
private TableConfig _tableConfig;
@@ -87,6 +90,9 @@ public class FlinkSegmentWriter implements SegmentWriter {
/** A sequence ID that increments each time a segment is flushed */
private int _seqId;
+ private final String _segmentNamePrefix;
+ private final long _segmentUploadTimeMs;
+
private org.apache.avro.Schema _avroSchema;
private DataFileWriter<GenericData.Record> _recordWriter;
private GenericData.Record _reusableRecord;
@@ -96,7 +102,15 @@ public class FlinkSegmentWriter implements SegmentWriter {
   private transient volatile long _lastRecordProcessingTimeMs = 0;
   public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup) {
+    this(indexOfSubtask, metricGroup, null, null);
+  }
+
+  public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup, @Nullable String segmentNamePrefix,
+      @Nullable Long segmentUploadTimeMs) {
     _indexOfSubtask = indexOfSubtask;
+    _segmentNamePrefix = segmentNamePrefix;
+    _segmentUploadTimeMs = segmentUploadTimeMs == null ? System.currentTimeMillis() : segmentUploadTimeMs;
     registerMetrics(metricGroup);
   }
@@ -124,12 +138,26 @@ public class FlinkSegmentWriter implements SegmentWriter {
         "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType);
     Map<String, String> batchConfigMap = _batchIngestionConfig.getBatchConfigMaps().get(0);
+    batchConfigMap.put(BatchConfigProperties.SEGMENT_PARTITION_ID, Integer.toString(_indexOfSubtask));
+    batchConfigMap.put(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS, Long.toString(_segmentUploadTimeMs));
+    batchConfigMap.computeIfAbsent(
+        BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + "." + BatchConfigProperties.SEGMENT_NAME_PREFIX,
+        key -> StringUtils.isNotBlank(_segmentNamePrefix) ? _segmentNamePrefix
+            : DEFAULT_UPLOADED_REALTIME_SEGMENT_PREFIX);
+
+    // generate the segment name for the simple segment name generator type (non-upsert tables)
     String segmentNamePostfixProp = String.format("%s.%s",
         BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX, BatchConfigProperties.SEGMENT_NAME_POSTFIX);
     String segmentSuffix = batchConfigMap.get(segmentNamePostfixProp);
     segmentSuffix = segmentSuffix == null ? String.valueOf(_indexOfSubtask) : segmentSuffix + "_" + _indexOfSubtask;
     batchConfigMap.put(segmentNamePostfixProp, segmentSuffix);
+    // Upsert tables must use UploadedRealtimeSegmentName for correct assignment of segments
+    if (_tableConfig.isUpsertEnabled()) {
+      batchConfigMap.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+          BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME);
+    }
+
     _batchConfig = new BatchConfig(_tableNameWithType, batchConfigMap);
     Preconditions.checkState(StringUtils.isNotBlank(_batchConfig.getOutputDirURI()),
@@ -222,6 +250,7 @@ public class FlinkSegmentWriter implements SegmentWriter {
     batchConfigMapOverride.put(BatchConfigProperties.INPUT_DIR_URI, _bufferFile.getAbsolutePath());
     batchConfigMapOverride.put(BatchConfigProperties.OUTPUT_DIR_URI, segmentDir.getAbsolutePath());
     batchConfigMapOverride.put(BatchConfigProperties.INPUT_FORMAT, BUFFER_FILE_FORMAT.toString());
+    batchConfigMapOverride.put(BatchConfigProperties.SEQUENCE_ID, Integer.toString(_seqId));
     BatchIngestionConfig batchIngestionConfig = new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
         _batchIngestionConfig.getSegmentIngestionType(), _batchIngestionConfig.getSegmentIngestionFrequency(),
         _batchIngestionConfig.getConsistentDataPush());
@@ -229,7 +258,6 @@ public class FlinkSegmentWriter implements SegmentWriter {
     // Build segment
     SegmentGeneratorConfig segmentGeneratorConfig =
         IngestionUtils.generateSegmentGeneratorConfig(_tableConfig, _schema, batchIngestionConfig);
-    segmentGeneratorConfig.setSequenceId(_seqId);
     String segmentName = IngestionUtils.buildSegment(segmentGeneratorConfig);
     LOGGER.info("Successfully built segment: {} of sequence {} for Pinot table: {}", segmentName, _seqId,
         _tableNameWithType);
diff --git a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
index a868a95b22..e7204ef6b6 100644
--- a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
+++ b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
@@ -22,6 +22,7 @@ import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -57,8 +58,13 @@ public class PinotSinkFunction<T> extends RichSinkFunction<T> implements Checkpo
   private final PinotGenericRowConverter<T> _recordConverter;
-  private TableConfig _tableConfig;
-  private Schema _schema;
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+
+  @Nullable private final String _segmentNamePrefix;
+
+  // Used to set the upload time in the segment name; if not provided, the current time is used
+  @Nullable private final Long _segmentUploadTimeMs;
   private transient SegmentWriter _segmentWriter;
   private transient SegmentUploader _segmentUploader;
@@ -71,18 +77,27 @@ public class PinotSinkFunction<T> extends RichSinkFunction<T> implements Checkpo
   public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema,
       long segmentFlushMaxNumRecords, int executorPoolSize) {
+    this(recordConverter, tableConfig, schema, segmentFlushMaxNumRecords, executorPoolSize, null, null);
+  }
+
+  public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema,
+      long segmentFlushMaxNumRecords, int executorPoolSize, @Nullable String segmentNamePrefix,
+      @Nullable Long segmentUploadTimeMs) {
     _recordConverter = recordConverter;
     _tableConfig = tableConfig;
     _schema = schema;
     _segmentFlushMaxNumRecords = segmentFlushMaxNumRecords;
     _executorPoolSize = executorPoolSize;
+    _segmentNamePrefix = segmentNamePrefix;
+    _segmentUploadTimeMs = segmentUploadTimeMs;
   }
   @Override
   public void open(Configuration parameters)
       throws Exception {
     int indexOfSubtask = this.getRuntimeContext().getIndexOfThisSubtask();
-    _segmentWriter = new FlinkSegmentWriter(indexOfSubtask, getRuntimeContext().getMetricGroup());
+    _segmentWriter = new FlinkSegmentWriter(indexOfSubtask, getRuntimeContext().getMetricGroup(), _segmentNamePrefix,
+        _segmentUploadTimeMs);
     _segmentWriter.init(_tableConfig, _schema);
     _segmentUploader = new SegmentUploaderDefault();
     _segmentUploader.init(_tableConfig);
@@ -119,7 +134,7 @@ public class PinotSinkFunction<T> extends RichSinkFunction<T> implements Checkpo
       throws Exception {
     _segmentWriter.collect(_recordConverter.convertToRow(value));
     _segmentNumRecord++;
-    if (_segmentNumRecord > _segmentFlushMaxNumRecords) {
+    if (_segmentNumRecord >= _segmentFlushMaxNumRecords) {
       flush();
     }
   }
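The `>` to `>=` change in this hunk fixes an off-by-one: with `>`, the sink flushed only after the count exceeded the threshold, yielding segments of `segmentFlushMaxNumRecords + 1` records. A standalone sketch of the corrected counter logic (a simplification, with no actual segment build or upload):

```java
public class FlushCounterSketch {
  private final long _maxRecords;
  private long _numRecords = 0;
  private int _numFlushes = 0;

  FlushCounterSketch(long maxRecords) {
    _maxRecords = maxRecords;
  }

  // Mirrors the fixed invoke() behavior: count the record, flush at the threshold
  void collect() {
    _numRecords++;
    if (_numRecords >= _maxRecords) {
      flush();
    }
  }

  void flush() {
    _numFlushes++;
    _numRecords = 0;
  }

  int numFlushes() {
    return _numFlushes;
  }

  public static void main(String[] args) {
    FlushCounterSketch sketch = new FlushCounterSketch(2);
    for (int i = 0; i < 4; i++) {
      sketch.collect();
    }
    // 4 records with a threshold of 2 yield exactly 2 flushes
    System.out.println(sketch.numFlushes());
  }
}
```

With the old `>` comparison the same 4 records would have produced only one flush (after the 3rd record), leaving segment sizes one larger than the configured maximum.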
diff --git a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTestUpsertTable.java b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTestUpsertTable.java
new file mode 100644
index 0000000000..7b190ca384
--- /dev/null
+++ b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTestUpsertTable.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.flink.sink;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.pinot.connector.flink.common.FlinkRowGenericRowConverter;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class PinotSinkIntegrationTestUpsertTable extends BaseClusterIntegrationTest {
+
+ private List<Row> _data;
+ public RowTypeInfo _typeInfo;
+ public TableConfig _tableConfig;
+ public Schema _schema;
+ private String _rawTableName;
+
+ protected PinotTaskManager _taskManager;
+ protected PinotHelixTaskResourceManager _helixTaskResourceManager;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+    // Start a customized controller with more frequent realtime segment validation
+ startController();
+ startBroker();
+ startServers(2);
+ startMinion();
+
+ // Start Kafka and push data into Kafka
+ startKafka();
+
+ // Push data to Kafka and set up table
+ setupTable(getTableName(), getKafkaTopic(), "gameScores_csv.tar.gz", null);
+
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(60_000L);
+ assertEquals(getCurrentCountStarResult(), getCountStarResult());
+
+ // Create partial upsert table schema
+ Schema partialUpsertSchema = createSchema("upsert_table_test.schema");
+ addSchema(partialUpsertSchema);
+ _taskManager = _controllerStarter.getTaskManager();
+ _helixTaskResourceManager =
_controllerStarter.getHelixTaskResourceManager();
+
+ _schema = createSchema();
+ _rawTableName = getTableName();
+ _tableConfig = createCSVUpsertTableConfig(_rawTableName, getKafkaTopic(),
getNumKafkaPartitions(),
+ getCSVDecoderProperties(",",
"playerId,name,game,score,timestampInEpoch,deleted"), null, "playerId");
+ _typeInfo = new RowTypeInfo(
+ new TypeInformation[]{Types.INT, Types.STRING, Types.STRING,
Types.FLOAT, Types.LONG, Types.BOOLEAN},
+ new String[]{"playerId", "name", "game", "score", "timestampInEpoch",
"deleted"});
+ _data = Arrays.asList(Row.of(1, "Alice", "Football", 55F, 1571037400000L,
false),
+ Row.of(2, "BOB", "Tennis", 100F, 1571038400000L, false),
+ Row.of(3, "Carl", "Cricket", 100F, 1571039400000L, false),
+ Row.of(2, "BOB", "Badminton", 100F, 1571040400000L, false));
+
+ Map<String, String> batchConfigs = new HashMap<>();
+ batchConfigs.put(BatchConfigProperties.OUTPUT_DIR_URI,
_tarDir.getAbsolutePath());
+ batchConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
+ batchConfigs.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
getControllerBaseApiUrl());
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(
+ new BatchIngestionConfig(Collections.singletonList(batchConfigs),
"APPEND", "HOURLY"));
+
+ _tableConfig.setIngestionConfig(ingestionConfig);
+ }
+
+ @Test
+ public void testPinotSinkWrite()
+ throws Exception {
+    Configuration config = new Configuration();
+    config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT,
+        Duration.ofMillis(120000)); // Set heartbeat timeout to 120 seconds
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(config);
+
+ // Single-thread write
+ execEnv.setParallelism(1);
+ DataStream<Row> srcDs = execEnv.fromCollection(_data).returns(_typeInfo);
+ srcDs.addSink(new PinotSinkFunction<>(new
FlinkRowGenericRowConverter(_typeInfo), _tableConfig, _schema,
+ PinotSinkFunction.DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS,
PinotSinkFunction.DEFAULT_EXECUTOR_POOL_SIZE, "batch",
+ 1724045185L));
+ execEnv.execute();
+    // 1 uploaded segment, 2 in-progress realtime segments
+ verifySegments(3, 4);
+
+ // Parallel write with partitioned segments
+ execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.setParallelism(2);
+
+ srcDs = execEnv.fromCollection(_data).returns(_typeInfo)
+ .partitionCustom((Partitioner<Integer>) (key, partitions) -> key %
partitions, r -> (Integer) r.getField(0));
+ srcDs.addSink(new PinotSinkFunction<>(new
FlinkRowGenericRowConverter(_typeInfo), _tableConfig, _schema,
+ PinotSinkFunction.DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS,
PinotSinkFunction.DEFAULT_EXECUTOR_POOL_SIZE, "batch",
+ 1724045186L));
+ execEnv.execute();
+ verifySegments(5, 8);
+
+ // Generate next sequence segments in partitioned segments
+ srcDs = execEnv.fromCollection(_data).returns(_typeInfo)
+ .partitionCustom((Partitioner<Integer>) (key, partitions) -> key %
partitions, r -> (Integer) r.getField(0));
+ srcDs.addSink(new PinotSinkFunction<>(new
FlinkRowGenericRowConverter(_typeInfo), _tableConfig, _schema, 1,
+ PinotSinkFunction.DEFAULT_EXECUTOR_POOL_SIZE, "batch", 1724045187L));
+ execEnv.execute();
+ verifySegments(9, 12);
+ verifyContainsSegments(Arrays.asList("batch__mytable__0__1724045187__0",
"batch__mytable__0__1724045187__1",
+ "batch__mytable__1__1724045187__0",
"batch__mytable__1__1724045187__1"));
+ }
+
+ private void verifyContainsSegments(List<String> segmentsToCheck)
+ throws IOException {
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(_rawTableName);
+ JsonNode segments = JsonUtils.stringToJsonNode(sendGetRequest(
+ _controllerRequestURLBuilder.forSegmentListAPI(tableNameWithType,
TableType.REALTIME.toString()))).get(0)
+ .get("REALTIME");
+ Set<String> segmentNames = new HashSet<>();
+ for (int i = 0; i < segments.size(); i++) {
+ String segmentName = segments.get(i).asText();
+ segmentNames.add(segmentName);
+ }
+
+ for (String segmentName : segmentsToCheck) {
+ assertTrue(segmentNames.contains(segmentName));
+ }
+ }
+
+ private void verifySegments(int numSegments, int numTotalDocs)
+ throws IOException {
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(_rawTableName);
+ JsonNode segments = JsonUtils.stringToJsonNode(sendGetRequest(
+ _controllerRequestURLBuilder.forSegmentListAPI(tableNameWithType,
TableType.REALTIME.toString()))).get(0)
+ .get("REALTIME");
+ assertEquals(segments.size(), numSegments);
+ int actualNumTotalDocs = 0;
+ // count docs in completed segments only
+ for (int i = 0; i < numSegments; i++) {
+ String segmentName = segments.get(i).asText();
+ JsonNode segmentMetadata = JsonUtils.stringToJsonNode(
+
sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(tableNameWithType,
segmentName)));
+ if
(segmentMetadata.get("segment.realtime.status").asText().equals("IN_PROGRESS"))
{
+ continue;
+ }
+ actualNumTotalDocs += segmentMetadata.get("segment.total.docs").asInt();
+ }
+ assertEquals(actualNumTotalDocs, numTotalDocs);
+ }
+
+ private TableConfig setupTable(String tableName, String kafkaTopicName,
String inputDataFile,
+ UpsertConfig upsertConfig)
+ throws Exception {
+ // SETUP
+ // Create table with delete Record column
+ Map<String, String> csvDecoderProperties =
+ getCSVDecoderProperties(",",
"playerId,name,game,score,timestampInEpoch,deleted");
+ Schema upsertSchema = createSchema();
+ upsertSchema.setSchemaName(tableName);
+ addSchema(upsertSchema);
+ TableConfig tableConfig =
+ createCSVUpsertTableConfig(tableName, kafkaTopicName,
getNumKafkaPartitions(), csvDecoderProperties,
+ upsertConfig, "playerId");
+ addTableConfig(tableConfig);
+
+ // Push initial 10 upsert records - 3 pks 100, 101 and 102
+ List<File> dataFiles = unpackTarData(inputDataFile, _tempDir);
+ pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+
+ return tableConfig;
+ }
+
+ @Override
+ protected String getSchemaFileName() {
+ return "upsert_table_test.schema";
+ }
+
+ @Override
+ protected String getTimeColumnName() {
+ return "timestampInEpoch";
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ return 3;
+ }
+}
diff --git a/pinot-connectors/pinot-flink-connector/src/test/resources/gameScores_csv.tar.gz b/pinot-connectors/pinot-flink-connector/src/test/resources/gameScores_csv.tar.gz
new file mode 100644
index 0000000000..48dc5163c6
Binary files /dev/null and b/pinot-connectors/pinot-flink-connector/src/test/resources/gameScores_csv.tar.gz differ
diff --git a/pinot-connectors/pinot-flink-connector/src/test/resources/upsert_table_test.schema b/pinot-connectors/pinot-flink-connector/src/test/resources/upsert_table_test.schema
new file mode 100644
index 0000000000..c0bcebfaae
--- /dev/null
+++ b/pinot-connectors/pinot-flink-connector/src/test/resources/upsert_table_test.schema
@@ -0,0 +1,34 @@
+{
+ "schemaName": "playerScores",
+ "dimensionFieldSpecs": [
+ {
+ "name": "playerId",
+ "dataType": "INT"
+ },
+ {
+ "name": "name",
+ "dataType": "STRING"
+ },
+ {
+ "name": "game",
+ "dataType": "STRING"
+ },
+ {
+ "name": "deleted",
+ "dataType": "BOOLEAN"
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "name": "score",
+ "dataType": "FLOAT"
+ }
+ ],
+ "dateTimeFieldSpecs": [{
+ "name": "timestampInEpoch",
+ "dataType": "LONG",
+ "format" : "1:MILLISECONDS:EPOCH",
+ "granularity": "1:MILLISECONDS"
+ }],
+ "primaryKeyColumns": [ "playerId" ]
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
index 556b025855..d34fd20cb5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
@@ -41,6 +41,7 @@ import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator;
 import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
 import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
@@ -169,6 +170,16 @@ public final class IngestionUtils {
       case BatchConfigProperties.SegmentNameGeneratorType.SIMPLE:
         return new SimpleSegmentNameGenerator(rawTableName, batchConfig.getSegmentNamePostfix(),
             batchConfig.isAppendUUIDToSegmentName(), batchConfig.isExcludeTimeInSegmentName());
+      case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME:
+        int uploadedRealtimePartitionId;
+        try {
+          uploadedRealtimePartitionId = Integer.parseInt(batchConfig.getSegmentPartitionId());
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException(
+              String.format("Invalid segment partition id: %s", batchConfig.getSegmentPartitionId()));
+        }
+        return new UploadedRealtimeSegmentNameGenerator(rawTableName, uploadedRealtimePartitionId,
+            batchConfig.getSegmentUploadTimeMs(), batchConfig.getSegmentNamePrefix(), batchConfig.getSequenceId());
       default:
         throw new IllegalStateException(String
             .format("Unsupported segmentNameGeneratorType: %s for table: %s", segmentNameGeneratorType,
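Based on the names asserted in the integration test (for example `batch__mytable__0__1724045187__0`), the names produced by the uploaded-realtime generator appear to follow a double-underscore layout of prefix, raw table name, partition id, upload time, and sequence id. A hypothetical reconstruction of that layout (this is not the actual UploadedRealtimeSegmentNameGenerator implementation, just the observable format):

```java
public class UploadedNameSketch {
  // Hypothetical reconstruction of the layout seen in the test assertions:
  // {prefix}__{rawTableName}__{partitionId}__{uploadTimeMs}__{sequenceId}
  static String segmentName(String prefix, String rawTableName, int partitionId,
      long uploadTimeMs, String sequenceId) {
    return String.join("__", prefix, rawTableName, Integer.toString(partitionId),
        Long.toString(uploadTimeMs), sequenceId);
  }

  public static void main(String[] args) {
    // Matches the first segment name checked in PinotSinkIntegrationTestUpsertTable
    System.out.println(segmentName("batch", "mytable", 0, 1724045187L, "0"));
  }
}
```

Because the partition id is baked into the name, servers can assign an uploaded segment to the same partition as the live realtime segments for that key range, which is why the partition id must parse as an integer here.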
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
index 088bdaa390..e87c85dea8 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
@@ -52,7 +52,9 @@ public class BatchConfig {
private final boolean _excludeSequenceId;
private final boolean _appendUUIDToSegmentName;
private final boolean _excludeTimeInSegmentName;
+ private final String _segmentPartitionId;
private final String _sequenceId;
+ private final long _segmentUploadTimeMs;
private final String _pushMode;
private final int _pushAttempts;
@@ -97,12 +99,14 @@ public class BatchConfig {
     _segmentNamePrefix = segmentNameGeneratorProps.get(BatchConfigProperties.SEGMENT_NAME_PREFIX);
     _segmentNamePostfix = segmentNameGeneratorProps.get(BatchConfigProperties.SEGMENT_NAME_POSTFIX);
     _excludeSequenceId = Boolean.parseBoolean(segmentNameGeneratorProps.get(BatchConfigProperties.EXCLUDE_SEQUENCE_ID));
+    _segmentPartitionId = batchConfigsMap.get(BatchConfigProperties.SEGMENT_PARTITION_ID);
     _sequenceId = batchConfigsMap.get(BatchConfigProperties.SEQUENCE_ID);
     _appendUUIDToSegmentName = Boolean.parseBoolean(segmentNameGeneratorProps.get(BatchConfigProperties.APPEND_UUID_TO_SEGMENT_NAME));
     _excludeTimeInSegmentName = Boolean.parseBoolean(segmentNameGeneratorProps.get(BatchConfigProperties.EXCLUDE_TIME_IN_SEGMENT_NAME));
-
+    _segmentUploadTimeMs = Long.parseLong(batchConfigsMap.getOrDefault(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS,
+        String.valueOf(System.currentTimeMillis())));
     _pushMode = IngestionConfigUtils.getPushMode(batchConfigsMap);
     _pushAttempts = IngestionConfigUtils.getPushAttempts(batchConfigsMap);
     _pushParallelism = IngestionConfigUtils.getPushParallelism(batchConfigsMap);
@@ -185,10 +189,18 @@ public class BatchConfig {
return _excludeSequenceId;
}
+ public String getSegmentPartitionId() {
+ return _segmentPartitionId;
+ }
+
public String getSequenceId() {
return _sequenceId;
}
+ public long getSegmentUploadTimeMs() {
+ return _segmentUploadTimeMs;
+ }
+
public boolean isAppendUUIDToSegmentName() {
return _appendUUIDToSegmentName;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index efb11bc633..5672335254 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -47,6 +47,8 @@ public class BatchConfigProperties {
   public static final String SEGMENT_NAME_GENERATOR_TYPE = "segmentNameGenerator.type";
   public static final String SEGMENT_NAME_GENERATOR_PROP_PREFIX = "segmentNameGenerator.configs";
   public static final String SEGMENT_NAME = "segment.name";
+  public static final String SEGMENT_PARTITION_ID = "segment.partitionId";
+  public static final String SEGMENT_UPLOAD_TIME_MS = "segment.uploadTimeMs";
   public static final String SEGMENT_NAME_PREFIX = "segment.name.prefix";
   public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
   public static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id";
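Putting the two new properties together: per the FlinkSegmentWriter change in this commit, each subtask stamps its batch config with its partition id and the segment upload time under the keys added above. A minimal sketch of that stamping (plain Java; the helper name and values are illustrative, not from the connector):

```java
import java.util.HashMap;
import java.util.Map;

public class BatchConfigSketch {
  // Key literals copied from the BatchConfigProperties constants added in this commit
  static final String SEGMENT_PARTITION_ID = "segment.partitionId";
  static final String SEGMENT_UPLOAD_TIME_MS = "segment.uploadTimeMs";

  // Mimics the writer stamping its subtask index and upload time into the config map
  static Map<String, String> stamp(Map<String, String> batchConfigMap, int indexOfSubtask, long uploadTimeMs) {
    batchConfigMap.put(SEGMENT_PARTITION_ID, Integer.toString(indexOfSubtask));
    batchConfigMap.put(SEGMENT_UPLOAD_TIME_MS, Long.toString(uploadTimeMs));
    return batchConfigMap;
  }

  public static void main(String[] args) {
    Map<String, String> configs = stamp(new HashMap<>(), 1, 1724045185L);
    System.out.println(configs.get("segment.partitionId"));
    System.out.println(configs.get("segment.uploadTimeMs"));
  }
}
```

BatchConfig then reads these values back (falling back to the current time when `segment.uploadTimeMs` is absent), which is how the uploaded-realtime segment name generator receives its partition id and upload time.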
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]