This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3d46edb089 Kinesis partition split fixes (#15563)
3d46edb089 is described below
commit 3d46edb089325860a4c1d1f005dfb2d74139539f
Author: Krishan Goyal <[email protected]>
AuthorDate: Tue Apr 29 13:47:14 2025 +0530
Kinesis partition split fixes (#15563)
* Initial fixes for issues related to Kinesis partition split
* Refactor Kinesis tests to make it easier to add more tests
* Created a test case for shard increase and fixed a bug related to end of
consumption
* Added more tests for split / merge combinations with pause / resume /
RVM triggers on an old / new table
* Checkstyle fixes
* Small refactors and an attempt to always consume from the ZK offset
* Fix Kafka regression with a workaround flag
* Add overridden function for test case
* Add more testing around largest offset and concurrent pause / resume
functionality
* Avoid overriding Pulsar behaviour to continue with the current behaviour
for now
* Improve some documentation
* Address PR comments
* Retry message fetch outside the Kinesis consumer
* Checkstyle fixes
---
.../helix/core/PinotTableIdealStateBuilder.java | 7 +-
.../realtime/MissingConsumingSegmentFinder.java | 2 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 39 +-
.../pinot/controller/helix/ControllerTest.java | 88 +++++
.../PinotLLCRealtimeSegmentManagerTest.java | 7 +
.../realtime/RealtimeSegmentDataManager.java | 5 +-
.../tests/BaseClusterIntegrationTest.java | 10 +-
.../ingestion/BaseKinesisIntegrationTest.java | 236 ++++++++++++
...aIncreaseDecreasePartitionsIntegrationTest.java | 67 +---
.../realtime/ingestion/KinesisShardChangeTest.java | 425 +++++++++++++++++++++
.../ingestion}/RealtimeKinesisIntegrationTest.java | 212 +---------
.../realtime/ingestion/utils/KinesisUtils.java | 112 ++++++
.../plugin/stream/kinesis/KinesisConsumer.java | 11 +
.../kinesis/KinesisStreamMetadataProvider.java | 42 +-
.../kinesis/KinesisStreamMetadataProviderTest.java | 32 ++
.../FreshnessBasedConsumptionStatusChecker.java | 12 +-
.../IngestionBasedConsumptionStatusChecker.java | 21 +
.../helix/OffsetBasedConsumptionStatusChecker.java | 21 +-
.../spi/stream/PartitionGroupMetadataFetcher.java | 9 +-
.../pinot/spi/stream/StreamMetadataProvider.java | 21 +
.../utils/builder/ControllerRequestURLBuilder.java | 5 +
21 files changed, 1090 insertions(+), 294 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 8895d9df50..244f7853d8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -85,11 +85,12 @@ public class PinotTableIdealStateBuilder {
    * partition groups.
    * The size of this list is equal to the number of partition groups,
    * and is created using the latest segment zk metadata.
+   * @param forceGetOffsetFromStream - details in PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset
    */
  public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
-      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) {
-    PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
-        new PartitionGroupMetadataFetcher(streamConfigs, partitionGroupConsumptionStatusList);
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList, boolean forceGetOffsetFromStream) {
+    PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new PartitionGroupMetadataFetcher(
+        streamConfigs, partitionGroupConsumptionStatusList, forceGetOffsetFromStream);
     try {
       DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
       return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index 5fe2ffe6d6..efc43246b7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -81,7 +81,7 @@ public class MissingConsumingSegmentFinder {
return streamConfig;
});
try {
-      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList())
+      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), false)
           .forEach(metadata -> {
             _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
           });
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index cead9ddfd4..765b25852a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1001,6 +1001,9 @@ public class PinotLLCRealtimeSegmentManager {
         partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
             .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index))
             .collect(Collectors.toSet()));
+      } catch (UnsupportedOperationException ignored) {
+        allPartitionIdsFetched = false;
+        // Stream does not support fetching partition ids. There is a log in the fallback code which is sufficient
       } catch (Exception e) {
         allPartitionIdsFetched = false;
         LOGGER.warn("Failed to fetch partition ids for stream: {}", streamConfigs.get(i).getTopicName(), e);
@@ -1035,7 +1038,20 @@ public class PinotLLCRealtimeSegmentManager {
   List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
       List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) {
     return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
-        currentPartitionGroupConsumptionStatusList);
+        currentPartitionGroupConsumptionStatusList, false);
+  }
+
+  /**
+   * Fetches the latest state of the PartitionGroups for the stream.
+   * If any partition has reached end of life, and all messages of that partition have been consumed by the segment,
+   * it will be omitted from the result.
+   */
+  @VisibleForTesting
+  List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+      List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList,
+      boolean forceGetOffsetFromStream) {
+    return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
+        currentPartitionGroupConsumptionStatusList, forceGetOffsetFromStream);
   }
/**
@@ -1460,7 +1476,7 @@ public class PinotLLCRealtimeSegmentManager {
}
// Create a map from partition id to the smallest stream offset
Map<Integer, StreamPartitionMsgOffset> partitionIdToSmallestOffset = null;
-    if (offsetCriteria == OffsetCriteria.SMALLEST_OFFSET_CRITERIA) {
+    if (offsetCriteria != null && offsetCriteria.equals(OffsetCriteria.SMALLEST_OFFSET_CRITERIA)) {
       partitionIdToSmallestOffset = partitionIdToStartOffset;
     }
@@ -1553,11 +1569,13 @@
     // Smallest offset is fetched from stream once and cached in partitionIdToSmallestOffset.
     if (partitionIdToSmallestOffset == null) {
-      partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfigs);
+      partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfigs, idealState);
     }
     // Do not create new CONSUMING segment when the stream partition has reached end of life.
     if (!partitionIdToSmallestOffset.containsKey(partitionId)) {
+      LOGGER.info("PartitionGroup: {} has reached end of life. Skipping creation of new segment {}",
+          partitionId, latestSegmentName);
       continue;
     }
@@ -1651,13 +1669,24 @@
   }
   private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(
-      List<StreamConfig> streamConfigs) {
+      List<StreamConfig> streamConfigs, IdealState idealState) {
     Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>();
     for (StreamConfig streamConfig : streamConfigs) {
+      List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
+          getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
       OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
       streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+
+      // The Kinesis shard-split flow requires us to pass currentPartitionGroupConsumptionStatusList so that
+      // we can check whether a shard is completely consumed.
+      // However, the Kafka implementation of computePartitionGroupMetadata() breaks if we pass the current status:
+      // it leads to streamSmallestOffset being set to null in the selectStartOffset() method.
+      // The overall dependency isn't clean and requires a refactor.
+      // Temporarily, we pass a boolean flag to indicate whether to use the current status.
+      // The Kafka implementation of computePartitionGroupMetadata() will ignore the current status,
+      // while the Kinesis implementation will use it.
       List<PartitionGroupMetadata> partitionGroupMetadataList =
-          getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList());
+          getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, true);
       streamConfig.setOffsetCriteria(originalOffsetCriteria);
       for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
         partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
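For illustration, a minimal sketch (not part of this commit; the helper and
parameter names are illustrative) of one way a stream plugin's
computePartitionGroupMetadata() could branch on the temporary
forceGetOffsetFromStream flag described in the comment above:

    // Hypothetical helper; assumes the checkpointed offset may be null.
    static long selectStartOffset(boolean forceGetOffsetFromStream, Long checkpointedOffset,
        long smallestOffsetInStream) {
      if (forceGetOffsetFromStream || checkpointedOffset == null) {
        // Kafka-style behaviour: ignore the current consumption status and read
        // the smallest offset directly from the stream.
        return smallestOffsetInStream;
      }
      // Kinesis-style behaviour: honour the current status, so shards that are
      // fully consumed and closed can be dropped from the result.
      return checkpointedOffset;
    }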
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index e92a185985..a5b05031c6 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
@@ -65,6 +66,8 @@ import org.apache.pinot.controller.BaseControllerStarter;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerStarter;
import org.apache.pinot.controller.api.access.AllowAllAccessFactory;
+import org.apache.pinot.controller.api.resources.PauseStatusDetails;
+import org.apache.pinot.controller.api.resources.TableViews;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -76,6 +79,7 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -91,6 +95,9 @@ import static org.testng.Assert.*;
 public class ControllerTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ControllerTest.class);
+
public static final String LOCAL_HOST = "localhost";
public static final String DEFAULT_DATA_DIR = new
File(FileUtils.getTempDirectoryPath(),
"test-controller-data-dir" +
System.currentTimeMillis()).getAbsolutePath();
@@ -824,6 +831,87 @@ public class ControllerTest {
}
}
+ public void runRealtimeSegmentValidationTask(String tableName)
+ throws IOException {
+ runPeriodicTask("RealtimeSegmentValidationManager", tableName,
TableType.REALTIME);
+ }
+
+ public void runPeriodicTask(String taskName, String tableName, TableType
tableType)
+ throws IOException {
+ sendGetRequest(getControllerRequestURLBuilder().forPeriodTaskRun(taskName,
tableName, tableType));
+ }
+
+ public void pauseTable(String tableName)
+ throws IOException {
+
sendPostRequest(getControllerRequestURLBuilder().forPauseConsumption(tableName));
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ PauseStatusDetails pauseStatusDetails =
+
JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)),
+ PauseStatusDetails.class);
+ if (pauseStatusDetails.getConsumingSegments().isEmpty()) {
+ return true;
+ }
+ LOGGER.warn("Table not yet paused. Response " + pauseStatusDetails);
+ return false;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 2000, 60_000L, "Failed to pause table: " + tableName);
+ }
+
+ public void resumeTable(String tableName)
+ throws IOException {
+ resumeTable(tableName, "lastConsumed");
+ }
+
+ public void resumeTable(String tableName, String offsetCriteria)
+ throws IOException {
+
sendPostRequest(getControllerRequestURLBuilder().forResumeConsumption(tableName)
+ + "?consumeFrom=" + offsetCriteria);
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ PauseStatusDetails pauseStatusDetails =
+
JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)),
+ PauseStatusDetails.class);
+        // It's possible no segment is in consuming state, so check the pause flag
+ if (!pauseStatusDetails.getPauseFlag()) {
+ return true;
+ }
+ LOGGER.warn("Pause flag is not yet set to false. Response " +
pauseStatusDetails);
+ return false;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 2000, 60_000L, "Failed to resume table: " + tableName);
+ }
+
+ public void waitForNumSegmentsInDesiredStateInEV(String tableName, String
desiredState,
+ int desiredNumConsumingSegments, TableType type) {
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ AtomicInteger numConsumingSegments = new AtomicInteger(0);
+ TableViews.TableView tableView = getExternalView(tableName, type);
+ Map<String, Map<String, String>> viewForType =
+ type.equals(TableType.OFFLINE) ? tableView._offline :
tableView._realtime;
+ viewForType.values().forEach((v) -> {
+ numConsumingSegments.addAndGet((int)
v.values().stream().filter((v1) -> v1.equals(desiredState)).count());
+ });
+ return numConsumingSegments.get() == desiredNumConsumingSegments;
+ } catch (IOException e) {
+ return false;
+ }
+ }, 5000, 60_000L,
+ "Failed to wait for " + desiredNumConsumingSegments + " consuming
segments for table: " + tableName
+ );
+ }
+
+ public TableViews.TableView getExternalView(String tableName, TableType type)
+ throws IOException {
+ String state =
sendGetRequest(getControllerRequestURLBuilder().forExternalView(tableName + "_"
+ type));
+ return JsonUtils.stringToObject(state, TableViews.TableView.class);
+ }
+
public static String sendGetRequest(String urlString)
throws IOException {
return sendGetRequest(urlString, null);
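For illustration, a hedged sketch (not part of this commit; the table name and
segment count are illustrative) of how a test can chain the helpers added
above from a ControllerTest subclass:

    // Assumes a REALTIME table named "myTable" already exists.
    pauseTable("myTable");                // blocks until no segment is CONSUMING
    resumeTable("myTable", "smallest");   // resume consumption from the smallest offset
    // Wait until the external view reports 4 CONSUMING segments again.
    waitForNumSegmentsInDesiredStateInEV("myTable", "CONSUMING", 4, TableType.REALTIME);
    runRealtimeSegmentValidationTask("myTable");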
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index a86cf62e2e..25c286d3a2 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1832,6 +1832,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
}
+ @Override
+ List<PartitionGroupMetadata>
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList,
+ boolean forceGetOffsetFromStream) {
+ return getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList);
+ }
+
@Override
protected boolean isExceededMaxSegmentCompletionTime(String
realtimeTableName, String segmentName,
long currentTimeMs) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d49b8484a6..ffda64c0c7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -479,7 +479,10 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
             messageBatch.getMessageCount(), messageBatch.getUnfilteredMessageCount(),
             messageBatch.isEndOfPartitionGroup());
       }
-      _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
+      // We need to check for both endOfPartitionGroup and messageCount == 0, because
+      // endOfPartitionGroup can be true even when the batch still contains the last messages (observed for Kinesis).
+      // To process that last batch of messages, we need to keep _endOfPartitionGroup false in such a case.
+      _endOfPartitionGroup = messageBatch.getMessageCount() == 0 && messageBatch.isEndOfPartitionGroup();
       _consecutiveErrorCount = 0;
     } catch (PermanentConsumerException e) {
       _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
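For illustration, the new end-of-consumption condition in isolation (a sketch
with an illustrative helper name; the logic mirrors the one-line change above):

    static boolean endOfPartitionGroup(int messageCount, boolean batchReportsEnd) {
      // Only an empty batch that also reports the end truly ends consumption;
      // a non-empty batch may still carry the shard's final messages.
      return messageCount == 0 && batchReportsEnd;
    }
    // endOfPartitionGroup(5, true)  -> false: drain the last 5 messages first
    // endOfPartitionGroup(0, true)  -> true:  the shard is fully consumed
    // endOfPartitionGroup(0, false) -> false: empty poll, keep consuming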
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index f8e11e7c5f..3156df8f65 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -366,7 +366,12 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
*/
protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
- return new TableConfigBuilder(TableType.REALTIME)
+ return getTableConfigBuilder(TableType.REALTIME).build();
+ }
+
+ // TODO - Use this method to create table config for all table types to
avoid redundant code
+ protected TableConfigBuilder getTableConfigBuilder(TableType tableType) {
+ return new TableConfigBuilder(tableType)
.setTableName(getTableName())
.setTimeColumnName(getTimeColumnName())
.setSortedColumn(getSortedColumn())
@@ -384,8 +389,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
.setIngestionConfig(getIngestionConfig())
.setQueryConfig(getQueryConfig())
.setStreamConfigs(getStreamConfigs())
- .setNullHandlingEnabled(getNullHandlingEnabled())
- .build();
+ .setNullHandlingEnabled(getNullHandlingEnabled());
}
/**
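For illustration, a hedged sketch (the table name is illustrative) of what the
new getTableConfigBuilder() hook enables; createNewSchemaAndTable() in
KinesisShardChangeTest below uses the same pattern:

    TableConfig tableConfig = getTableConfigBuilder(TableType.REALTIME)
        .setTableName("myTable_largest")
        .setStreamConfigs(getStreamConfigs())  // per-test stream config tweaks
        .build();
    addTableConfig(tableConfig);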
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/BaseKinesisIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/BaseKinesisIntegrationTest.java
new file mode 100644
index 0000000000..7842230084
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/BaseKinesisIntegrationTest.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.realtime.ingestion;
+
+import cloud.localstack.Localstack;
+import cloud.localstack.ServiceName;
+import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor;
+import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
+import cloud.localstack.docker.annotation.LocalstackDockerProperties;
+import cloud.localstack.docker.command.Command;
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
+import
org.apache.pinot.integration.tests.realtime.ingestion.utils.KinesisUtils;
+import org.apache.pinot.plugin.stream.kinesis.KinesisConfig;
+import org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.SkipException;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.utils.AttributeMap;
+
+
+/**
+ * Creates all dependencies (docker image, kinesis server, kinesis client,
configs) for all tests requiring kinesis
+ */
+@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag =
BaseKinesisIntegrationTest.LOCALSTACK_IMAGE)
+abstract class BaseKinesisIntegrationTest extends BaseClusterIntegrationTest {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BaseKinesisIntegrationTest.class);
+
+ static final String LOCALSTACK_IMAGE = "2.3.2";
+ private static final LocalstackDockerAnnotationProcessor PROCESSOR = new
LocalstackDockerAnnotationProcessor();
+ private final Localstack _localstackDocker = Localstack.INSTANCE;
+ protected KinesisClient _kinesisClient;
+
+ private static final String REGION = "us-east-1";
+ private static final String LOCALSTACK_KINESIS_ENDPOINT =
"http://localhost:4566";
+ protected static final String STREAM_TYPE = "kinesis";
+ protected static final String STREAM_NAME = "kinesis-test";
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ try {
+ DockerInfoCommand dockerInfoCommand = new DockerInfoCommand();
+ dockerInfoCommand.execute();
+ } catch (IllegalStateException e) {
+ LOGGER.warn("Skipping kinesis tests! Docker is not found running", e);
+ throw new SkipException(e.getMessage());
+ }
+
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBroker();
+ startServer();
+
+ startKinesis();
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ stopServer();
+ stopBroker();
+ stopController();
+ stopZk();
+ stopKinesis();
+ FileUtils.deleteDirectory(_tempDir);
+ }
+
+ protected void createStream(int numShards) {
+ LOGGER.warn("Stream " + STREAM_NAME + " being created");
+
_kinesisClient.createStream(CreateStreamRequest.builder().streamName(STREAM_NAME).shardCount(numShards).build());
+
+ TestUtils.waitForCondition(aVoid ->
+ KinesisUtils.isKinesisStreamActive(_kinesisClient, STREAM_NAME),
2000L, 60000,
+ "Kinesis stream " + STREAM_NAME + " is not created or is not in active
state", true);
+ }
+
+ protected void deleteStream() {
+ try {
+
_kinesisClient.deleteStream(DeleteStreamRequest.builder().streamName(STREAM_NAME).build());
+ } catch (ResourceNotFoundException ignored) {
+ return;
+ }
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ KinesisUtils.getKinesisStreamStatus(_kinesisClient, STREAM_NAME);
+ } catch (ResourceNotFoundException e) {
+ return true;
+ }
+ return false;
+ }, 2000L, 60000,
+ "Kinesis stream " + STREAM_NAME + " is not deleted", true);
+
+ LOGGER.warn("Stream " + STREAM_NAME + " deleted");
+ }
+
+ protected PutRecordResponse putRecord(String data, String partitionKey) {
+ PutRecordRequest putRecordRequest =
+
PutRecordRequest.builder().streamName(STREAM_NAME).data(SdkBytes.fromUtf8String(data))
+ .partitionKey(partitionKey).build();
+ return _kinesisClient.putRecord(putRecordRequest);
+ }
+
+ @Override
+ public Map<String, String> getStreamConfigs() {
+ Map<String, String> streamConfigMap = new HashMap<>();
+ String streamType = STREAM_TYPE;
+ streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+
+
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+ StreamConfigProperties.STREAM_TOPIC_NAME), STREAM_NAME);
+
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+ StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS), "30000");
+
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
KinesisConsumerFactory.class.getName());
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_DECODER_CLASS),
+ "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
+ streamConfigMap.put(KinesisConfig.REGION, REGION);
+ streamConfigMap.put(KinesisConfig.SHARD_ITERATOR_TYPE,
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString());
+ streamConfigMap.put(KinesisConfig.ENDPOINT, LOCALSTACK_KINESIS_ENDPOINT);
+ streamConfigMap.put(KinesisConfig.ACCESS_KEY,
getLocalAWSCredentials().resolveCredentials().accessKeyId());
+ streamConfigMap.put(KinesisConfig.SECRET_KEY,
getLocalAWSCredentials().resolveCredentials().secretAccessKey());
+ streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS,
Integer.toString(2000));
+
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
+ return streamConfigMap;
+ }
+
+ @Override
+ public TableConfig createRealtimeTableConfig(File sampleAvroFile) {
+ // Calls the super class to create the table config.
+    // Properties like stream configs are overridden in the getStreamConfigs() method.
+ return super.createRealtimeTableConfig(sampleAvroFile);
+ }
+
+ private void stopKinesis() {
+ if (_kinesisClient != null) {
+ _kinesisClient.close();
+ }
+ if (_localstackDocker.isRunning()) {
+ _localstackDocker.stop();
+ }
+ }
+
+ private void startKinesis()
+ throws Exception {
+ LocalstackDockerConfiguration dockerConfig =
PROCESSOR.process(this.getClass());
+ StopAllLocalstackDockerCommand stopAllLocalstackDockerCommand = new
StopAllLocalstackDockerCommand();
+ stopAllLocalstackDockerCommand.execute();
+ _localstackDocker.startup(dockerConfig);
+
+ _kinesisClient = KinesisClient.builder().httpClient(new
ApacheSdkHttpService().createHttpClientBuilder()
+ .buildWithDefaults(
+
AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
Boolean.TRUE).build()))
+
.credentialsProvider(getLocalAWSCredentials()).region(Region.of(REGION))
+ .endpointOverride(new URI(LOCALSTACK_KINESIS_ENDPOINT)).build();
+ }
+
+ private static class StopAllLocalstackDockerCommand extends Command {
+
+ public void execute() {
+ String runningDockerContainers =
+ dockerExe.execute(
+ Arrays.asList("ps", "-a", "-q", "-f",
"ancestor=localstack/localstack:" + LOCALSTACK_IMAGE));
+ if (StringUtils.isNotBlank(runningDockerContainers) &&
!runningDockerContainers.toLowerCase().contains("error")) {
+ String[] containerList = runningDockerContainers.split("\n");
+
+ for (String containerId : containerList) {
+ dockerExe.execute(Arrays.asList("stop", containerId));
+ }
+ }
+ }
+ }
+
+ private static class DockerInfoCommand extends Command {
+
+ public void execute() {
+ String dockerInfo = dockerExe.execute(Collections.singletonList("info"));
+
+ if (dockerInfo.toLowerCase().contains("error")) {
+ throw new IllegalStateException("Docker daemon is not running!");
+ }
+ }
+ }
+
+ private static AwsCredentialsProvider getLocalAWSCredentials() {
+ return
StaticCredentialsProvider.create(AwsBasicCredentials.create("access",
"secret"));
+ }
+}
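For illustration, a minimal sketch (payload and partition key are
illustrative) of publishing one record through the putRecord() helper above
and checking the response, as the shard-change tests below do:

    PutRecordResponse response = putRecord("{\"Origin\":\"SFO\"}", "SFO");
    if (response.sdkHttpResponse().statusCode() != 200) {
      throw new RuntimeException("Failed to publish record to " + STREAM_NAME);
    }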
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KafkaIncreaseDecreasePartitionsIntegrationTest.java
similarity index 54%
rename from
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java
rename to
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KafkaIncreaseDecreasePartitionsIntegrationTest.java
index 4ea04f9895..3b40a6c417 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KafkaIncreaseDecreasePartitionsIntegrationTest.java
@@ -16,21 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.realtime.ingestion;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.pinot.controller.api.resources.PauseStatusDetails;
-import org.apache.pinot.controller.api.resources.TableViews;
+import org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
+import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.*;
+
public class KafkaIncreaseDecreasePartitionsIntegrationTest extends
BaseRealtimeClusterIntegrationTest {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaIncreaseDecreasePartitionsIntegrationTest.class);
@@ -38,41 +38,6 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest
extends BaseRealtime
private static final String KAFKA_TOPIC = "meetup";
private static final int NUM_PARTITIONS = 1;
- String getExternalView(String tableName)
- throws IOException {
- return
sendGetRequest(getControllerRequestURLBuilder().forExternalView(tableName));
- }
-
- void pauseTable(String tableName)
- throws IOException {
-
sendPostRequest(getControllerRequestURLBuilder().forPauseConsumption(tableName));
- TestUtils.waitForCondition((aVoid) -> {
- try {
- PauseStatusDetails pauseStatusDetails =
-
JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)),
- PauseStatusDetails.class);
- return pauseStatusDetails.getConsumingSegments().isEmpty();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }, 60_000L, "Failed to pause table: " + tableName);
- }
-
- void resumeTable(String tableName)
- throws IOException {
-
sendPostRequest(getControllerRequestURLBuilder().forResumeConsumption(tableName));
- TestUtils.waitForCondition((aVoid) -> {
- try {
- PauseStatusDetails pauseStatusDetails =
-
JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)),
- PauseStatusDetails.class);
- return !pauseStatusDetails.getConsumingSegments().isEmpty();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }, 60_000L, "Failed to resume table: " + tableName);
- }
-
String createTable()
throws IOException {
Schema schema = createSchema("simpleMeetup_schema.json");
@@ -83,24 +48,6 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest
extends BaseRealtime
return tableConfig.getTableName();
}
- void waitForNumConsumingSegmentsInEV(String tableName, int
desiredNumConsumingSegments) {
- TestUtils.waitForCondition((aVoid) -> {
- try {
- AtomicInteger numConsumingSegments = new AtomicInteger(0);
- String state = getExternalView(tableName);
- TableViews.TableView tableView = JsonUtils.stringToObject(state,
TableViews.TableView.class);
- tableView._realtime.values().forEach((v) -> {
- numConsumingSegments.addAndGet((int)
v.values().stream().filter((v1) -> v1.equals("CONSUMING")).count());
- });
- return numConsumingSegments.get() == desiredNumConsumingSegments;
- } catch (IOException e) {
- LOGGER.error("Exception in waitForNumConsumingSegments: {}",
e.getMessage());
- return false;
- }
- }, 5000, 300_000L,
- "Failed to wait for " + desiredNumConsumingSegments + " consuming
segments for table: " + tableName);
- }
-
@Test
public void testDecreasePartitions()
throws Exception {
@@ -108,7 +55,7 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest
extends BaseRealtime
LOGGER.info("Creating Kafka topic with {} partitions", NUM_PARTITIONS + 2);
_kafkaStarters.get(0).createTopic(KAFKA_TOPIC,
KafkaStarterUtils.getTopicCreationProps(NUM_PARTITIONS + 2));
String tableName = createTable();
- waitForNumConsumingSegmentsInEV(tableName, NUM_PARTITIONS + 2);
+ waitForNumSegmentsInDesiredStateInEV(tableName, CONSUMING, NUM_PARTITIONS
+ 2, TableType.REALTIME);
pauseTable(tableName);
@@ -118,7 +65,7 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest
extends BaseRealtime
_kafkaStarters.get(0).createTopic(KAFKA_TOPIC,
KafkaStarterUtils.getTopicCreationProps(NUM_PARTITIONS));
resumeTable(tableName);
- waitForNumConsumingSegmentsInEV(tableName, NUM_PARTITIONS);
+ waitForNumSegmentsInDesiredStateInEV(tableName, CONSUMING, NUM_PARTITIONS,
TableType.REALTIME);
}
@Test(enabled = false)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KinesisShardChangeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KinesisShardChangeTest.java
new file mode 100644
index 0000000000..22220e26eb
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KinesisShardChangeTest.java
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.realtime.ingestion;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.controller.api.resources.TableViews;
+import
org.apache.pinot.integration.tests.realtime.ingestion.utils.KinesisUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+
+import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
+import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
+
+
+public class KinesisShardChangeTest extends BaseKinesisIntegrationTest {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KinesisShardChangeTest.class);
+
+ private static final String SCHEMA_FILE_PATH =
"kinesis/airlineStats_data_reduced.schema";
+ private static final String DATA_FILE_PATH =
"kinesis/airlineStats_data_reduced.json";
+ private static final Integer NUM_SHARDS = 2;
+
+ @BeforeMethod
+ public void beforeTest()
+ throws IOException {
+ createStream(NUM_SHARDS);
+ addSchema(createSchema(SCHEMA_FILE_PATH));
+ TableConfig tableConfig = createRealtimeTableConfig(null);
+ addTableConfig(tableConfig);
+ }
+
+ @AfterMethod
+ public void afterTest()
+ throws IOException {
+ dropRealtimeTable(getTableName());
+ deleteSchema(getTableName());
+ deleteStream();
+ }
+
+ /**
+ * Data provider for shard split and merge tests with different offset
combinations.
+ * Documentation is in the test method.
+ */
+ @DataProvider(name = "shardOffsetCombinations")
+ public Object[][] shardOffsetCombinations() {
+ return new Object[][]{
+ {"split", "smallest", "lastConsumed", 100, 250, 4, 4},
+ {"split", "smallest", null, 100, 250, 4, 4},
+ {"split", "largest", "lastConsumed", 50, 200, 2, 4},
+ {"split", "largest", null, 50, 200, 2, 4},
+ {"split", "lastConsumed", "lastConsumed", 200, 200, 6, 4},
+ {"split", "lastConsumed", "largest", 200, 200, 6, 4},
+ {"split", "lastConsumed", null, 200, 200, 2, 4},
+ {"split", null, null, 200, 200, 2, 4},
+ {"merge", "smallest", "lastConsumed", 100, 250, 4, 1},
+ {"merge", "smallest", null, 100, 250, 4, 1},
+ {"merge", "largest", "lastConsumed", 50, 200, 2, 1},
+ {"merge", "largest", null, 50, 200, 2, 1},
+ {"merge", "lastConsumed", "lastConsumed", 200, 200, 3, 1},
+ {"merge", "lastConsumed", "largest", 200, 200, 3, 1},
+ {"merge", "lastConsumed", null, 200, 200, 2, 1},
+ {"merge", null, null, 200, 200, 2, 1},
+ };
+ }
+
+ /**
+ * Test case to validate shard split/merge behavior with different offset
combinations.
+ * The expectation is that
+ * 1. when "smallest" offset is used, the old parent shards would be
consumed first.
+ * New shards will not be consumed until RVM is run or resume() is called
with lastConsumed / largest offset
+ * 2. when "largest" offset is used, only new records would be consumed and
all prior records pushed to kinesis
+ * would be skipped.
+ * 3. when "lastConsumed" offset is used, data would be consumed based on
the last consumed offset.
+   * 4. when RealtimeSegmentValidationManager is triggered, the behaviour should be the same as
+   *    calling resume() with "lastConsumed" offset.
+ * @param operation - "split" or "merge"
+ * @param firstOffsetCriteria - Offset criteria for the first resume call.
+ * If it's null, RealtimeSegmentValidationManager
is triggered
+ * @param secondOffsetCriteria - Offset criteria for the second resume call.
+ * If it's null,
RealtimeSegmentValidationManager is triggered
+ * @param firstExpectedRecords - Expected records after the first resume call
+ * @param secondExpectedRecords - Expected records after the second resume
call
+   * @param expectedOnlineSegments - Expected number of ONLINE segments at the end
+   * @param expectedConsumingSegments - Expected number of CONSUMING segments at the end
+ */
+ @Test(dataProvider = "shardOffsetCombinations")
+ public void testShardOperationsWithOffsets(String operation, String
firstOffsetCriteria, String secondOffsetCriteria,
+ int firstExpectedRecords, int secondExpectedRecords, int
expectedOnlineSegments,
+ int expectedConsumingSegments)
+ throws Exception {
+
+ // Publish initial records and wait for them to be consumed
+ publishRecordsToKinesis(0, 50);
+ waitForRecordsToBeConsumed(getTableName(), 50); // pinot has created 2
segments
+
+ // Perform shard operation (split or merge)
+ if ("split".equals(operation)) {
+ KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 0); // splits
shard 0 into shard 2 & 3
+ KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 1); // splits
shard 1 into shard 4 & 5
+ } else if ("merge".equals(operation)) {
+ KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 0, 1); // merges
shard 0 & 1 into shard 2
+ }
+
+ // Publish more records after shard operation. These will go to the new
shards
+ publishRecordsToKinesis(50, 200);
+
+ if (firstOffsetCriteria != null) {
+ // Pause and resume with the first offset criteria
+ pauseTable(getTableName()); // This will commit the current segments
+ resumeTable(getTableName(), firstOffsetCriteria);
+ } else {
+ runRealtimeSegmentValidationTask(getTableName());
+ }
+
+ waitForRecordsToBeConsumed(getTableName(), firstExpectedRecords); // Pinot
will create new segments
+
+ if (secondOffsetCriteria != null) {
+ // Pause and resume with the second offset criteria
+ pauseTable(getTableName()); // This will commit the current segments
+ resumeTable(getTableName(), secondOffsetCriteria);
+ } else {
+ runRealtimeSegmentValidationTask(getTableName());
+ }
+
+ waitForRecordsToBeConsumed(getTableName(), secondExpectedRecords); //
Pinot will create new segments
+
+ // Publish more records after shard operation. These will go to the new
shards
+ publishRecordsToKinesis(100, 200);
+ if (secondOffsetCriteria != null &&
secondOffsetCriteria.equals("largest")) {
+      // TODO - Fix this. Remove the check for the largest offset. If the largest offset is used,
+      // we should have consumed the 100 records published after the table was resumed.
+      // Currently this is not happening, so the assertion excludes the new records.
+      // We currently rely on RVM to fix the consumption.
+ waitForRecordsToBeConsumed(getTableName(), secondExpectedRecords);
+ } else {
+ waitForRecordsToBeConsumed(getTableName(), secondExpectedRecords + 100);
+ }
+
+ runRealtimeSegmentValidationTask(getTableName());
+ waitForRecordsToBeConsumed(getTableName(), secondExpectedRecords + 100);
+
+ // Validate the final state of segments
+ validateSegmentStates(getTableName(), expectedOnlineSegments,
expectedConsumingSegments);
+ }
+
+ /**
+ * Data provider for new table tests with different offset combinations.
+ * Documentation is in the test method.
+ */
+ @DataProvider(name = "initialOffsetCombinations")
+ public Object[][] initialOffsetCombinations() {
+ return new Object[][]{
+ {"smallest", 50, 200},
+ {"largest", 50, 200}, // TODO - Validate if table created with largest
offset should not consume old records
+ {"lastConsumed", 50, 200}
+ };
+ }
+
+ /**
+ * Test case to split shards, then create new table and check consumption
+ * For the sake of brevity, we will only test shard split and calling
Realtime Validation Manager
+ * Individually, pause and resume have been verified for shard split / merge
operations
+ */
+ @Test(dataProvider = "initialOffsetCombinations")
+ public void testNewTableAfterShardSplit(String offsetCriteria, int
firstExpectedRecords, int secondExpectedRecords)
+ throws Exception {
+ // Publish initial records
+ publishRecordsToKinesis(0, 50);
+
+ // Split the shards
+ KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 0); // splits
shard 0 into shard 2 & 3
+ KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 1); // splits
shard 1 into shard 4 & 5
+
+ // new table is created with defined offset criteria but listening to the
original stream
+ String name = getTableName() + "_" + offsetCriteria;
+ createNewSchemaAndTable(name, offsetCriteria);
+
+ waitForRecordsToBeConsumed(name, firstExpectedRecords);
+
+ // publish more records. These will go to the new shards
+ publishRecordsToKinesis(50, 200);
+ waitForRecordsToBeConsumed(name, firstExpectedRecords); // pinot doesn't
listen to new shards yet.
+
+ // Trigger RVM. This will commit the current segments and start consuming
from the new shards
+ runRealtimeSegmentValidationTask(name);
+ waitForRecordsToBeConsumed(name, secondExpectedRecords);
+
+ // Validate the final state of segments
+ validateSegmentStates(name, 2, 4);
+
+ dropNewSchemaAndTable(name);
+ }
+
+ /**
+ * Test case to first split shards, then merge some shards.
+ * For the sake of brevity, we will only test by calling Realtime Validation
Manager
+ * Individually, pause and resume have been verified for shard split / merge
operations
+ */
+ @Test
+ public void testSplitAndMergeShards()
+ throws Exception {
+ // Publish initial records
+ publishRecordsToKinesis(0, 50);
+ waitForRecordsToBeConsumed(getTableName(), 50); // pinot has created 2
segments
+
+ // Split the shards
+ KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 0); // splits
shard 0 into shard 2 & 3
+ KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 1); // splits
shard 1 into shard 4 & 5
+
+ // Publish more records after shard operation. These will go to the new
shards
+ publishRecordsToKinesis(50, 175);
+
+ // Merge some shards
+ KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 2, 3); // merges
shard 2 & 3 into shard 6
+ KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 4, 5); // merges
shard 4 & 5 into shard 7
+
+ // Publish more records after shard operation. These will go to the new
shards
+ publishRecordsToKinesis(175, 200);
+
+ // Trigger RVM. This will commit segments 0 and 1 and start consuming from
shards 2-5
+ runRealtimeSegmentValidationTask(getTableName());
+ waitForRecordsToBeConsumed(getTableName(), 175);
+
+ // Trigger RVM. This will commit segments 2-5 and start consuming from
shards 6-7
+ runRealtimeSegmentValidationTask(getTableName());
+ waitForRecordsToBeConsumed(getTableName(), 200);
+
+ // Validate that 8 segments are created in total
+ validateSegmentStates(getTableName(), 6, 2);
+ }
+
+ /**
+ * Test case to continuously publish records to kinesis (in a background
thread) and concurrently split shards
+ * and concurrently call pause and resume APIs or RVM and finally validate
the total count of records
+ */
+ @Test
+ public void testConcurrentShardSplit()
+ throws IOException, InterruptedException {
+ // Start a background thread to continuously publish records to kinesis
+ Thread publisherThread = new Thread(() -> {
+ try {
+ for (int i = 0; i < 200; i += 5) {
+ publishRecordsToKinesis(i, i + 5);
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error while publishing records to kinesis", e);
+ }
+ });
+ publisherThread.start(); // This will take ~40 secs to complete with 5
records ingested per second
+
+ Thread.sleep(5000);
+
+ // Split the shards
+ KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 0); // splits
shard 0 into shard 2 & 3
+ KinesisUtils.splitNthShard(_kinesisClient, STREAM_NAME, 1); // splits
shard 1 into shard 4 & 5
+
+ Thread.sleep(5000);
+
+ // Trigger RVM. This will commit segments 0 and 1 and start consuming from
shards 2-5
+ runRealtimeSegmentValidationTask(getTableName()); // This will commit
segments 0-1 and start consuming from 2-5
+
+ // Merge some shards
+ KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 2, 3); // merges
shard 2 & 3 into shard 6
+ KinesisUtils.mergeShards(_kinesisClient, STREAM_NAME, 4, 5); // merges
shard 4 & 5 into shard 7
+
+ Thread.sleep(5000);
+
+ // Call pause and resume APIs
+ pauseTable(getTableName()); // This will commit segments 2-5
+ resumeTable(getTableName(), "lastConsumed"); // start consuming from
shards 6-7
+
+ // Wait for the publisher thread to finish
+ try {
+ publisherThread.join();
+ } catch (InterruptedException e) {
+ LOGGER.error("Error while waiting for publisher thread to finish", e);
+ }
+
+ waitForRecordsToBeConsumed(getTableName(), 200);
+
+ // Validate that all records are consumed
+ validateSegmentStates(getTableName(), 6, 2);
+ }
+
+ private void validateSegmentStates(String tableName, int
expectedOnlineSegments, int expectedConsumingSegments)
+ throws IOException {
+ TableViews.TableView tableView = getExternalView(tableName,
TableType.REALTIME);
+ Assert.assertEquals(tableView._realtime.size(), expectedOnlineSegments +
expectedConsumingSegments);
+
+ List<String> onlineSegments = tableView._realtime.entrySet().stream()
+ .filter(x -> x.getValue().containsValue(ONLINE))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ Assert.assertEquals(onlineSegments.size(), expectedOnlineSegments);
+
+ List<String> consumingSegments = tableView._realtime.entrySet().stream()
+ .filter(x -> x.getValue().containsValue(CONSUMING))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ Assert.assertEquals(consumingSegments.size(), expectedConsumingSegments);
+ }
+
+ /**
+ * start and end offsets are essentially the start row index and end row
index of the file
+ *
+ * @param startOffset - inclusive
+ * @param endOffset - exclusive
+ */
+ private void publishRecordsToKinesis(int startOffset, int endOffset)
+ throws Exception {
+ InputStream inputStream =
RealtimeKinesisIntegrationTest.class.getClassLoader()
+ .getResourceAsStream(KinesisShardChangeTest.DATA_FILE_PATH);
+ assert inputStream != null;
+ try (BufferedReader br = new BufferedReader(new
InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+ String line;
+ int count = 0;
+ while ((line = br.readLine()) != null) {
+ // Skip the first startOffset lines
+ if (count < startOffset) {
+ count++;
+ continue;
+ }
+ if (count++ >= endOffset) {
+ break;
+ }
+ JsonNode data = JsonUtils.stringToJsonNode(line);
+ PutRecordResponse putRecordResponse = putRecord(line,
data.get("Origin").textValue());
+ if (putRecordResponse.sdkHttpResponse().statusCode() != 200) {
+ throw new RuntimeException("Failed to put record " + line + " to
Kinesis stream with status code: "
+ + putRecordResponse.sdkHttpResponse().statusCode());
+ }
+ }
+ }
+ }
+
+ private void waitForRecordsToBeConsumed(String tableName, int
expectedNumRecords)
+ throws InterruptedException {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ long count = getPinotConnection().execute("SELECT COUNT(*) FROM " +
tableName).getResultSet(0).getLong(0);
+ if (count != expectedNumRecords) {
+ LOGGER.warn("Expected {} records, but got {} records. Retrying",
expectedNumRecords, count);
+ }
+ return count == expectedNumRecords;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 2000, 60_000L, "Wait for all records to be ingested");
+    // Sleep for a few seconds and validate the count again (to ensure no more records are ingested)
+ Thread.sleep(2000);
+ long count = getPinotConnection().execute("SELECT COUNT(*) FROM " +
tableName).getResultSet(0).getLong(0);
+ Assert.assertEquals(count, expectedNumRecords);
+ }
+
+ private void createNewSchemaAndTable(String name, String offsetCriteria)
+ throws IOException {
+ Schema schema = createSchema(SCHEMA_FILE_PATH);
+ schema.setSchemaName(name);
+ addSchema(schema);
+
+ TableConfigBuilder tableConfigBuilder =
getTableConfigBuilder(TableType.REALTIME);
+ tableConfigBuilder.setTableName(name);
+ Map<String, String> streamConfigs = getStreamConfigs();
+
streamConfigs.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+ StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA),
offsetCriteria);
+ tableConfigBuilder.setStreamConfigs(streamConfigs);
+ TableConfig tableConfig = tableConfigBuilder.build();
+ addTableConfig(tableConfig);
+ }
+
+ private void dropNewSchemaAndTable(String name)
+ throws IOException {
+ dropRealtimeTable(name);
+ deleteSchema(name);
+ }
+
+ @Override
+ public List<String> getNoDictionaryColumns() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String getSortedColumn() {
+ return null;
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/RealtimeKinesisIntegrationTest.java
similarity index 51%
rename from
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
rename to
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/RealtimeKinesisIntegrationTest.java
index 1e33a0b7fe..1f5569a603 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/RealtimeKinesisIntegrationTest.java
@@ -16,29 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.integration.tests;
-
-import cloud.localstack.Localstack;
-import cloud.localstack.ServiceName;
-import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor;
-import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
-import cloud.localstack.docker.annotation.LocalstackDockerProperties;
-import cloud.localstack.docker.command.Command;
+package org.apache.pinot.integration.tests.realtime.ingestion;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.google.common.base.Function;
import java.io.BufferedReader;
-import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -46,95 +36,45 @@ import java.util.List;
import java.util.Map;
import javax.activation.UnsupportedDataTypeException;
import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.client.ResultSet;
-import org.apache.pinot.plugin.stream.kinesis.KinesisConfig;
-import org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.StringUtil;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.SdkBytes;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-import software.amazon.awssdk.utils.AttributeMap;
-@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag =
"2.3.2")
-public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSet {
+public class RealtimeKinesisIntegrationTest extends BaseKinesisIntegrationTest
{
private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeKinesisIntegrationTest.class);
- private static final LocalstackDockerAnnotationProcessor PROCESSOR = new
LocalstackDockerAnnotationProcessor();
- private static final String STREAM_NAME = "kinesis-test";
- private static final String STREAM_TYPE = "kinesis";
-
- public static final String REGION = "us-east-1";
- public static final String LOCALSTACK_KINESIS_ENDPOINT =
"http://localhost:4566";
- public static final int NUM_SHARDS = 10;
+ private static final int NUM_SHARDS = 10;
// Localstack Kinesis doesn't support large rows.
  // So, this airlineStats data file consists of only a few fields and rows from the original data
- public static final String SCHEMA_FILE_PATH =
"kinesis/airlineStats_data_reduced.schema";
- public static final String DATA_FILE_PATH =
"kinesis/airlineStats_data_reduced.json";
-
- private final Localstack _localstackDocker = Localstack.INSTANCE;
-
- private static KinesisClient _kinesisClient = null;
+ private static final String SCHEMA_FILE_PATH =
"kinesis/airlineStats_data_reduced.schema";
+ private static final String DATA_FILE_PATH =
"kinesis/airlineStats_data_reduced.json";
private long _totalRecordsPushedInStream = 0;
List<String> _h2FieldNameAndTypes = new ArrayList<>();
- private boolean _skipTestNoDockerInstalled = false;
-
@BeforeClass
public void setUp()
throws Exception {
- try {
- DockerInfoCommand dockerInfoCommand = new DockerInfoCommand();
- dockerInfoCommand.execute();
- } catch (IllegalStateException e) {
- _skipTestNoDockerInstalled = true;
- LOGGER.warn("Skipping test! Docker is not found running", e);
- throw new SkipException(e.getMessage());
- }
-
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
- // Start the Pinot cluster
- startZk();
- startController();
- startBroker();
- startServer();
+ super.setUp();
- // Start Kinesis
- startKinesis();
+ // Create new stream
+ createStream(NUM_SHARDS);
// Create and upload the schema and table config
- addSchema(createKinesisSchema());
- addTableConfig(createKinesisTableConfig());
+ addSchema(createSchema(SCHEMA_FILE_PATH));
+ addTableConfig(createRealtimeTableConfig(null));
createH2ConnectionAndTable();
@@ -145,13 +85,6 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
waitForAllDocsLoadedKinesis(120_000L);
}
- public Schema createKinesisSchema()
- throws Exception {
- URL resourceUrl =
BaseClusterIntegrationTest.class.getClassLoader().getResource(SCHEMA_FILE_PATH);
- Assert.assertNotNull(resourceUrl);
- return Schema.fromFile(new File(resourceUrl.getFile()));
- }
-
protected void waitForAllDocsLoadedKinesis(long timeoutMs)
throws Exception {
waitForAllDocsLoadedKinesis(timeoutMs, true);
@@ -172,79 +105,14 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
}, 1000L, timeoutMs, "Failed to load " + _totalRecordsPushedInStream + "
documents", raiseError);
}
- public TableConfig createKinesisTableConfig() {
- return new
TableConfigBuilder(TableType.REALTIME).setTableName(getTableName())
-
.setTimeColumnName("DaysSinceEpoch").setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
-
.setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
-
.setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-
.setStreamConfigs(createKinesisStreamConfig()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
- }
-
- public Map<String, String> createKinesisStreamConfig() {
- Map<String, String> streamConfigMap = new HashMap<>();
- String streamType = "kinesis";
- streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
-
- streamConfigMap.put(
- StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_TOPIC_NAME),
- STREAM_NAME);
-
- streamConfigMap.put(
- StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS),
- "30000");
-
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
- StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
KinesisConsumerFactory.class.getName());
- streamConfigMap.put(
- StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_DECODER_CLASS),
- "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
- streamConfigMap.put(KinesisConfig.REGION, REGION);
- streamConfigMap.put(KinesisConfig.SHARD_ITERATOR_TYPE,
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString());
- streamConfigMap.put(KinesisConfig.ENDPOINT, LOCALSTACK_KINESIS_ENDPOINT);
- streamConfigMap.put(KinesisConfig.ACCESS_KEY,
getLocalAWSCredentials().resolveCredentials().accessKeyId());
- streamConfigMap.put(KinesisConfig.SECRET_KEY,
getLocalAWSCredentials().resolveCredentials().secretAccessKey());
- streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS,
Integer.toString(200));
-
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
- StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
- return streamConfigMap;
- }
-
- public void startKinesis()
- throws Exception {
- final LocalstackDockerConfiguration dockerConfig =
PROCESSOR.process(this.getClass());
- StopAllLocalstackDockerCommand stopAllLocalstackDockerCommand = new
StopAllLocalstackDockerCommand();
- stopAllLocalstackDockerCommand.execute();
- _localstackDocker.startup(dockerConfig);
-
- _kinesisClient = KinesisClient.builder().httpClient(new
ApacheSdkHttpService().createHttpClientBuilder()
- .buildWithDefaults(
-
AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
Boolean.TRUE).build()))
-
.credentialsProvider(getLocalAWSCredentials()).region(Region.of(REGION))
- .endpointOverride(new URI(LOCALSTACK_KINESIS_ENDPOINT)).build();
-
-
_kinesisClient.createStream(CreateStreamRequest.builder().streamName(STREAM_NAME).shardCount(NUM_SHARDS).build());
-
- TestUtils.waitForCondition(new Function<Void, Boolean>() {
- @Nullable
- @Override
- public Boolean apply(@Nullable Void aVoid) {
- try {
- String kinesisStreamStatus =
-
_kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(STREAM_NAME).build())
- .streamDescription().streamStatusAsString();
-
- return kinesisStreamStatus.contentEquals("ACTIVE");
- } catch (Exception e) {
- LOGGER.warn("Could not fetch kinesis stream status", e);
- return null;
- }
- }
- }, 1000L, 30000, "Kinesis stream " + STREAM_NAME + " is not created or is
not in active state", true);
+ @Override
+ public List<String> getNoDictionaryColumns() {
+ return Collections.emptyList();
}
- public void stopKinesis() {
- if (_localstackDocker.isRunning()) {
- _localstackDocker.stop();
- }
+ @Override
+ public String getSortedColumn() {
+ return null;
}
private void publishRecordsToKinesis() {
@@ -264,10 +132,7 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
while ((line = br.readLine()) != null) {
JsonNode data = JsonUtils.stringToJsonNode(line);
- PutRecordRequest putRecordRequest =
-
PutRecordRequest.builder().streamName(STREAM_NAME).data(SdkBytes.fromUtf8String(line))
- .partitionKey(data.get("Origin").textValue()).build();
- PutRecordResponse putRecordResponse =
_kinesisClient.putRecord(putRecordRequest);
+ PutRecordResponse putRecordResponse = putRecord(line,
data.get("Origin").textValue());
if (putRecordResponse.sdkHttpResponse().statusCode() == 200) {
if (StringUtils.isNotBlank(putRecordResponse.sequenceNumber()) &&
StringUtils.isNotBlank(
putRecordResponse.shardId())) {
@@ -303,10 +168,6 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
}
}
- private static AwsCredentialsProvider getLocalAWSCredentials() {
- return
StaticCredentialsProvider.create(AwsBasicCredentials.create("access",
"secret"));
- }
-
@Test
public void testRecords()
throws Exception {
@@ -435,42 +296,7 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
@AfterClass
public void tearDown()
throws Exception {
- if (_skipTestNoDockerInstalled) {
- return;
- }
-
dropRealtimeTable(getTableName());
- stopServer();
- stopBroker();
- stopController();
- stopZk();
- stopKinesis();
- FileUtils.deleteDirectory(_tempDir);
- }
-
- public static class StopAllLocalstackDockerCommand extends Command {
-
- public void execute() {
- String runningDockerContainers =
- dockerExe.execute(Arrays.asList("ps", "-a", "-q", "-f",
"ancestor=localstack/localstack"));
- if (StringUtils.isNotBlank(runningDockerContainers) &&
!runningDockerContainers.toLowerCase().contains("error")) {
- String[] containerList = runningDockerContainers.split("\n");
-
- for (String containerId : containerList) {
- dockerExe.execute(Arrays.asList("stop", containerId));
- }
- }
- }
- }
-
- public static class DockerInfoCommand extends Command {
-
- public void execute() {
- String dockerInfo = dockerExe.execute(Collections.singletonList("info"));
-
- if (dockerInfo.toLowerCase().contains("error")) {
- throw new IllegalStateException("Docker daemon is not running!");
- }
- }
+ super.tearDown();
}
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/utils/KinesisUtils.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/utils/KinesisUtils.java
new file mode 100644
index 0000000000..8f72d6f81e
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/utils/KinesisUtils.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.realtime.ingestion.utils;
+
+import java.math.BigInteger;
+import java.time.Duration;
+import java.util.List;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.MergeShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.MergeShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.SplitShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SplitShardResponse;
+
+
+public class KinesisUtils {
+
+ private KinesisUtils() {
+ }
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KinesisUtils.class);
+
+ public static void splitNthShard(KinesisClient kinesisClient, String stream,
int index) {
+ List<Shard> shards = getShards(kinesisClient, stream);
+ int initialSize = shards.size();
+ splitShard(kinesisClient, stream, shards.get(index));
+ LOGGER.info("Splitted shard with ID: " + shards.get(index).shardId());
+
+ TestUtils.waitForCondition((avoid) -> isKinesisStreamActive(kinesisClient,
stream)
+ && getShards(kinesisClient, stream).size() == initialSize + 2,
+ 2000, Duration.ofMinutes(1).toMillis(), "Waiting for Kinesis stream to
be active and shards to be split");
+ }
+
+ public static void mergeShards(KinesisClient kinesisClient, String stream,
int index1, int index2) {
+ List<Shard> shards = getShards(kinesisClient, stream);
+ int initialSize = shards.size();
+ mergeShard(kinesisClient, stream, shards.get(index1), shards.get(index2));
+ LOGGER.info("Merged shard with ID: " + shards.get(index1).shardId() + "
and " + shards.get(index2).shardId());
+
+ TestUtils.waitForCondition((avoid) -> isKinesisStreamActive(kinesisClient,
stream)
+ && getShards(kinesisClient, stream).size() == initialSize + 1,
+ 2000, Duration.ofMinutes(1).toMillis(), "Waiting for Kinesis stream to
be active and shards to be merged");
+ }
+
+ public static boolean isKinesisStreamActive(KinesisClient kinesisClient,
String streamName) {
+ try {
+ String kinesisStreamStatus = getKinesisStreamStatus(kinesisClient,
streamName);
+ boolean isActive = kinesisStreamStatus.contentEquals("ACTIVE");
+ if (!isActive) {
+ LOGGER.warn("Kinesis stream " + streamName + " in state" +
kinesisStreamStatus);
+ }
+ return isActive;
+ } catch (ResourceNotFoundException e) {
+ LOGGER.warn("Kinesis stream " + streamName + " not found");
+ return false;
+ }
+ }
+
+ public static String getKinesisStreamStatus(KinesisClient kinesisClient,
String streamName) {
+ return
kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(streamName).build())
+ .streamDescription().streamStatusAsString();
+ }
+
+ private static List<Shard> getShards(KinesisClient kinesisClient, String
stream) {
+ ListShardsResponse listShardsResponse =
+
kinesisClient.listShards(ListShardsRequest.builder().streamName(stream).build());
+ return listShardsResponse.shards();
+ }
+
+ private static SplitShardResponse splitShard(KinesisClient kinesisClient,
String stream, Shard shard) {
+ BigInteger startHash = new
BigInteger(shard.hashKeyRange().startingHashKey());
+ BigInteger endHash = new BigInteger(shard.hashKeyRange().endingHashKey());
+ BigInteger newStartingHashKey = startHash.add(endHash).divide(new
BigInteger("2"));
+ return kinesisClient.splitShard(SplitShardRequest.builder()
+ .shardToSplit(shard.shardId())
+ .streamName(stream)
+ .newStartingHashKey(newStartingHashKey.toString())
+ .build());
+ }
+
+ private static MergeShardsResponse mergeShard(KinesisClient kinesisClient,
String stream, Shard shard1,
+ Shard shard2) {
+ return kinesisClient.mergeShards(MergeShardsRequest.builder()
+ .shardToMerge(shard1.shardId())
+ .adjacentShardToMerge(shard2.shardId())
+ .streamName(stream)
+ .build());
+ }
+}
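
For illustration, a test can drive these helpers end to end. The following is a sketch only: the client construction mirrors the LocalStack endpoint, region, and dummy credentials used elsewhere in this patch, the stream name is hypothetical, and the merge indices assume the two children of the split are appended at the end of the ListShards response.

    import java.net.URI;
    import org.apache.pinot.integration.tests.realtime.ingestion.utils.KinesisUtils;
    import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.kinesis.KinesisClient;

    public class KinesisShardChangeExample {
      public static void main(String[] args) throws Exception {
        KinesisClient kinesisClient = KinesisClient.builder()
            .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("access", "secret")))
            .region(Region.of("us-east-1"))
            .endpointOverride(new URI("http://localhost:4566"))
            .build();

        // Split the first shard: blocks until the stream is ACTIVE again and the shard
        // count has grown by 2 (the closed parent remains in the listing).
        KinesisUtils.splitNthShard(kinesisClient, "kinesis-test", 0);

        // Merge two shards by index in the ListShards response: blocks until the shard
        // count has grown by 1 (the merged child is appended, parents remain listed).
        KinesisUtils.mergeShards(kinesisClient, "kinesis-test", 10, 11);
      }
    }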
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index de876b3071..f5a905e111 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -61,6 +61,16 @@ public class KinesisConsumer extends
KinesisConnectionHandler implements Partiti
super(config, kinesisClient);
}
+  /**
+   * Based on the Kinesis documentation, we might get a response with empty records but a non-null nextShardIterator.
+   * Known cases are:
+   * 1. When the shard has ended (has been split or merged) and we need a couple of calls to getRecords() to reach
+   *    a null iterator
+   * 2. When there are no new messages in the shard but the shard is active. We will continue to get a non-null
+   *    nextShardIterator in this case
+   * 3. When there are some messages in the shard, but we need a few iterations to get them.
+   * This needs to be handled by the client based on an appropriate retry strategy.
+   */
@Override
public synchronized KinesisMessageBatch
fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
try {
@@ -98,6 +108,7 @@ public class KinesisConsumer extends
KinesisConnectionHandler implements Partiti
GetRecordsRequest getRecordRequest =
GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
GetRecordsResponse getRecordsResponse =
_kinesisClient.getRecords(getRecordRequest);
+
List<Record> records = getRecordsResponse.records();
List<BytesStreamMessage> messages;
KinesisPartitionGroupOffset offsetOfNextBatch;
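
In caller terms, the contract documented above boils down to three outcomes per fetch. A sketch (the method and variable names here are assumed; the production handling is the bounded retry in KinesisStreamMetadataProvider, shown next):

    static void classifyBatch(KinesisConsumer consumer, StreamPartitionMsgOffset startOffset) {
      KinesisMessageBatch batch = consumer.fetchMessages(startOffset, 30_000);
      if (batch.getMessageCount() > 0) {
        // Case 3 resolved: records arrived; continue from the offset of the next batch
      } else if (batch.isEndOfPartitionGroup()) {
        // Case 1: the shard has ended (split or merged) and is fully drained
      } else {
        // Case 2 or 3: empty batch with a live iterator - retry a bounded number of times
      }
    }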
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index 612ea38098..d9b5f17e39 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -197,6 +197,23 @@ public class KinesisStreamMetadataProvider implements
StreamMetadataProvider {
return newPartitionGroupMetadataList;
}
+  /**
+   * Refer to the documentation for {@link #computePartitionGroupMetadata(String, StreamConfig, List, int)}
+   * @param forceGetOffsetFromStream - the flag is not required for Kinesis streams. The Kinesis implementation
+   *                                   takes care of returning non-null offsets for all old and new partitions.
+   *                                   The flag is primarily required for Kafka streams, which require refactoring
+   *                                   to avoid this flag. More details in
+   *                                   {@link StreamMetadataProvider#computePartitionGroupMetadata(String,
+   *                                   StreamConfig, List, int, boolean)}
+   */
+ @Override
+ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String
clientId, StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses,
int timeoutMillis,
+ boolean forceGetOffsetFromStream)
+ throws IOException, TimeoutException {
+ return computePartitionGroupMetadata(clientId, streamConfig,
partitionGroupConsumptionStatuses, timeoutMillis);
+ }
+
/**
* Converts a shardId string to a partitionGroupId integer by parsing the
digits of the shardId
* e.g. "shardId-000000000001" becomes 1
@@ -213,8 +230,29 @@ public class KinesisStreamMetadataProvider implements
StreamMetadataProvider {
throws IOException, TimeoutException {
try (PartitionGroupConsumer partitionGroupConsumer =
_kinesisStreamConsumerFactory.createPartitionGroupConsumer(
_clientId, partitionGroupConsumptionStatus)) {
- MessageBatch<?> messageBatch =
partitionGroupConsumer.fetchMessages(startCheckpoint, _fetchTimeoutMs);
- return messageBatch.getMessageCount() == 0 &&
messageBatch.isEndOfPartitionGroup();
+ int attempts = 0;
+ while (true) {
+ MessageBatch<?> messageBatch =
partitionGroupConsumer.fetchMessages(startCheckpoint, _fetchTimeoutMs);
+ if (messageBatch.getMessageCount() > 0) {
+ // There are messages left to be consumed so we haven't consumed the
shard fully
+ return false;
+ }
+ if (messageBatch.isEndOfPartitionGroup()) {
+ // Shard can't be iterated further. We have consumed all the
messages because message count = 0
+ return true;
+ }
+        // Even though message count = 0, the shard can be iterated further.
+        // Based on the Kinesis documentation, there might be more records to be consumed,
+        // so we need to fetch messages again to check whether we have reached the end of the shard.
+        // To prevent an infinite loop (known cases listed in fetchMessages()), we limit the number of attempts.
+        attempts++;
+        if (attempts >= 5) {
+          LOGGER.warn("Reached max attempts to check whether the end of the shard was reached from checkpoint {}. "
+              + "Assuming we have not consumed till the end of the shard.", startCheckpoint);
+          return false;
+        }
+        // Continue to fetch messages. Reusing the partitionGroupConsumer ensures we use a new shard iterator.
+ }
}
}
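
The javadoc shown above describes the shardId-to-partitionGroupId conversion, though the hunk cuts off before the implementation. A minimal sketch of such parsing, assuming the shardId always has the form "shardId-<digits>" and the digit suffix fits in an int (the committed implementation may differ):

    static int shardIdToPartitionGroupId(String shardId) {
      // "shardId-000000000001" -> 1
      return Integer.parseInt(shardId.substring("shardId-".length()));
    }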
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java
index c6cf493370..7ad46919f2 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java
@@ -179,6 +179,38 @@ public class KinesisStreamMetadataProviderTest {
Assert.assertEquals(result.size(), 1);
Assert.assertEquals(result.get(0).getPartitionGroupId(), 1);
Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(),
1);
+
+    // Simulate the case where the initial calls to fetchMessages return empty messages but a non-null next
+    // shard iterator
+ when(_partitionGroupConsumer.fetchMessages(checkpointArgs.capture(),
intArguments.capture()))
+ .thenReturn(new KinesisMessageBatch(new ArrayList<>(),
kinesisPartitionGroupOffset, false))
+ .thenReturn(new KinesisMessageBatch(new ArrayList<>(),
kinesisPartitionGroupOffset, false))
+ .thenReturn(new KinesisMessageBatch(new ArrayList<>(),
kinesisPartitionGroupOffset, true));
+ result =
+
_kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID,
getStreamConfig(),
+ currentPartitionGroupMeta, TIMEOUT);
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0).getPartitionGroupId(), 1);
+
Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(),
1);
+
+    // Simulate the case where all calls to fetchMessages return empty messages and a non-null next shard iterator
+ when(_partitionGroupConsumer.fetchMessages(checkpointArgs.capture(),
intArguments.capture()))
+ .thenReturn(new KinesisMessageBatch(new ArrayList<>(),
kinesisPartitionGroupOffset, false))
+ .thenReturn(new KinesisMessageBatch(new ArrayList<>(),
kinesisPartitionGroupOffset, false))
+ .thenReturn(new KinesisMessageBatch(new ArrayList<>(),
kinesisPartitionGroupOffset, false))
+ .thenReturn(new KinesisMessageBatch(new ArrayList<>(),
kinesisPartitionGroupOffset, false))
+ .thenReturn(new KinesisMessageBatch(new ArrayList<>(),
kinesisPartitionGroupOffset, false))
+ .thenReturn(new KinesisMessageBatch(new ArrayList<>(),
kinesisPartitionGroupOffset, false))
+ .thenReturn(new KinesisMessageBatch(new ArrayList<>(),
kinesisPartitionGroupOffset, false));
+
+ result =
+
_kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID,
getStreamConfig(),
+ currentPartitionGroupMeta, TIMEOUT);
+
+ Assert.assertEquals(result.size(), 2);
+ Assert.assertEquals(result.get(0).getPartitionGroupId(), 0);
+
Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(),
1);
+ Assert.assertEquals(result.get(1).getPartitionGroupId(), 1);
+
Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(),
1);
}
@Test
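
A note on the stubbing style used in this test: Mockito replays chained thenReturn values in order and then keeps repeating the last one, so the seven empty non-terminal batches above keep the mock returning "empty, not end of shard" on every call, which drives the provider into its max-attempts branch. A condensed sketch of the same idea (emptyBatch is a hypothetical helper, not part of this test):

    // Hypothetical helper: an empty batch whose end-of-partition flag is the argument
    private KinesisMessageBatch emptyBatch(boolean endOfShard) {
      return new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, endOfShard);
    }

    // The last stubbed value repeats, so two stubs cover any number of fetch attempts
    when(_partitionGroupConsumer.fetchMessages(checkpointArgs.capture(), intArguments.capture()))
        .thenReturn(emptyBatch(false))
        .thenReturn(emptyBatch(false));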
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index 77eac3832e..01f429a511 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -47,16 +47,6 @@ public class FreshnessBasedConsumptionStatusChecker extends
IngestionBasedConsum
_idleTimeoutMs = idleTimeoutMs;
}
- private boolean isOffsetCaughtUp(StreamPartitionMsgOffset currentOffset,
StreamPartitionMsgOffset latestOffset) {
- if (currentOffset != null && latestOffset != null) {
- // Kafka's "latest" offset is actually the next available offset.
Therefore it will be 1 ahead of the
- // current offset in the case we are caught up.
- // TODO: implement a way to have this work correctly for kafka consumers
- return currentOffset.compareTo(latestOffset) >= 0;
- }
- return false;
- }
-
private boolean segmentHasBeenIdleLongerThanThreshold(long segmentIdleTime) {
return _idleTimeoutMs > 0 && segmentIdleTime > _idleTimeoutMs;
}
@@ -84,7 +74,7 @@ public class FreshnessBasedConsumptionStatusChecker extends
IngestionBasedConsum
// the stream consumer to check partition count if we're already caught up.
StreamPartitionMsgOffset currentOffset =
rtSegmentDataManager.getCurrentOffset();
StreamPartitionMsgOffset latestStreamOffset =
rtSegmentDataManager.fetchLatestStreamOffset(5000);
- if (isOffsetCaughtUp(currentOffset, latestStreamOffset)) {
+ if (isOffsetCaughtUp(segmentName, currentOffset, latestStreamOffset)) {
_logger.info("Segment {} with freshness {}ms has not caught up within
min freshness {}. "
+ "But the current ingested offset is equal to the latest
available offset {}.", segmentName, freshnessMs,
_minFreshnessMs, currentOffset);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
index c6fe0d16d6..18d08dd3d5 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
@@ -29,6 +29,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -135,4 +136,24 @@ public abstract class
IngestionBasedConsumptionStatusChecker {
}
protected abstract boolean isSegmentCaughtUp(String segmentName,
RealtimeSegmentDataManager rtSegmentDataManager);
+
+  protected boolean isOffsetCaughtUp(String segmentName,
+      StreamPartitionMsgOffset currentOffset, StreamPartitionMsgOffset latestOffset) {
+    if (currentOffset == null || latestOffset == null) {
+      _logger.info("Null offset found for segment {} - current offset: {}, latest offset: {}. "
+          + "Will check consumption status later", segmentName, currentOffset, latestOffset);
+      return false;
+    }
+    // Kafka's "latest" offset is actually the next available offset. Therefore it will be 1 ahead of the
+    // current offset in the case we are caught up.
+    // TODO: implement a way to have this work correctly for kafka consumers
+    try {
+      return currentOffset.compareTo(latestOffset) >= 0;
+    } catch (NullPointerException e) {
+      // This can happen if the offsets are not comparable,
+      // e.g. the sequence number is missing for a Kinesis shard
+      _logger.info("Unable to compare offsets for segment {} - current offset: {}, latest offset: {}. "
+          + "Will check consumption status later", segmentName, currentOffset, latestOffset);
+      return false;
+    }
+  }
}
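
To make the ">=" comparison concrete: Kafka's "latest" offset is the next offset to be produced, so a fully caught-up consumer sits exactly at it. A small sketch assuming pinot-spi's long-backed LongMsgOffset:

    StreamPartitionMsgOffset current = new LongMsgOffset(100L);  // next offset this consumer will ingest
    StreamPartitionMsgOffset latest = new LongMsgOffset(100L);   // next offset the stream will produce
    boolean caughtUp = current.compareTo(latest) >= 0;           // true: nothing is left to consume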
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
index ad7d2905ba..b4f2ba12e2 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
@@ -45,18 +45,15 @@ public class OffsetBasedConsumptionStatusChecker extends
IngestionBasedConsumpti
protected boolean isSegmentCaughtUp(String segmentName,
RealtimeSegmentDataManager rtSegmentDataManager) {
StreamPartitionMsgOffset latestIngestedOffset =
rtSegmentDataManager.getCurrentOffset();
StreamPartitionMsgOffset latestStreamOffset =
rtSegmentDataManager.getLatestStreamOffsetAtStartupTime();
- if (latestStreamOffset == null || latestIngestedOffset == null) {
- _logger.info("Null offset found for segment {} - latest stream offset:
{}, latest ingested offset: {}. "
- + "Will check consumption status later", segmentName,
latestStreamOffset, latestIngestedOffset);
- return false;
- }
- if (latestIngestedOffset.compareTo(latestStreamOffset) < 0) {
- _logger.info("Latest ingested offset {} in segment {} is smaller than
stream latest available offset {} ",
- latestIngestedOffset, segmentName, latestStreamOffset);
- return false;
+
+ if (isOffsetCaughtUp(segmentName, latestIngestedOffset,
latestStreamOffset)) {
+ _logger.info("Segment {} with latest ingested offset {} has caught up to
the latest stream offset {}",
+ segmentName, latestIngestedOffset, latestStreamOffset);
+ return true;
}
- _logger.info("Segment {} with latest ingested offset {} has caught up to
the latest stream offset {}", segmentName,
- latestIngestedOffset, latestStreamOffset);
- return true;
+
+ _logger.info("Latest ingested offset {} in segment {} is smaller than
stream latest available offset {} ",
+ latestIngestedOffset, segmentName, latestStreamOffset);
+ return false;
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 30cbe8bd63..53f0e33ed1 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -40,13 +40,16 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
private final List<PartitionGroupConsumptionStatus>
_partitionGroupConsumptionStatusList;
private Exception _exception;
private final List<String> _topicNames;
+ private final boolean _forceGetOffsetFromStream;
public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
- List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList) {
+ List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList,
+ boolean forceGetOffsetFromStream) {
_topicNames =
streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList());
_streamConfigs = streamConfigs;
_partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
_newPartitionGroupMetadataList = new ArrayList<>();
+ _forceGetOffsetFromStream = forceGetOffsetFromStream;
}
public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
@@ -83,8 +86,8 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
_newPartitionGroupMetadataList.addAll(
streamMetadataProvider.computePartitionGroupMetadata(StreamConsumerFactory.getUniqueClientId(clientId),
_streamConfigs.get(i),
- topicPartitionGroupConsumptionStatusList,
/*maxWaitTimeMs=*/15000).stream().map(
- metadata -> new PartitionGroupMetadata(
+ topicPartitionGroupConsumptionStatusList,
/*maxWaitTimeMs=*/15000, _forceGetOffsetFromStream).stream()
+ .map(metadata -> new PartitionGroupMetadata(
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(
metadata.getPartitionGroupId(), index),
metadata.getStartOffset())).collect(Collectors.toList())
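
Call sites now pass the extra flag when constructing the fetcher. A sketch of the updated usage (streamConfigs and partitionGroupConsumptionStatusList are assumed to be in scope, as in the controller code touched by this patch):

    // true forces offsets to be re-read from the stream instead of trusting the
    // known consumption statuses (the Kafka workaround documented further below)
    PartitionGroupMetadataFetcher fetcher =
        new PartitionGroupMetadataFetcher(streamConfigs, partitionGroupConsumptionStatusList, true);
    fetcher.call();  // Callable<Boolean>; callers may retry it on transient stream errors
    List<PartitionGroupMetadata> metadataList = fetcher.getPartitionGroupMetadataList();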
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 64770d3f83..66bf9768b5 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.stream;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -98,6 +99,26 @@ public interface StreamMetadataProvider extends Closeable {
return newPartitionGroupMetadataList;
}
+  /**
+   * @param forceGetOffsetFromStream - the flag is a workaround to avoid using partitionGroupConsumptionStatuses.
+   *                                   This is required because PinotLLCRealtimeSegmentManager.selectStartOffset()
+   *                                   actually requires the offsets from the stream, but was originally relying on
+   *                                   passing an empty partitionGroupConsumptionStatuses to the method.
+   *                                   The change for <a href="https://github.com/apache/pinot/issues/15608">...</a>
+   *                                   required passing the actual partitionGroupConsumptionStatuses.
+   *                                   TODO - Remove the flag and fix the clients calling computePartitionGroupMetadata()
+   */
+ default List<PartitionGroupMetadata> computePartitionGroupMetadata(String
clientId, StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses,
int timeoutMillis,
+ boolean forceGetOffsetFromStream)
+ throws IOException, TimeoutException {
+ if (forceGetOffsetFromStream) {
+ return computePartitionGroupMetadata(clientId, streamConfig,
Collections.emptyList(), timeoutMillis);
+ } else {
+ return computePartitionGroupMetadata(clientId, streamConfig,
partitionGroupConsumptionStatuses, timeoutMillis);
+ }
+ }
+
default Map<String, PartitionLagState> getCurrentPartitionLagState(
Map<String, ConsumerPartitionState> currentPartitionStateMap) {
Map<String, PartitionLagState> result = new HashMap<>();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 48a3e63f75..2804eac53e 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -113,6 +113,11 @@ public class ControllerRequestURLBuilder {
return StringUtil.join("/", _baseUrl, "periodictask", "run?taskname=" +
taskName);
}
+ public String forPeriodTaskRun(String taskName, String tableName, TableType
tableType) {
+ return StringUtil.join("/", _baseUrl, "periodictask", "run?taskname=" +
taskName + "&tableName=" + tableName
+ + "&type=" + tableType);
+ }
+
public String forUpdateUserConfig(String username, String componentTypeStr,
boolean passwordChanged) {
StringBuilder params = new StringBuilder();
if (StringUtils.isNotBlank(username)) {
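
For example, assuming the class's usual baseUrl(...) factory and a hypothetical task and table, the new overload yields:

    String url = ControllerRequestURLBuilder.baseUrl("http://localhost:9000")
        .forPeriodTaskRun("RealtimeSegmentValidationManager", "myTable", TableType.REALTIME);
    // -> http://localhost:9000/periodictask/run?taskname=RealtimeSegmentValidationManager&tableName=myTable&type=REALTIME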