This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d208da3334 Adding a testcase for testing a table where kafka partition 
has been reduced after creating the table (#15028)
d208da3334 is described below

commit d208da3334e4afc2a9a9c1255bba12a1a3c339ff
Author: soumitra-st <[email protected]>
AuthorDate: Thu Feb 13 11:03:23 2025 -0800

    Adding a testcase for testing a table where kafka partition has been 
reduced after creating the table (#15028)
    
    Adding a testcase to reproduce the behavior fixed in #14392 PR.
---
 ...aIncreaseDecreasePartitionsIntegrationTest.java | 158 +++++++++++++++++++++
 .../simpleMeetup_realtime_table_config.json        |  40 ++++++
 .../src/test/resources/simpleMeetup_schema.json    |  21 +++
 .../kafka20/server/KafkaDataServerStartable.java   |  37 ++++-
 .../spi/stream/StreamDataServerStartable.java      |  19 +++
 .../utils/builder/ControllerRequestURLBuilder.java |   8 ++
 6 files changed, 282 insertions(+), 1 deletion(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java
new file mode 100644
index 0000000000..4ea04f9895
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.controller.api.resources.PauseStatusDetails;
+import org.apache.pinot.controller.api.resources.TableViews;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.tools.utils.KafkaStarterUtils;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+
+public class KafkaIncreaseDecreasePartitionsIntegrationTest extends 
BaseRealtimeClusterIntegrationTest {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaIncreaseDecreasePartitionsIntegrationTest.class);
+
+  private static final String KAFKA_TOPIC = "meetup";
+  private static final int NUM_PARTITIONS = 1;
+
+  String getExternalView(String tableName)
+      throws IOException {
+    return 
sendGetRequest(getControllerRequestURLBuilder().forExternalView(tableName));
+  }
+
+  void pauseTable(String tableName)
+      throws IOException {
+    
sendPostRequest(getControllerRequestURLBuilder().forPauseConsumption(tableName));
+    TestUtils.waitForCondition((aVoid) -> {
+      try {
+        PauseStatusDetails pauseStatusDetails =
+            
JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)),
+                PauseStatusDetails.class);
+        return pauseStatusDetails.getConsumingSegments().isEmpty();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 60_000L, "Failed to pause table: " + tableName);
+  }
+
+  void resumeTable(String tableName)
+      throws IOException {
+    
sendPostRequest(getControllerRequestURLBuilder().forResumeConsumption(tableName));
+    TestUtils.waitForCondition((aVoid) -> {
+      try {
+        PauseStatusDetails pauseStatusDetails =
+            
JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(tableName)),
+                PauseStatusDetails.class);
+        return !pauseStatusDetails.getConsumingSegments().isEmpty();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 60_000L, "Failed to resume table: " + tableName);
+  }
+
+  String createTable()
+      throws IOException {
+    Schema schema = createSchema("simpleMeetup_schema.json");
+    addSchema(schema);
+    TableConfig tableConfig = JsonUtils.inputStreamToObject(
+        
getClass().getClassLoader().getResourceAsStream("simpleMeetup_realtime_table_config.json"),
 TableConfig.class);
+    addTableConfig(tableConfig);
+    return tableConfig.getTableName();
+  }
+
+  void waitForNumConsumingSegmentsInEV(String tableName, int 
desiredNumConsumingSegments) {
+    TestUtils.waitForCondition((aVoid) -> {
+          try {
+            AtomicInteger numConsumingSegments = new AtomicInteger(0);
+            String state = getExternalView(tableName);
+            TableViews.TableView tableView = JsonUtils.stringToObject(state, 
TableViews.TableView.class);
+            tableView._realtime.values().forEach((v) -> {
+              numConsumingSegments.addAndGet((int) 
v.values().stream().filter((v1) -> v1.equals("CONSUMING")).count());
+            });
+            return numConsumingSegments.get() == desiredNumConsumingSegments;
+          } catch (IOException e) {
+            LOGGER.error("Exception in waitForNumConsumingSegments: {}", 
e.getMessage());
+            return false;
+          }
+        }, 5000, 300_000L,
+        "Failed to wait for " + desiredNumConsumingSegments + " consuming 
segments for table: " + tableName);
+  }
+
+  @Test
+  public void testDecreasePartitions()
+      throws Exception {
+    LOGGER.info("Starting testDecreasePartitions");
+    LOGGER.info("Creating Kafka topic with {} partitions", NUM_PARTITIONS + 2);
+    _kafkaStarters.get(0).createTopic(KAFKA_TOPIC, 
KafkaStarterUtils.getTopicCreationProps(NUM_PARTITIONS + 2));
+    String tableName = createTable();
+    waitForNumConsumingSegmentsInEV(tableName, NUM_PARTITIONS + 2);
+
+    pauseTable(tableName);
+
+    LOGGER.info("Deleting Kafka topic");
+    _kafkaStarters.get(0).deleteTopic(KAFKA_TOPIC);
+    LOGGER.info("Creating Kafka topic with {} partitions", NUM_PARTITIONS);
+    _kafkaStarters.get(0).createTopic(KAFKA_TOPIC, 
KafkaStarterUtils.getTopicCreationProps(NUM_PARTITIONS));
+
+    resumeTable(tableName);
+    waitForNumConsumingSegmentsInEV(tableName, NUM_PARTITIONS);
+  }
+
+  @Test(enabled = false)
+  public void testDictionaryBasedQueries(boolean useMultiStageQueryEngine) {
+    // Do nothing
+  }
+
+  @Test(enabled = false)
+  public void testGeneratedQueries(boolean useMultiStageQueryEngine) {
+    // Do nothing
+  }
+
+  @Test(enabled = false)
+  public void testHardcodedQueries(boolean useMultiStageQueryEngine) {
+    // Do nothing
+  }
+
+  @Test(enabled = false)
+  public void testInstanceShutdown() {
+    // Do nothing
+  }
+
+  @Test(enabled = false)
+  public void testQueriesFromQueryFile(boolean useMultiStageQueryEngine) {
+    // Do nothing
+  }
+
+  @Test(enabled = false)
+  public void testQueryExceptions(boolean useMultiStageQueryEngine) {
+    // Do nothing
+  }
+
+  @Test(enabled = false)
+  public void testHardcodedServerPartitionedSqlQueries() {
+    // Do nothing
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/resources/simpleMeetup_realtime_table_config.json
 
b/pinot-integration-tests/src/test/resources/simpleMeetup_realtime_table_config.json
new file mode 100644
index 0000000000..a6ab5afd99
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/resources/simpleMeetup_realtime_table_config.json
@@ -0,0 +1,40 @@
+{
+  "tableName": "upsertMeetupRsvp",
+  "tableType": "REALTIME",
+  "segmentsConfig": {
+    "timeColumnName": "mtime",
+    "timeType": "MILLISECONDS",
+    "segmentPushType": "APPEND",
+    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+    "schemaName": "upsertMeetupRsvp",
+    "replicasPerPartition": "1",
+    "replicaGroupStrategyConfig": {
+      "partitionColumn": "event_id",
+      "numInstancesPerPartition": 1
+    }
+  },
+  "tenants": {},
+  "tableIndexConfig": {
+    "loadMode": "MMAP",
+    "streamConfigs": {
+      "stream.kafka.topic.name": "meetup",
+      "bootstrap.servers": "localhost:19092",
+      "stream.kafka.broker.list": "localhost:19092",
+      "streamType": "kafka",
+      "stream.kafka.consumer.type": "lowLevel",
+      "stream.kafka.decoder.class.name": 
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
+      "stream.kafka.consumer.factory.class.name": 
"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
+      "realtime.segment.flush.threshold.size": 30,
+      "realtime.segment.flush.threshold.rows": 30
+    }
+  },
+  "fieldConfigList": [
+  ],
+  "metadata": {
+    "customConfigs": {}
+  },
+  "routing": {
+    "segmentPrunerTypes": ["partition"],
+    "instanceSelectorType": "strictReplicaGroup"
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/resources/simpleMeetup_schema.json 
b/pinot-integration-tests/src/test/resources/simpleMeetup_schema.json
new file mode 100644
index 0000000000..0b98e87012
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/simpleMeetup_schema.json
@@ -0,0 +1,21 @@
+{
+    "metricFieldSpecs": [],
+    "dimensionFieldSpecs": [
+      {
+        "dataType": "STRING",
+        "name": "event_id"
+      }
+    ],
+    "dateTimeFieldSpecs": [
+      {
+        "name": "mtime",
+        "dataType": "TIMESTAMP",
+        "format": "1:MILLISECONDS:TIMESTAMP",
+        "granularity": "1:MILLISECONDS"
+      }
+    ],
+    "schemaName": "upsertMeetupRsvp",
+    "primaryKeyColumns": [
+      "event_id"
+    ]
+}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
index e843b5c5a0..15e26b7f3f 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.java
@@ -22,6 +22,7 @@ import com.google.common.base.Function;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -34,6 +35,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.utils.Time;
 import org.apache.pinot.spi.stream.StreamDataServerStartable;
@@ -109,7 +111,40 @@ public class KafkaDataServerStartable implements 
StreamDataServerStartable {
           return null;
         }
       }
-    }, 1000L, 30000, "Kafka topic " + topic + " is not created yet");
+    }, 1000L, 300000, "Kafka topic " + topic + " is not created yet");
+  }
+
+  @Override
+  public void deleteTopic(String topic) {
+    try {
+      _adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
+    } catch (Exception e) {
+      LOGGER.error("Could not delete topic: {}", topic);
+    }
+
+    waitForCondition(new Function<Void, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable Void aVoid) {
+        try {
+          return !_adminClient.listTopics().names().get().contains(topic);
+        } catch (Exception e) {
+          LOGGER.warn("Could not fetch Kafka topics", e);
+          return null;
+        }
+      }
+    }, 1000L, 300000, "Kafka topic " + topic + " is not deleted yet");
+  }
+
+  @Override
+  public void createPartitions(String topic, int numPartitions) {
+    try {
+      Map<String, NewPartitions> newPartitionMap = new HashMap<>();
+      newPartitionMap.put(topic, NewPartitions.increaseTo(numPartitions));
+      _adminClient.createPartitions(newPartitionMap).all().get();
+    } catch (Exception e) {
+      LOGGER.error("Failed to create partitions for topic: {}", topic, e);
+    }
   }
 
   @Override
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java
index f1ce97cbdf..9e66beeffb 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.stream;
 
 import java.util.Properties;
