krishan1390 commented on code in PR #15563:
URL: https://github.com/apache/pinot/pull/15563#discussion_r2052300098
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PauseStatusDetails.java:
##########
@@ -65,4 +66,11 @@ public String getComment() {
public String getTimestamp() {
return _timestamp;
}
+
+ @Override
+ public String toString() {
Review Comment:
Added this so that tests can log the pause status details, which makes them
easier to debug.
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java:
##########
@@ -824,6 +831,86 @@ public void
enableResourceConfigForLeadControllerResource(boolean enable) {
}
}
+ public void runRealtimeSegmentValidationTask(String tableName)
+ throws IOException {
+ runPeriodicTask("RealtimeSegmentValidationManager", tableName, "REALTIME");
+ }
+
+ public void runPeriodicTask(String taskName, String tableName, String
tableType)
+ throws IOException {
+ sendGetRequest(getControllerRequestURLBuilder().forPeriodTaskRun(taskName,
tableName, tableType));
+ }
+
+ public void pauseTable(String tableName)
Review Comment:
These methods were moved here from
KafkaIncreaseDecreasePartitionsIntegrationTest to make them available to other
tests.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1651,13 +1655,17 @@ private void createNewConsumingSegment(TableConfig
tableConfig, StreamConfig str
}
private Map<Integer, StreamPartitionMsgOffset>
fetchPartitionGroupIdToSmallestOffset(
- List<StreamConfig> streamConfigs) {
+ List<StreamConfig> streamConfigs, IdealState idealState) {
Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset =
new HashMap<>();
for (StreamConfig streamConfig : streamConfigs) {
+
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
+ getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
List<PartitionGroupMetadata> partitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfigs,
Collections.emptyList());
+ getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList);
Review Comment:
If we pass an empty list to getNewPartitionGroupMetadataList, the function
assumes that no shards are currently being consumed.
For Kinesis, a parent shard must be completely consumed before consumption of
its child shards can start.
So if the parent shard is absent from that list, the function does not return
the child shards, because it expects us to consume the parent shard first.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -994,15 +994,17 @@ Set<Integer> getPartitionIds(StreamConfig streamConfig)
@VisibleForTesting
Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState
idealState) {
Set<Integer> partitionIds = new HashSet<>();
- boolean allPartitionIdsFetched = true;
+ boolean allPartitionIdsFetched = false;
for (int i = 0; i < streamConfigs.size(); i++) {
final int index = i;
try {
partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
.map(partitionId ->
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId,
index))
.collect(Collectors.toSet()));
+ allPartitionIdsFetched = true;
+ } catch (UnsupportedOperationException ignored) {
+ // Stream does not support fetching partition ids. There is a log in
the fallback code which is sufficient
Review Comment:
The exception is intentionally not logged in such cases, to avoid noise.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/RealtimeKinesisIntegrationTest.java:
##########
@@ -16,125 +16,65 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.integration.tests;
-
-import cloud.localstack.Localstack;
-import cloud.localstack.ServiceName;
-import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor;
-import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
-import cloud.localstack.docker.annotation.LocalstackDockerProperties;
-import cloud.localstack.docker.command.Command;
+package org.apache.pinot.integration.tests.realtime.ingestion;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.google.common.base.Function;
import java.io.BufferedReader;
-import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.activation.UnsupportedDataTypeException;
import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.client.ResultSet;
-import org.apache.pinot.plugin.stream.kinesis.KinesisConfig;
-import org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.StringUtil;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.SdkBytes;
-import software.amazon.awssdk.http.SdkHttpConfigurationOption;
-import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-import software.amazon.awssdk.utils.AttributeMap;
-@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag =
"2.3.2")
-public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSet {
+public class RealtimeKinesisIntegrationTest extends BaseKinesisIntegrationTest
{
Review Comment:
Much of the common Kinesis setup code has been moved to
BaseKinesisIntegrationTest.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/ingestion/KafkaIncreaseDecreasePartitionsIntegrationTest.java:
##########
@@ -38,41 +36,6 @@ public class KafkaIncreaseDecreasePartitionsIntegrationTest
extends BaseRealtime
private static final String KAFKA_TOPIC = "meetup";
private static final int NUM_PARTITIONS = 1;
- String getExternalView(String tableName)
Review Comment:
These methods have been moved to ControllerTest.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]