This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d208da3334 Adding a test case for testing a table where Kafka partitions
have been reduced after creating the table (#15028)
d208da3334 is described below
commit d208da3334e4afc2a9a9c1255bba12a1a3c339ff
Author: soumitra-st <[email protected]>
AuthorDate: Thu Feb 13 11:03:23 2025 -0800
Adding a test case for testing a table where Kafka partitions have been
reduced after creating the table (#15028)
Adding a test case to reproduce the behavior fixed in PR #14392.
---
...aIncreaseDecreasePartitionsIntegrationTest.java | 158 +++++++++++++++++++++
.../simpleMeetup_realtime_table_config.json | 40 ++++++
.../src/test/resources/simpleMeetup_schema.json | 21 +++
.../kafka20/server/KafkaDataServerStartable.java | 37 ++++-
.../spi/stream/StreamDataServerStartable.java | 19 +++
.../utils/builder/ControllerRequestURLBuilder.java | 8 ++
6 files changed, 282 insertions(+), 1 deletion(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java
new file mode 100644
index 0000000000..4ea04f9895
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.controller.api.resources.PauseStatusDetails;
+import org.apache.pinot.controller.api.resources.TableViews;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.tools.utils.KafkaStarterUtils;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+
+public class KafkaIncreaseDecreasePartitionsIntegrationTest extends
BaseRealtimeClusterIntegrationTest {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaIncreaseDecreasePartitionsIntegrationTest.class);
+
+ private static final String KAFKA_TOPIC = "meetup";
+ private static final int NUM_PARTITIONS = 1;
+
+ String getExternalView(String tableName)
+ throws IOException {
+ return
sendGetRequest(getControllerRequestURLBuilder().forExternalView(tableName));
+ }
+
+ void pauseTable(String tableName)
+ throws IOException {
+
sendPostRequest(getControllerRequestURLBuilder().forPauseConsumption(tableName));
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ PauseStatusDetails pauseStatusDetails =
+
JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)),
+ PauseStatusDetails.class);
+ return pauseStatusDetails.getConsumingSegments().isEmpty();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 60_000L, "Failed to pause table: " + tableName);
+ }
+
+ void resumeTable(String tableName)
+ throws IOException {
+
sendPostRequest(getControllerRequestURLBuilder().forResumeConsumption(tableName));
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ PauseStatusDetails pauseStatusDetails =
+
JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)),
+ PauseStatusDetails.class);
+ return !pauseStatusDetails.getConsumingSegments().isEmpty();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 60_000L, "Failed to resume table: " + tableName);
+ }
+
+ String createTable()
+ throws IOException {
+ Schema schema = createSchema("simpleMeetup_schema.json");
+ addSchema(schema);
+ TableConfig tableConfig = JsonUtils.inputStreamToObject(
+
getClass().getClassLoader().getResourceAsStream("simpleMeetup_realtime_table_config.json"),
TableConfig.class);
+ addTableConfig(tableConfig);
+ return tableConfig.getTableName();
+ }
+
+ void waitForNumConsumingSegmentsInEV(String tableName, int
desiredNumConsumingSegments) {
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ AtomicInteger numConsumingSegments = new AtomicInteger(0);
+ String state = getExternalView(tableName);
+ TableViews.TableView tableView = JsonUtils.stringToObject(state,
TableViews.TableView.class);
+ tableView._realtime.values().forEach((v) -> {
+ numConsumingSegments.addAndGet((int)
v.values().stream().filter((v1) -> v1.equals("CONSUMING")).count());
+ });
+ return numConsumingSegments.get() == desiredNumConsumingSegments;
+ } catch (IOException e) {
+ LOGGER.error("Exception in waitForNumConsumingSegments: {}",
e.getMessage());
+ return false;
+ }
+ }, 5000, 300_000L,
+ "Failed to wait for " + desiredNumConsumingSegments + " consuming
segments for table: " + tableName);
+ }
+
+ @Test
+ public void testDecreasePartitions()
+ throws Exception {
+ LOGGER.info("Starting testDecreasePartitions");
+ LOGGER.info("Creating Kafka topic with {} partitions", NUM_PARTITIONS + 2);
+ _kafkaStarters.get(0).createTopic(KAFKA_TOPIC,
KafkaStarterUtils.getTopicCreationProps(NUM_PARTITIONS + 2));
+ String tableName = createTable();
+ waitForNumConsumingSegmentsInEV(tableName, NUM_PARTITIONS + 2);
+
+ pauseTable(tableName);
+
+ LOGGER.info("Deleting Kafka topic");
+ _kafkaStarters.get(0).deleteTopic(KAFKA_TOPIC);
+ LOGGER.info("Creating Kafka topic with {} partitions", NUM_PARTITIONS);
+ _kafkaStarters.get(0).createTopic(KAFKA_TOPIC,
KafkaStarterUtils.getTopicCreationProps(NUM_PARTITIONS));
+
+ resumeTable(tableName);
+ waitForNumConsumingSegmentsInEV(tableName, NUM_PARTITIONS);
+ }
+
+ @Test(enabled = false)
+ public void testDictionaryBasedQueries(boolean useMultiStageQueryEngine) {
+ // Do nothing
+ }
+
+ @Test(enabled = false)
+ public void testGeneratedQueries(boolean useMultiStageQueryEngine) {
+ // Do nothing
+ }
+
+ @Test(enabled = false)
+ public void testHardcodedQueries(boolean useMultiStageQueryEngine) {
+ // Do nothing
+ }
+
+ @Test(enabled = false)
+ public void testInstanceShutdown() {
+ // Do nothing
+ }
+
+ @Test(enabled = false)
+ public void testQueriesFromQueryFile(boolean useMultiStageQueryEngine) {
+ // Do nothing
+ }
+
+ @Test(enabled = false)
+ public void testQueryExceptions(boolean useMultiStageQueryEngine) {
+ // Do nothing
+ }
+
+ @Test(enabled = false)
+ public void testHardcodedServerPartitionedSqlQueries() {
+ // Do nothing
+ }
+}
diff --git
a/pinot-integration-tests/src/test/resources/simpleMeetup_realtime_table_config.json
b/pinot-integration-tests/src/test/resources/simpleMeetup_realtime_table_config.json
new file mode 100644
index 0000000000..a6ab5afd99
--- /dev/null
+++
b/pinot-integration-tests/src/test/resources/simpleMeetup_realtime_table_config.json
@@ -0,0 +1,40 @@
+{
+ "tableName": "upsertMeetupRsvp",
+ "tableType": "REALTIME",
+ "segmentsConfig": {
+ "timeColumnName": "mtime",
+ "timeType": "MILLISECONDS",
+ "segmentPushType": "APPEND",
+ "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+ "schemaName": "upsertMeetupRsvp",
+ "replicasPerPartition": "1",
+ "replicaGroupStrategyConfig": {
+ "partitionColumn": "event_id",
+ "numInstancesPerPartition": 1
+ }
+ },
+ "tenants": {},
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "streamConfigs": {
+ "stream.kafka.topic.name": "meetup",
+ "bootstrap.servers": "localhost:19092",
+ "stream.kafka.broker.list": "localhost:19092",
+ "streamType": "kafka",
+ "stream.kafka.consumer.type": "lowLevel",
+ "stream.kafka.decoder.class.name":
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+ "stream.kafka.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
+ "realtime.segment.flush.threshold.size": 30,
+ "realtime.segment.flush.threshold.rows": 30
+ }
+ },
+ "fieldConfigList": [
+ ],
+ "metadata": {
+ "customConfigs": {}
+ },
+ "routing": {
+ "segmentPrunerTypes": ["partition"],
+ "instanceSelectorType": "strictReplicaGroup"
+ }
+}
diff --git
a/pinot-integration-tests/src/test/resources/simpleMeetup_schema.json
b/pinot-integration-tests/src/test/resources/simpleMeetup_schema.json
new file mode 100644
index 0000000000..0b98e87012
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/simpleMeetup_schema.json
@@ -0,0 +1,21 @@
+{
+ "metricFieldSpecs": [],
+ "dimensionFieldSpecs": [
+ {
+ "dataType": "STRING",
+ "name": "event_id"
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "mtime",
+ "dataType": "TIMESTAMP",
+ "format": "1:MILLISECONDS:TIMESTAMP",
+ "granularity": "1:MILLISECONDS"
+ }
+ ],
+ "schemaName": "upsertMeetupRsvp",
+ "primaryKeyColumns": [
+ "event_id"
+ ]
+}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
index e843b5c5a0..15e26b7f3f 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
@@ -22,6 +22,7 @@ import com.google.common.base.Function;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -34,6 +35,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.utils.Time;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
@@ -109,7 +111,40 @@ public class KafkaDataServerStartable implements
StreamDataServerStartable {
return null;
}
}
- }, 1000L, 30000, "Kafka topic " + topic + " is not created yet");
+ }, 1000L, 300000, "Kafka topic " + topic + " is not created yet");
+ }
+
+ @Override
+ public void deleteTopic(String topic) {
+ try {
+ _adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
+ } catch (Exception e) {
+ LOGGER.error("Could not delete topic: {}", topic);
+ }
+
+ waitForCondition(new Function<Void, Boolean>() {
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable Void aVoid) {
+ try {
+ return !_adminClient.listTopics().names().get().contains(topic);
+ } catch (Exception e) {
+ LOGGER.warn("Could not fetch Kafka topics", e);
+ return null;
+ }
+ }
+ }, 1000L, 300000, "Kafka topic " + topic + " is not deleted yet");
+ }
+
+ @Override
+ public void createPartitions(String topic, int numPartitions) {
+ try {
+ Map<String, NewPartitions> newPartitionMap = new HashMap<>();
+ newPartitionMap.put(topic, NewPartitions.increaseTo(numPartitions));
+ _adminClient.createPartitions(newPartitionMap).all().get();
+ } catch (Exception e) {
+ LOGGER.error("Failed to create partitions for topic: {}", topic, e);
+ }
}
@Override
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java
index f1ce97cbdf..9e66beeffb 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java
@@ -19,6 +19,7 @@
package org.apache.pinot.spi.stream;
import java.util.Properties;
+import org.apache.commons.lang3.NotImplementedException;
/**
@@ -54,6 +55,24 @@ public interface StreamDataServerStartable {
*/
void createTopic(String topic, Properties topicProps);
+ /**
+ * Delete a data stream (e.g Kafka topic) in the server.
+ *
+ * @param topic
+ */
+ default void deleteTopic(String topic) {
+ throw new NotImplementedException("deleteTopic is not implemented!");
+ }
+
+ /**
+ *
+ * @param topic
+ * @param numPartitions
+ */
+ default void createPartitions(String topic, int numPartitions) {
+ throw new NotImplementedException("createPartitions is not implemented!");
+ }
+
/**
* Get the port of the server.
*/
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 9c0ede2683..54c7ac9cb4 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -618,4 +618,12 @@ public class ControllerRequestURLBuilder {
public String forCancelQueryByClientId(String clientRequestId) {
return StringUtil.join("/", _baseUrl, "clientQuery", clientRequestId);
}
+
+ public String forExternalView(String tableName) {
+ return StringUtil.join("/", _baseUrl, "tables", tableName, "externalview");
+ }
+
+ public String forIdealState(String tableName) {
+ return StringUtil.join("/", _baseUrl, "tables", tableName, "idealstate");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]