This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 42897ad448 Remove support for High level consumers in Apache Pinot (#11017)
42897ad448 is described below

commit 42897ad448735cd1f80c11f8b9fd882df04c0a5b
Author: Navina Ramesh <[email protected]>
AuthorDate: Tue Aug 1 23:33:54 2023 -0700

    Remove support for High level consumers in Apache Pinot (#11017)
---
 .../broker/broker/FakeStreamConsumerFactory.java   | 100 +++++++++++++++++
 .../broker/broker/HelixBrokerStarterTest.java      |   7 +-
 .../pinot/common/utils/config/TableConfigTest.java |  11 --
 .../pinot/controller/BaseControllerStarter.java    |  38 ++++---
 .../apache/pinot/controller/ControllerConf.java    |   9 +-
 .../helix/core/PinotHelixResourceManager.java      |  10 +-
 .../api/PinotTableRestletResourceTest.java         |   5 +-
 .../pinot/controller/api/TableViewsTest.java       |   2 +-
 .../PinotHelixResourceManagerStatelessTest.java    |   2 +-
 .../impl/fakestream/FakeStreamConfigUtils.java     |  12 --
 .../impl/fakestream/FakeStreamConsumerFactory.java |   2 +-
 .../impl/fakestream/FakeStreamLevelConsumer.java   |  48 --------
 .../core/realtime/stream/StreamConfigTest.java     |  61 ++++++-----
 .../apache/pinot/core/util/SchemaUtilsTest.java    |  20 +++-
 ...yConsumerHLCRealtimeClusterIntegrationTest.java | 121 ---------------------
 .../tests/HLCRealtimeClusterIntegrationTest.java   |  30 -----
 .../stream/kafka20/KafkaStreamLevelConsumer.java   |   2 +
 .../stream/pulsar/PulsarStreamLevelConsumer.java   |   2 +
 .../segment/local/utils/TableConfigUtils.java      |  29 ++---
 .../segment/local/utils/TableConfigUtilsTest.java  |  48 ++------
 .../org/apache/pinot/spi/stream/StreamConfig.java  |  28 ++---
 .../pinot/spi/stream/StreamLevelConsumer.java      |   2 +
 22 files changed, 232 insertions(+), 357 deletions(-)

diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
new file mode 100644
index 0000000000..be2f6d8a6a
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+public class FakeStreamConsumerFactory extends StreamConsumerFactory {
+  @Override
+  public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, 
int partition) {
+    return new FakePartitionLevelConsumer();
+  }
+
+  @Override
+  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String 
tableName, Set<String> fieldsToRead,
+      String groupId) {
+    return null;
+  }
+
+  @Override
+  public StreamMetadataProvider createPartitionMetadataProvider(String 
clientId, int partition) {
+    return new FakesStreamMetadataProvider();
+  }
+
+  @Override
+  public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+    return new FakesStreamMetadataProvider();
+  }
+
+  public class FakePartitionLevelConsumer implements PartitionLevelConsumer {
+
+    @Override
+    public MessageBatch fetchMessages(long startOffset, long endOffset, int 
timeoutMillis)
+        throws TimeoutException {
+      return null;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+    }
+  }
+
+  public class FakesStreamMetadataProvider implements StreamMetadataProvider {
+
+    @Override
+    public List<PartitionGroupMetadata> computePartitionGroupMetadata(String 
clientId, StreamConfig streamConfig,
+        List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatuses, int timeoutMillis)
+        throws IOException, TimeoutException {
+      return Collections.singletonList(new PartitionGroupMetadata(0, new 
LongMsgOffset(0)));
+    }
+
+    @Override
+    public int fetchPartitionCount(long timeoutMillis) {
+      return 1;
+    }
+
+    @Override
+    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria 
offsetCriteria, long timeoutMillis)
+        throws TimeoutException {
+      return new LongMsgOffset(0);
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+    }
+  }
+}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 4a7cee97ff..edd3bae35e 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -95,7 +95,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
     _helixResourceManager.addTable(offlineTableConfig);
     TableConfig realtimeTimeConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