+import org.apache.commons.lang3.NotImplementedException;
 
 
 /**
@@ -54,6 +55,24 @@ public interface StreamDataServerStartable {
    */
   void createTopic(String topic, Properties topicProps);
 
+  /**
+   * Delete a data stream (e.g Kafka topic) in the server.
+   *
+   * @param topic
+   */
+  default void deleteTopic(String topic) {
+    throw new NotImplementedException("deleteTopic is not implemented!");
+  }
+
+  /**
+   *
+   * @param topic
+   * @param numPartitions
+   */
+  default void createPartitions(String topic, int numPartitions) {
+    throw new NotImplementedException("createPartitions is not implemented!");
+  }
+
   /**
    * Get the port of the server.
    */
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 9c0ede2683..54c7ac9cb4 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -618,4 +618,12 @@ public class ControllerRequestURLBuilder {
   public String forCancelQueryByClientId(String clientRequestId) {
     return StringUtil.join("/", _baseUrl, "clientQuery", clientRequestId);
   }
+
+  public String forExternalView(String tableName) {
+    return StringUtil.join("/", _baseUrl, "tables", tableName, "externalview");
+  }
+
+  public String forIdealState(String tableName) {
+    return StringUtil.join("/", _baseUrl, "tables", tableName, "idealstate");
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to