-            
.setTimeType(TimeUnit.DAYS.name()).setStreamConfigs(getStreamConfigs()).build();
+            
.setTimeType(TimeUnit.DAYS.name()).setStreamConfigs(getStreamConfigs()).setNumReplicas(1)
+            .build();
     _helixResourceManager.addTable(realtimeTimeConfig);
 
     for (int i = 0; i < NUM_OFFLINE_SEGMENTS; i++) {
@@ -118,10 +119,12 @@ public class HelixBrokerStarterTest extends 
ControllerTest {
   private Map<String, String> getStreamConfigs() {
     Map<String, String> streamConfigs = new HashMap<>();
     streamConfigs.put("streamType", "kafka");
-    streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+    streamConfigs.put("stream.kafka.consumer.type", "lowlevel");
     streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
     streamConfigs.put("stream.kafka.decoder.class.name",
         "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder");
+    streamConfigs.put("stream.kafka.consumer.factory.class.name",
+        "org.apache.pinot.broker.broker.FakeStreamConsumerFactory");
     return streamConfigs;
   }
 
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
index 100cd62c59..cef1b0cc0e 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
@@ -76,17 +76,6 @@ public class TableConfigTest {
     offlineTableConfig.getValidationConfig().setReplicasPerPartition("3");
     assertEquals(4, offlineTableConfig.getReplication());
 
-    TableConfig realtimeHLCTableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_REALTIME_HLC_TABLE_NAME)
-            
.setStreamConfigs(getStreamConfigMap("highlevel")).setNumReplicas(2).build();
-    assertEquals(2, realtimeHLCTableConfig.getReplication());
-
-    realtimeHLCTableConfig.getValidationConfig().setReplication("4");
-    assertEquals(4, realtimeHLCTableConfig.getReplication());
-
-    realtimeHLCTableConfig.getValidationConfig().setReplicasPerPartition("3");
-    assertEquals(4, realtimeHLCTableConfig.getReplication());
-
     TableConfig realtimeLLCTableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_REALTIME_LLC_TABLE_NAME)
             
.setStreamConfigs(getStreamConfigMap("lowlevel")).setLLC(true).setNumReplicas(2).build();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index e32a419ecf..594ca27bfd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -84,7 +84,6 @@ import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import 
org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
 import 
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
 import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
 import org.apache.pinot.controller.helix.core.retention.RetentionManager;
@@ -101,6 +100,7 @@ import 
org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import 
org.apache.pinot.core.segment.processing.lifecycle.PinotSegmentLifecycleEventListenerManager;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.util.ListenerConfigUtil;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -108,7 +108,10 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.services.ServiceRole;
 import org.apache.pinot.spi.services.ServiceStartable;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.InstanceTypeUtils;
 import org.apache.pinot.spi.utils.NetUtils;
 import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
@@ -163,7 +166,6 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> 
_taskManagerStatusCache;
   protected PeriodicTaskScheduler _periodicTaskScheduler;
   protected PinotHelixTaskResourceManager _helixTaskResourceManager;
-  protected PinotRealtimeSegmentManager _realtimeSegmentsManager;
   protected PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   protected SegmentCompletionManager _segmentCompletionManager;
   protected LeadControllerManager _leadControllerManager;
@@ -430,15 +432,6 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         new SegmentCompletionManager(_helixParticipantManager, 
_pinotLLCRealtimeSegmentManager, _controllerMetrics,
             _leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
 
-    if (_config.getHLCTablesAllowed()) {
-      LOGGER.info("Realtime tables with High Level consumers will be 
supported");
-      _realtimeSegmentsManager =
-          new PinotRealtimeSegmentManager(_helixResourceManager, 
_leadControllerManager, _config);
-      _realtimeSegmentsManager.start(_controllerMetrics);
-    } else {
-      LOGGER.info("Realtime tables with High Level consumers will NOT be 
supported");
-      _realtimeSegmentsManager = null;
-    }
     _sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());
 
     _connectionManager = new MultiThreadedHttpConnectionManager();
@@ -502,6 +495,24 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
 
     LOGGER.info("Starting controller admin application on: {}", 
ListenerConfigUtil.toString(_listenerConfigs));
     _adminApp.start(_listenerConfigs);
+    List<String> existingHlcTables = new ArrayList<>();
+    _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
+      if (tableConfig != null) {
+        Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        try {
+          StreamConfig.validateConsumerType(
+              streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, 
"kafka"), streamConfigMap);
+        } catch (Exception e) {
+          existingHlcTables.add(rt);
+        }
+      }
+    });
+    if (existingHlcTables.size() > 0) {
+      LOGGER.error("High Level Consumer (HLC) based realtime tables are no 
longer supported. Please delete the "
+          + "following HLC tables before proceeding: {}\n", existingHlcTables);
+      throw new RuntimeException("Unable to start controller due to existing 
HLC tables!");
+    }
 
     _controllerMetrics.addCallbackGauge("dataDir.exists", () -> new 
File(_config.getDataDir()).exists() ? 1L : 0L);
     _controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
@@ -758,11 +769,6 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       LOGGER.info("Stopping Jersey admin API");
       _adminApp.stop();
 
-      if (_realtimeSegmentsManager != null) {
-        LOGGER.info("Stopping realtime segment manager");
-        _realtimeSegmentsManager.stop();
-      }
-
       LOGGER.info("Stopping resource manager");
       _helixResourceManager.stop();
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index e519f2b8f8..4c677a57fa 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -298,7 +298,8 @@ public class ControllerConf extends PinotConfiguration {
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 
64;
   private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
   private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
-  private static final boolean DEFAULT_ALLOW_HLC_TABLES = true;
+  // Disallow any high level consumer (HLC) table
+  private static final boolean DEFAULT_ALLOW_HLC_TABLES = false;
   private static final String DEFAULT_CONTROLLER_MODE = 
ControllerMode.DUAL.name();
   private static final String 
DEFAULT_LEAD_CONTROLLER_RESOURCE_REBALANCE_STRATEGY =
       AutoRebalanceStrategy.class.getName();
@@ -993,11 +994,7 @@ public class ControllerConf extends PinotConfiguration {
   }
 
   public boolean getHLCTablesAllowed() {
-    return getProperty(ALLOW_HLC_TABLES, DEFAULT_ALLOW_HLC_TABLES);
-  }
-
-  public void setHLCTablesAllowed(boolean allowHLCTables) {
-    setProperty(ALLOW_HLC_TABLES, allowHLCTables);
+    return DEFAULT_ALLOW_HLC_TABLES;
   }
 
   public String getMetricsPrefix() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 346a19a1d6..ad784b2672 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -220,7 +220,6 @@ public class PinotHelixResourceManager {
   private final String _dataDir;
   private final boolean _isSingleTenantCluster;
   private final boolean _enableBatchMessageMode;
-  private final boolean _allowHLCTables;
   private final int _deletedSegmentsRetentionInDays;
   private final boolean _enableTieredSegmentAssignment;
 
@@ -235,7 +234,7 @@ public class PinotHelixResourceManager {
   private final LineageManager _lineageManager;
 
   public PinotHelixResourceManager(String zkURL, String helixClusterName, 
@Nullable String dataDir,
-      boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean 
allowHLCTables,
+      boolean isSingleTenantCluster, boolean enableBatchMessageMode,
       int deletedSegmentsRetentionInDays, boolean 
enableTieredSegmentAssignment, LineageManager lineageManager) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
@@ -243,7 +242,6 @@ public class PinotHelixResourceManager {
     _isSingleTenantCluster = isSingleTenantCluster;
     _enableBatchMessageMode = enableBatchMessageMode;
     _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
-    _allowHLCTables = allowHLCTables;
     _enableTieredSegmentAssignment = enableTieredSegmentAssignment;
     _instanceAdminEndpointCache =
         
CacheBuilder.newBuilder().expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS, 
TimeUnit.HOURS)
@@ -265,8 +263,8 @@ public class PinotHelixResourceManager {
   public PinotHelixResourceManager(ControllerConf controllerConf) {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), 
controllerConf.getDataDir(),
         controllerConf.tenantIsolationEnabled(), 
controllerConf.getEnableBatchMessageMode(),
-        controllerConf.getHLCTablesAllowed(), 
controllerConf.getDeletedSegmentsRetentionInDays(),
-        controllerConf.tieredSegmentAssignmentEnabled(), 
LineageManagerFactory.create(controllerConf));
+        controllerConf.getDeletedSegmentsRetentionInDays(), 
controllerConf.tieredSegmentAssignmentEnabled(),
+        LineageManagerFactory.create(controllerConf));
   }
 
   /**
@@ -1753,7 +1751,7 @@ public class PinotHelixResourceManager {
     // Check if HLC table is allowed.
     StreamConfig streamConfig =
         new StreamConfig(tableNameWithType, 
IngestionConfigUtils.getStreamConfigMap(tableConfig));
-    if (streamConfig.hasHighLevelConsumerType() && !_allowHLCTables) {
+    if (streamConfig.hasHighLevelConsumerType()) {
       throw new InvalidTableConfigException(
           "Creating HLC realtime table is not allowed for Table: " + 
tableNameWithType);
     }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 8339e3025f..f0d17de26b 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -58,7 +58,8 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
   private static final String OFFLINE_TABLE_NAME = "testOfflineTable";
   private static final String REALTIME_TABLE_NAME = "testRealtimeTable";
   private final TableConfigBuilder _offlineBuilder = new 
TableConfigBuilder(TableType.OFFLINE);
-  private final TableConfigBuilder _realtimeBuilder = new 
TableConfigBuilder(TableType.REALTIME);
+  private final TableConfigBuilder _realtimeBuilder = new 
TableConfigBuilder(TableType.REALTIME)
+      .setStreamConfigs(Map.of("stream.type", "foo", "consumer.type", 
"lowlevel"));
   private String _createTableUrl;
 
   @BeforeClass
@@ -72,7 +73,7 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
 
     // add schema for realtime table
     DEFAULT_INSTANCE.addDummySchema(REALTIME_TABLE_NAME);
-    StreamConfig streamConfig = 
FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
+    StreamConfig streamConfig = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
     
_realtimeBuilder.setTableName(REALTIME_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
         
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setSchemaName(REALTIME_TABLE_NAME)
         .setStreamConfigs(streamConfig.getStreamConfigsMap());
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
index 5e88e7f7c3..279821537a 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
@@ -68,7 +68,7 @@ public class TableViewsTest extends ControllerTest {
 
     // add schema for realtime table
     DEFAULT_INSTANCE.addDummySchema(HYBRID_TABLE_NAME);
-    StreamConfig streamConfig = 
FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
+    StreamConfig streamConfig = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(4);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(HYBRID_TABLE_NAME)
         
.setNumReplicas(DEFAULT_MIN_NUM_REPLICAS).setStreamConfigs(streamConfig.getStreamConfigsMap()).build();
     DEFAULT_INSTANCE.getHelixResourceManager().addTable(tableConfig);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 5a96a4eea9..818a42e534 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -367,7 +367,7 @@ public class PinotHelixResourceManagerStatelessTest extends 
ControllerTest {
     tableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
             .setServerTenant(SERVER_TENANT_NAME)
-            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs().getStreamConfigsMap()).build();
+            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
     waitForEVToDisappear(tableConfig.getTableName());
     _helixResourceManager.addTable(tableConfig);
     waitForTableOnlineInBrokerResourceEV(REALTIME_TABLE_NAME);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
index 75de0fbee9..8d06b86a59 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
@@ -156,18 +156,6 @@ public class FakeStreamConfigUtils {
     return getDefaultLowLevelStreamConfigs(DEFAULT_NUM_PARTITIONS);
   }
 
-  /**
-   * Generate fake stream configs for high level stream
-   */
-  public static StreamConfig getDefaultHighLevelStreamConfigs() {
-    Map<String, String> streamConfigMap = getDefaultStreamConfigs();
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, 
StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            StreamConfig.ConsumerType.HIGHLEVEL.toString());
-
-    return new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap);
-  }
-
   private static Map<String, String> getDefaultStreamConfigs() {
     Map<String, String> streamConfigMap = new HashMap<>();
     streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, STREAM_TYPE);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 1923a4d822..fbcc68f56c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -53,7 +53,7 @@ public class FakeStreamConsumerFactory extends 
StreamConsumerFactory {
   @Override
   public StreamLevelConsumer createStreamLevelConsumer(String clientId, String 
tableName, Set<String> fieldsToRead,
       String groupId) {
-    return new FakeStreamLevelConsumer();
+    throw new UnsupportedOperationException("Pinot no longer support stream 
level consumers!");
   }
 
   @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
deleted file mode 100644
index 3331c24074..0000000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.fakestream;
-
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
-
-
-/**
- * Test implementation of {@link StreamLevelConsumer}
- * This is currently a no-op
- */
-public class FakeStreamLevelConsumer implements StreamLevelConsumer {
-  @Override
-  public void start()
-      throws Exception {
-  }
-
-  @Override
-  public GenericRow next(GenericRow destination) {
-    return destination;
-  }
-
-  @Override
-  public void commit() {
-  }
-
-  @Override
-  public void shutdown()
-      throws Exception {
-  }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index 2b164ac654..253d609d8f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -196,7 +196,7 @@ public class StreamConfigTest {
     Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
         StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
 
-    consumerType = "lowLevel,highLevel";
+    consumerType = "lowLevel";
     String offsetCriteria = "smallest";
     String decoderProp1Key = "prop1";
     String decoderProp1Value = "decoderValueString";
@@ -227,7 +227,6 @@ public class StreamConfigTest {
     Assert.assertEquals(streamConfig.getType(), streamType);
     Assert.assertEquals(streamConfig.getTopicName(), topic);
     Assert.assertEquals(streamConfig.getConsumerTypes().get(0), 
StreamConfig.ConsumerType.LOWLEVEL);
-    Assert.assertEquals(streamConfig.getConsumerTypes().get(1), 
StreamConfig.ConsumerType.HIGHLEVEL);
     Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), 
consumerFactoryClass);
     Assert.assertEquals(streamConfig.getDecoderClass(), decoderClass);
     Assert.assertEquals(streamConfig.getDecoderProperties().size(), 1);
@@ -301,7 +300,7 @@ public class StreamConfigTest {
     exception = false;
     try {
       streamConfig = new StreamConfig(tableName, streamConfigMap);
-    } catch (IllegalArgumentException e) {
+    } catch (Exception e) {
       exception = true;
     }
     Assert.assertTrue(exception);
@@ -486,33 +485,37 @@ public class StreamConfigTest {
     Assert.assertTrue(streamConfig.hasLowLevelConsumerType());
     Assert.assertFalse(streamConfig.hasHighLevelConsumerType());
 
-    consumerType = "highLevel";
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
-    streamConfig = new StreamConfig(tableName, streamConfigMap);
-    Assert.assertEquals(streamConfig.getConsumerTypes().get(0), 
StreamConfig.ConsumerType.HIGHLEVEL);
-    Assert.assertFalse(streamConfig.hasLowLevelConsumerType());
-    Assert.assertTrue(streamConfig.hasHighLevelConsumerType());
 
-    consumerType = "highLevel,simple";
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
-    streamConfig = new StreamConfig(tableName, streamConfigMap);
-    Assert.assertEquals(streamConfig.getConsumerTypes().get(0), 
StreamConfig.ConsumerType.HIGHLEVEL);
-    Assert.assertEquals(streamConfig.getConsumerTypes().get(1), 
StreamConfig.ConsumerType.LOWLEVEL);
-    Assert.assertTrue(streamConfig.hasLowLevelConsumerType());
-    Assert.assertTrue(streamConfig.hasHighLevelConsumerType());
+    try {
+      consumerType = "highLevel";
+      streamConfigMap
+          .put(StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_CONSUMER_TYPES),
+              consumerType);
+      new StreamConfig(tableName, streamConfigMap);
+      Assert.fail("Invalid consumer type(s) " + consumerType + " in stream 
config");
+    } catch (Exception e) {
+      // expected
+    }
 
-    consumerType = "highLevel,lowlevel";
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
-    streamConfig = new StreamConfig(tableName, streamConfigMap);
-    Assert.assertEquals(streamConfig.getConsumerTypes().get(0), 
StreamConfig.ConsumerType.HIGHLEVEL);
-    Assert.assertEquals(streamConfig.getConsumerTypes().get(1), 
StreamConfig.ConsumerType.LOWLEVEL);
-    Assert.assertTrue(streamConfig.hasLowLevelConsumerType());
-    Assert.assertTrue(streamConfig.hasHighLevelConsumerType());
+    try {
+      consumerType = "highLevel,simple";
+      streamConfigMap
+          .put(StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_CONSUMER_TYPES),
+              consumerType);
+      new StreamConfig(tableName, streamConfigMap);
+      Assert.fail("Invalid consumer type(s) " + consumerType + " in stream 
config");
+    } catch (Exception e) {
+      // expected
+    }
+
+    try {
+      consumerType = "highLevel,lowlevel";
+      streamConfigMap
+          .put(StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_CONSUMER_TYPES),
+              consumerType);
+      new StreamConfig(tableName, streamConfigMap);
+    } catch (Exception e) {
+      // expected
+    }
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
index 57f91aa954..fe38fe910a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
@@ -22,7 +22,9 @@ import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.segment.local.utils.SchemaUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -136,13 +138,17 @@ public class SchemaUtilsTest {
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:HOURS").build();
     tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+            .setStreamConfigs(getStreamConfigs())
+            .setTimeColumnName(TIME_COLUMN).build();
     SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
 
     // schema doesn't have destination columns from transformConfigs
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:HOURS").build();
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setStreamConfigs(getStreamConfigs())
+        .setTimeColumnName(TIME_COLUMN)
         .setIngestionConfig(ingestionConfig).build();
     try {
       SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
@@ -157,6 +163,16 @@ public class SchemaUtilsTest {
     SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
   }
 
+  private Map<String, String> getStreamConfigs() {
+    Map<String, String> streamConfigs = new HashMap<>();
+    streamConfigs.put("streamType", "kafka");
+    streamConfigs.put("stream.kafka.consumer.type", "lowlevel");
+    streamConfigs.put("stream.kafka.topic.name", "test");
+    streamConfigs.put("stream.kafka.decoder.class.name",
+        "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+    return streamConfigs;
+  }
+
   /**
    * TODO: transform functions have moved to tableConfig#ingestionConfig. 
However, these tests remain to test
    * backward compatibility/
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerHLCRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerHLCRealtimeClusterIntegrationTest.java
deleted file mode 100644
index e8318ab189..0000000000
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerHLCRealtimeClusterIntegrationTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import java.lang.reflect.Constructor;
-import java.util.Random;
-import java.util.Set;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.PartitionLevelConsumer;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
-import org.apache.pinot.spi.stream.StreamMetadataProvider;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-
-
-/**
- * Integration test that simulates a flaky Kafka consumer.
- */
-public class FlakyConsumerHLCRealtimeClusterIntegrationTest extends 
HLCRealtimeClusterIntegrationTest {
-
-  @Override
-  protected String getStreamConsumerFactoryClassName() {
-    return FlakyStreamFactory.class.getName();
-  }
-
-  public static class FlakyStreamLevelConsumer implements StreamLevelConsumer {
-    private StreamLevelConsumer _streamLevelConsumer;
-    private Random _random = new Random();
-
-    public FlakyStreamLevelConsumer(String clientId, String tableName, 
StreamConfig streamConfig,
-        Set<String> fieldsToRead, String groupId) {
-      try {
-        final Constructor constructor = 
Class.forName(KafkaStarterUtils.KAFKA_STREAM_LEVEL_CONSUMER_CLASS_NAME)
-            .getConstructor(String.class, String.class, StreamConfig.class, 
Set.class, String.class);
-        _streamLevelConsumer =
-            (StreamLevelConsumer) constructor.newInstance(clientId, tableName, 
streamConfig, fieldsToRead, groupId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public void start()
-        throws Exception {
-      _streamLevelConsumer.start();
-    }
-
-    @Override
-    public GenericRow next(GenericRow destination) {
-      // Return a null row every ~1/1000 rows and an exception every ~1/1000 
rows
-      int randomValue = _random.nextInt(1000);
-
-      if (randomValue == 0) {
-        return null;
-      } else if (randomValue == 1) {
-        throw new RuntimeException("Flaky stream level consumer exception");
-      } else {
-        return _streamLevelConsumer.next(destination);
-      }
-    }
-
-    @Override
-    public void commit() {
-      // Fail to commit 50% of the time
-      boolean failToCommit = _random.nextBoolean();
-
-      if (failToCommit) {
-        throw new RuntimeException("Flaky stream level consumer exception");
-      } else {
-        _streamLevelConsumer.commit();
-      }
-    }
-
-    @Override
-    public void shutdown()
-        throws Exception {
-      _streamLevelConsumer.shutdown();
-    }
-  }
-
-  public static class FlakyStreamFactory extends StreamConsumerFactory {
-
-    @Override
-    public PartitionLevelConsumer createPartitionLevelConsumer(String 
clientId, int partition) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public StreamLevelConsumer createStreamLevelConsumer(String clientId, 
String tableName, Set<String> fieldsToRead,
-        String groupId) {
-      return new FlakyStreamLevelConsumer(clientId, tableName, _streamConfig, 
fieldsToRead, groupId);
-    }
-
-    @Override
-    public StreamMetadataProvider createPartitionMetadataProvider(String 
clientId, int partition) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public StreamMetadataProvider createStreamMetadataProvider(String 
clientId) {
-      throw new UnsupportedOperationException();
-    }
-  }
-}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HLCRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HLCRealtimeClusterIntegrationTest.java
deleted file mode 100644
index 0f57443677..0000000000
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HLCRealtimeClusterIntegrationTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-/**
- * Integration test for high-level Kafka consumer.
- */
-public class HLCRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegrationTest {
-
-  @Override
-  protected boolean useLlc() {
-    return false;
-  }
-}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
index f6e75e54cd..81df76f628 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
@@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory;
 /**
  * An implementation of a {@link StreamLevelConsumer} which consumes from the 
kafka stream
  */
+// Pinot no longer supports high level consumer model since v0.12.*
+@Deprecated
 public class KafkaStreamLevelConsumer implements StreamLevelConsumer {
 
   private final StreamMessageDecoder _messageDecoder;
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
index 82040f6de3..78835f492c 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 /**
  * A {@link StreamLevelConsumer} implementation for the Pulsar stream
  */
+// Pinot no longer supports high level consumer model since v0.12.*
+@Deprecated
 public class PulsarStreamLevelConsumer implements StreamLevelConsumer {
   private Logger _logger;
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 0557d59219..0b246c95c1 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -137,10 +137,24 @@ public final class TableConfigUtils {
     }
     // Sanitize the table config before validation
     sanitize(tableConfig);
+
     // skip all validation if skip type ALL is selected.
     if (!skipTypes.contains(ValidationType.ALL)) {
       validateValidationConfig(tableConfig, schema);
+
+      StreamConfig streamConfig = null;
       validateIngestionConfig(tableConfig, schema, disableGroovy);
+      // Only allow realtime tables with non-null stream.type and LLC 
consumer.type
+      if (tableConfig.getTableType() == TableType.REALTIME) {
+        Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        try {
+          // Validate that StreamConfig can be created
+          streamConfig = new StreamConfig(tableConfig.getTableName(), 
streamConfigMap);
+        } catch (Exception e) {
+          throw new IllegalStateException("Could not create StreamConfig using 
the streamConfig map", e);
+        }
+        validateDecoder(streamConfig);
+      }
       validateTierConfigList(tableConfig.getTierConfigsList());
       validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
       validateFieldConfigList(tableConfig.getFieldConfigList(), 
tableConfig.getIndexingConfig(), schema);
@@ -309,21 +323,13 @@ public final class TableConfigUtils {
       }
 
       // Stream
+      // The stream config map can be in either the ingestion config or the 
indexing config, but not in both.
       if (ingestionConfig.getStreamIngestionConfig() != null) {
         IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
         Preconditions.checkState(indexingConfig == null || 
MapUtils.isEmpty(indexingConfig.getStreamConfigs()),
             "Should not use indexingConfig#getStreamConfigs if 
ingestionConfig#StreamIngestionConfig is provided");
         List<Map<String, String>> streamConfigMaps = 
ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
         Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream 
is supported in REALTIME table");
-
-        StreamConfig streamConfig;
-        try {
-          // Validate that StreamConfig can be created
-          streamConfig = new StreamConfig(tableNameWithType, 
streamConfigMaps.get(0));
-        } catch (Exception e) {
-          throw new IllegalStateException("Could not create StreamConfig using 
the streamConfig map", e);
-        }
-        validateDecoder(streamConfig);
       }
 
       // Filter config
@@ -630,11 +636,6 @@ public final class TableConfigUtils {
     // primary key exists
     
Preconditions.checkState(CollectionUtils.isNotEmpty(schema.getPrimaryKeyColumns()),
         "Upsert/Dedup table must have primary key columns in the schema");
-    // consumer type must be low-level
-    Map<String, String> streamConfigsMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
-    StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), 
streamConfigsMap);
-    Preconditions.checkState(streamConfig.hasLowLevelConsumerType() && 
!streamConfig.hasHighLevelConsumerType(),
-        "Upsert/Dedup table must use low-level streaming consumer type");
     // replica group is configured for routing
     Preconditions.checkState(
         tableConfig.getRoutingConfig() != null && 
isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 4ec499d58b..194ef34a49 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -57,7 +57,6 @@ import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.mockito.Mockito;
@@ -133,7 +132,9 @@ public class TableConfigUtilsTest {
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
     tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+        new TableConfigBuilder(TableType.REALTIME)
+            .setStreamConfigs(getStreamConfigs())
+            .setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
     TableConfigUtils.validate(tableConfig, schema);
 
     // OFFLINE table
@@ -614,14 +615,6 @@ public class TableConfigUtilsTest {
     ingestionConfig.setStreamIngestionConfig(new 
StreamIngestionConfig(Collections.singletonList(streamConfigs)));
     TableConfigUtils.validateIngestionConfig(tableConfig, null);
 
-    streamConfigs.remove(StreamConfigProperties.STREAM_TYPE);
-    try {
-      TableConfigUtils.validateIngestionConfig(tableConfig, null);
-      Assert.fail("Should fail for invalid stream configs map");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
     // validate the proto decoder
     streamConfigs = getStreamConfigs();
     streamConfigs.put("stream.kafka.decoder.class.name",
@@ -729,6 +722,7 @@ public class TableConfigUtilsTest {
 
     //realtime table
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+        .setStreamConfigs(getStreamConfigs())
         .setTierConfigList(Lists.newArrayList(
             new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, 
"30d", null,
                 TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(), 
"tier1_tag_OFFLINE", null, null),
@@ -1459,16 +1453,6 @@ public class TableConfigUtilsTest {
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
 FieldSpec.DataType.STRING)
             .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
     Map<String, String> streamConfigs = getStreamConfigs();
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setDedupConfig(new DedupConfig(true, 
HashFunction.NONE)).setStreamConfigs(streamConfigs).build();
-    try {
-      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      Assert.assertEquals(e.getMessage(), "Upsert/Dedup table must use 
low-level streaming consumer type");
-    }
-
-    streamConfigs.put("stream.kafka.consumer.type", "simple");
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
         .setDedupConfig(new DedupConfig(true, 
HashFunction.NONE)).setStreamConfigs(streamConfigs).build();
     try {
@@ -1527,25 +1511,7 @@ public class TableConfigUtilsTest {
     schema =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
 FieldSpec.DataType.STRING)
             .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
-    try {
-      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      Assert.assertEquals(e.getMessage(),
-          "Could not find streamConfigs for REALTIME table: " + TABLE_NAME + 
"_REALTIME");
-    }
-
     Map<String, String> streamConfigs = getStreamConfigs();
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
-        .setStreamConfigs(streamConfigs).build();
-    try {
-      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      Assert.assertEquals(e.getMessage(), "Upsert/Dedup table must use 
low-level streaming consumer type");
-    }
-
-    streamConfigs.put("stream.kafka.consumer.type", "simple");
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
         .setStreamConfigs(streamConfigs).build();
     try {
@@ -1777,8 +1743,8 @@ public class TableConfigUtilsTest {
 
     // invalid Upsert config with RealtimeToOfflineTask
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
-        .setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).setTaskConfig(new TableTaskConfig(
-            ImmutableMap.of("RealtimeToOfflineSegmentsTask", 
realtimeToOfflineTaskConfig,
+        .setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(getStreamConfigs())
+        .setTaskConfig(new 
TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask", 
realtimeToOfflineTaskConfig,
                 "SegmentGenerationAndPushTask", 
segmentGenerationAndPushTaskConfig))).build();
     try {
       TableConfigUtils.validateTaskConfigs(tableConfig, schema);
@@ -2069,7 +2035,7 @@ public class TableConfigUtilsTest {
   private Map<String, String> getStreamConfigs() {
     Map<String, String> streamConfigs = new HashMap<>();
     streamConfigs.put("streamType", "kafka");
-    streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+    streamConfigs.put("stream.kafka.consumer.type", "lowlevel");
     streamConfigs.put("stream.kafka.topic.name", "test");
     streamConfigs.put("stream.kafka.decoder.class.name",
         "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 66c40636ec..54b0c7905b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -100,7 +100,7 @@ public class StreamConfig {
    */
   public StreamConfig(String tableNameWithType, Map<String, String> 
streamConfigMap) {
     _type = streamConfigMap.get(StreamConfigProperties.STREAM_TYPE);
-    Preconditions.checkNotNull(_type, "Stream type cannot be null");
+    Preconditions.checkNotNull(_type, StreamConfigProperties.STREAM_TYPE + " 
cannot be null");
 
     String topicNameKey =
         StreamConfigProperties.constructStreamProperty(_type, 
StreamConfigProperties.STREAM_TOPIC_NAME);
@@ -109,19 +109,8 @@ public class StreamConfig {
 
     _tableNameWithType = tableNameWithType;
 
-    String consumerTypesKey =
-        StreamConfigProperties.constructStreamProperty(_type, 
StreamConfigProperties.STREAM_CONSUMER_TYPES);
-    String consumerTypes = streamConfigMap.get(consumerTypesKey);
-    Preconditions.checkNotNull(consumerTypes, "Must specify at least one 
consumer type " + consumerTypesKey);
-    for (String consumerType : consumerTypes.split(",")) {
-      if (consumerType.equals(
-          SIMPLE_CONSUMER_TYPE_STRING)) { //For backward compatibility of 
stream configs which referred to lowlevel
-        // as simple
-        _consumerTypes.add(ConsumerType.LOWLEVEL);
-        continue;
-      }
-      _consumerTypes.add(ConsumerType.valueOf(consumerType.toUpperCase()));
-    }
+    validateConsumerType(_type, streamConfigMap);
+    _consumerTypes.add(ConsumerType.LOWLEVEL);
 
     String consumerFactoryClassKey =
         StreamConfigProperties.constructStreamProperty(_type, 
StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS);
@@ -223,6 +212,17 @@ public class StreamConfig {
     _streamConfigMap.putAll(streamConfigMap);
   }
 
+  public static void validateConsumerType(String streamType, Map<String, 
String> streamConfigMap) {
+    String consumerTypesKey =
+        StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_CONSUMER_TYPES);
+    String consumerTypes = streamConfigMap.get(consumerTypesKey);
+    Preconditions.checkNotNull(consumerTypes, consumerTypesKey + " cannot be 
null");
+    for (String consumerType : consumerTypes.split(",")) {
+      
Preconditions.checkState(ConsumerType.LOWLEVEL.name().equalsIgnoreCase(consumerType)
+              || SIMPLE_CONSUMER_TYPE_STRING.equalsIgnoreCase(consumerType),
+          "Realtime tables with HLC consumer (consumer.type=highlevel) is no 
longer supported in Apache Pinot");
+    }
+  }
   public boolean isServerUploadToDeepStore() {
     return _serverUploadToDeepStore;
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamLevelConsumer.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamLevelConsumer.java
index 9557241262..87f1493df5 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamLevelConsumer.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamLevelConsumer.java
@@ -24,8 +24,10 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 
 
 /**
+ * DEPRECATED - since = "Pinot no longer supports high level consumer model 
since v0.12.*"
  * Interface for a consumer that consumes at stream level and is unaware of 
any partitions of the stream
  */
+@Deprecated
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public interface StreamLevelConsumer {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to