This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 42897ad448 Remove support for High level consumers in Apache Pinot
(#11017)
42897ad448 is described below
commit 42897ad448735cd1f80c11f8b9fd882df04c0a5b
Author: Navina Ramesh <[email protected]>
AuthorDate: Tue Aug 1 23:33:54 2023 -0700
Remove support for High level consumers in Apache Pinot (#11017)
---
.../broker/broker/FakeStreamConsumerFactory.java | 100 +++++++++++++++++
.../broker/broker/HelixBrokerStarterTest.java | 7 +-
.../pinot/common/utils/config/TableConfigTest.java | 11 --
.../pinot/controller/BaseControllerStarter.java | 38 ++++---
.../apache/pinot/controller/ControllerConf.java | 9 +-
.../helix/core/PinotHelixResourceManager.java | 10 +-
.../api/PinotTableRestletResourceTest.java | 5 +-
.../pinot/controller/api/TableViewsTest.java | 2 +-
.../PinotHelixResourceManagerStatelessTest.java | 2 +-
.../impl/fakestream/FakeStreamConfigUtils.java | 12 --
.../impl/fakestream/FakeStreamConsumerFactory.java | 2 +-
.../impl/fakestream/FakeStreamLevelConsumer.java | 48 --------
.../core/realtime/stream/StreamConfigTest.java | 61 ++++++-----
.../apache/pinot/core/util/SchemaUtilsTest.java | 20 +++-
...yConsumerHLCRealtimeClusterIntegrationTest.java | 121 ---------------------
.../tests/HLCRealtimeClusterIntegrationTest.java | 30 -----
.../stream/kafka20/KafkaStreamLevelConsumer.java | 2 +
.../stream/pulsar/PulsarStreamLevelConsumer.java | 2 +
.../segment/local/utils/TableConfigUtils.java | 29 ++---
.../segment/local/utils/TableConfigUtilsTest.java | 48 ++------
.../org/apache/pinot/spi/stream/StreamConfig.java | 28 ++---
.../pinot/spi/stream/StreamLevelConsumer.java | 2 +
22 files changed, 232 insertions(+), 357 deletions(-)
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
new file mode 100644
index 0000000000..be2f6d8a6a
--- /dev/null
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+public class FakeStreamConsumerFactory extends StreamConsumerFactory {
+ @Override
+ public PartitionLevelConsumer createPartitionLevelConsumer(String clientId,
int partition) {
+ return new FakePartitionLevelConsumer();
+ }
+
+  @Override
+  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set<String> fieldsToRead,
+      String groupId) {
+    throw new UnsupportedOperationException("Pinot no longer supports stream level consumers!");
+  }
+
+ @Override
+ public StreamMetadataProvider createPartitionMetadataProvider(String
clientId, int partition) {
+ return new FakesStreamMetadataProvider();
+ }
+
+ @Override
+ public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+ return new FakesStreamMetadataProvider();
+ }
+
+ public class FakePartitionLevelConsumer implements PartitionLevelConsumer {
+
+ @Override
+ public MessageBatch fetchMessages(long startOffset, long endOffset, int
timeoutMillis)
+ throws TimeoutException {
+ return null;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ }
+ }
+
+ public class FakesStreamMetadataProvider implements StreamMetadataProvider {
+
+ @Override
+ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String
clientId, StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatuses, int timeoutMillis)
+ throws IOException, TimeoutException {
+ return Collections.singletonList(new PartitionGroupMetadata(0, new
LongMsgOffset(0)));
+ }
+
+ @Override
+ public int fetchPartitionCount(long timeoutMillis) {
+ return 1;
+ }
+
+ @Override
+ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis)
+ throws TimeoutException {
+ return new LongMsgOffset(0);
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ }
+ }
+}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 4a7cee97ff..edd3bae35e 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -95,7 +95,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
_helixResourceManager.addTable(offlineTableConfig);
TableConfig realtimeTimeConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
-
.setTimeType(TimeUnit.DAYS.name()).setStreamConfigs(getStreamConfigs()).build();
+
.setTimeType(TimeUnit.DAYS.name()).setStreamConfigs(getStreamConfigs()).setNumReplicas(1)
+ .build();
_helixResourceManager.addTable(realtimeTimeConfig);
for (int i = 0; i < NUM_OFFLINE_SEGMENTS; i++) {
@@ -118,10 +119,12 @@ public class HelixBrokerStarterTest extends
ControllerTest {
private Map<String, String> getStreamConfigs() {
Map<String, String> streamConfigs = new HashMap<>();
streamConfigs.put("streamType", "kafka");
- streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+ streamConfigs.put("stream.kafka.consumer.type", "lowlevel");
streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
streamConfigs.put("stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder");
+ streamConfigs.put("stream.kafka.consumer.factory.class.name",
+ "org.apache.pinot.broker.broker.FakeStreamConsumerFactory");
return streamConfigs;
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
index 100cd62c59..cef1b0cc0e 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigTest.java
@@ -76,17 +76,6 @@ public class TableConfigTest {
offlineTableConfig.getValidationConfig().setReplicasPerPartition("3");
assertEquals(4, offlineTableConfig.getReplication());
- TableConfig realtimeHLCTableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_REALTIME_HLC_TABLE_NAME)
-
.setStreamConfigs(getStreamConfigMap("highlevel")).setNumReplicas(2).build();
- assertEquals(2, realtimeHLCTableConfig.getReplication());
-
- realtimeHLCTableConfig.getValidationConfig().setReplication("4");
- assertEquals(4, realtimeHLCTableConfig.getReplication());
-
- realtimeHLCTableConfig.getValidationConfig().setReplicasPerPartition("3");
- assertEquals(4, realtimeHLCTableConfig.getReplication());
-
TableConfig realtimeLLCTableConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_REALTIME_LLC_TABLE_NAME)
.setStreamConfigs(getStreamConfigMap("lowlevel")).setLLC(true).setNumReplicas(2).build();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index e32a419ecf..594ca27bfd 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -84,7 +84,6 @@ import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import
org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
import
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
import org.apache.pinot.controller.helix.core.retention.RetentionManager;
@@ -101,6 +100,7 @@ import
org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import
org.apache.pinot.core.segment.processing.lifecycle.PinotSegmentLifecycleEventListenerManager;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.util.ListenerConfigUtil;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -108,7 +108,10 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
@@ -163,7 +166,6 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
protected TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo>
_taskManagerStatusCache;
protected PeriodicTaskScheduler _periodicTaskScheduler;
protected PinotHelixTaskResourceManager _helixTaskResourceManager;
- protected PinotRealtimeSegmentManager _realtimeSegmentsManager;
protected PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
protected SegmentCompletionManager _segmentCompletionManager;
protected LeadControllerManager _leadControllerManager;
@@ -430,15 +432,6 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
new SegmentCompletionManager(_helixParticipantManager,
_pinotLLCRealtimeSegmentManager, _controllerMetrics,
_leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
- if (_config.getHLCTablesAllowed()) {
- LOGGER.info("Realtime tables with High Level consumers will be
supported");
- _realtimeSegmentsManager =
- new PinotRealtimeSegmentManager(_helixResourceManager,
_leadControllerManager, _config);
- _realtimeSegmentsManager.start(_controllerMetrics);
- } else {
- LOGGER.info("Realtime tables with High Level consumers will NOT be
supported");
- _realtimeSegmentsManager = null;
- }
_sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());
_connectionManager = new MultiThreadedHttpConnectionManager();
@@ -502,6 +495,24 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
LOGGER.info("Starting controller admin application on: {}",
ListenerConfigUtil.toString(_listenerConfigs));
_adminApp.start(_listenerConfigs);
+ List<String> existingHlcTables = new ArrayList<>();
+ _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
+ TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
+ if (tableConfig != null) {
+ Map<String, String> streamConfigMap =
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+ try {
+ StreamConfig.validateConsumerType(
+ streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE,
"kafka"), streamConfigMap);
+ } catch (Exception e) {
+ existingHlcTables.add(rt);
+ }
+ }
+ });
+    if (!existingHlcTables.isEmpty()) {
+      LOGGER.error("High Level Consumer (HLC) based realtime tables are no longer supported. Please delete the "
+          + "following HLC tables before proceeding: {}", existingHlcTables);
+      throw new RuntimeException("Unable to start controller due to existing HLC tables!");
+    }
_controllerMetrics.addCallbackGauge("dataDir.exists", () -> new
File(_config.getDataDir()).exists() ? 1L : 0L);
_controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
@@ -758,11 +769,6 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
LOGGER.info("Stopping Jersey admin API");
_adminApp.stop();
- if (_realtimeSegmentsManager != null) {
- LOGGER.info("Stopping realtime segment manager");
- _realtimeSegmentsManager.stop();
- }
-
LOGGER.info("Stopping resource manager");
_helixResourceManager.stop();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index e519f2b8f8..4c677a57fa 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -298,7 +298,8 @@ public class ControllerConf extends PinotConfiguration {
private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS =
64;
private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
- private static final boolean DEFAULT_ALLOW_HLC_TABLES = true;
+ // Disallow any high level consumer (HLC) table
+ private static final boolean DEFAULT_ALLOW_HLC_TABLES = false;
private static final String DEFAULT_CONTROLLER_MODE =
ControllerMode.DUAL.name();
private static final String
DEFAULT_LEAD_CONTROLLER_RESOURCE_REBALANCE_STRATEGY =
AutoRebalanceStrategy.class.getName();
@@ -993,11 +994,7 @@ public class ControllerConf extends PinotConfiguration {
}
public boolean getHLCTablesAllowed() {
- return getProperty(ALLOW_HLC_TABLES, DEFAULT_ALLOW_HLC_TABLES);
- }
-
- public void setHLCTablesAllowed(boolean allowHLCTables) {
- setProperty(ALLOW_HLC_TABLES, allowHLCTables);
+ return DEFAULT_ALLOW_HLC_TABLES;
}
public String getMetricsPrefix() {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 346a19a1d6..ad784b2672 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -220,7 +220,6 @@ public class PinotHelixResourceManager {
private final String _dataDir;
private final boolean _isSingleTenantCluster;
private final boolean _enableBatchMessageMode;
- private final boolean _allowHLCTables;
private final int _deletedSegmentsRetentionInDays;
private final boolean _enableTieredSegmentAssignment;
@@ -235,7 +234,7 @@ public class PinotHelixResourceManager {
private final LineageManager _lineageManager;
public PinotHelixResourceManager(String zkURL, String helixClusterName,
@Nullable String dataDir,
- boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean
allowHLCTables,
+ boolean isSingleTenantCluster, boolean enableBatchMessageMode,
int deletedSegmentsRetentionInDays, boolean
enableTieredSegmentAssignment, LineageManager lineageManager) {
_helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
_helixClusterName = helixClusterName;
@@ -243,7 +242,6 @@ public class PinotHelixResourceManager {
_isSingleTenantCluster = isSingleTenantCluster;
_enableBatchMessageMode = enableBatchMessageMode;
_deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
- _allowHLCTables = allowHLCTables;
_enableTieredSegmentAssignment = enableTieredSegmentAssignment;
_instanceAdminEndpointCache =
CacheBuilder.newBuilder().expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS,
TimeUnit.HOURS)
@@ -265,8 +263,8 @@ public class PinotHelixResourceManager {
public PinotHelixResourceManager(ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
controllerConf.getDataDir(),
controllerConf.tenantIsolationEnabled(),
controllerConf.getEnableBatchMessageMode(),
- controllerConf.getHLCTablesAllowed(),
controllerConf.getDeletedSegmentsRetentionInDays(),
- controllerConf.tieredSegmentAssignmentEnabled(),
LineageManagerFactory.create(controllerConf));
+ controllerConf.getDeletedSegmentsRetentionInDays(),
controllerConf.tieredSegmentAssignmentEnabled(),
+ LineageManagerFactory.create(controllerConf));
}
/**
@@ -1753,7 +1751,7 @@ public class PinotHelixResourceManager {
// Check if HLC table is allowed.
StreamConfig streamConfig =
new StreamConfig(tableNameWithType,
IngestionConfigUtils.getStreamConfigMap(tableConfig));
- if (streamConfig.hasHighLevelConsumerType() && !_allowHLCTables) {
+ if (streamConfig.hasHighLevelConsumerType()) {
throw new InvalidTableConfigException(
"Creating HLC realtime table is not allowed for Table: " +
tableNameWithType);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 8339e3025f..f0d17de26b 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -58,7 +58,8 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
private static final String OFFLINE_TABLE_NAME = "testOfflineTable";
private static final String REALTIME_TABLE_NAME = "testRealtimeTable";
private final TableConfigBuilder _offlineBuilder = new
TableConfigBuilder(TableType.OFFLINE);
- private final TableConfigBuilder _realtimeBuilder = new
TableConfigBuilder(TableType.REALTIME);
+ private final TableConfigBuilder _realtimeBuilder = new
TableConfigBuilder(TableType.REALTIME)
+ .setStreamConfigs(Map.of("stream.type", "foo", "consumer.type",
"lowlevel"));
private String _createTableUrl;
@BeforeClass
@@ -72,7 +73,7 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
// add schema for realtime table
DEFAULT_INSTANCE.addDummySchema(REALTIME_TABLE_NAME);
- StreamConfig streamConfig =
FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
+ StreamConfig streamConfig =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
_realtimeBuilder.setTableName(REALTIME_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setSchemaName(REALTIME_TABLE_NAME)
.setStreamConfigs(streamConfig.getStreamConfigsMap());
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
index 5e88e7f7c3..279821537a 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
@@ -68,7 +68,7 @@ public class TableViewsTest extends ControllerTest {
// add schema for realtime table
DEFAULT_INSTANCE.addDummySchema(HYBRID_TABLE_NAME);
- StreamConfig streamConfig =
FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
+ StreamConfig streamConfig =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(4);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(HYBRID_TABLE_NAME)
.setNumReplicas(DEFAULT_MIN_NUM_REPLICAS).setStreamConfigs(streamConfig.getStreamConfigsMap()).build();
DEFAULT_INSTANCE.getHelixResourceManager().addTable(tableConfig);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 5a96a4eea9..818a42e534 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -367,7 +367,7 @@ public class PinotHelixResourceManagerStatelessTest extends
ControllerTest {
tableConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
.setServerTenant(SERVER_TENANT_NAME)
-
.setStreamConfigs(FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs().getStreamConfigsMap()).build();
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
waitForEVToDisappear(tableConfig.getTableName());
_helixResourceManager.addTable(tableConfig);
waitForTableOnlineInBrokerResourceEV(REALTIME_TABLE_NAME);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
index 75de0fbee9..8d06b86a59 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
@@ -156,18 +156,6 @@ public class FakeStreamConfigUtils {
return getDefaultLowLevelStreamConfigs(DEFAULT_NUM_PARTITIONS);
}
- /**
- * Generate fake stream configs for high level stream
- */
- public static StreamConfig getDefaultHighLevelStreamConfigs() {
- Map<String, String> streamConfigMap = getDefaultStreamConfigs();
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_CONSUMER_TYPES),
- StreamConfig.ConsumerType.HIGHLEVEL.toString());
-
- return new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap);
- }
-
private static Map<String, String> getDefaultStreamConfigs() {
Map<String, String> streamConfigMap = new HashMap<>();
streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, STREAM_TYPE);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 1923a4d822..fbcc68f56c 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -53,7 +53,7 @@ public class FakeStreamConsumerFactory extends
StreamConsumerFactory {
@Override
public StreamLevelConsumer createStreamLevelConsumer(String clientId, String
tableName, Set<String> fieldsToRead,
String groupId) {
- return new FakeStreamLevelConsumer();
+    throw new UnsupportedOperationException("Pinot no longer supports stream level consumers!");
}
@Override
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
deleted file mode 100644
index 3331c24074..0000000000
---
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.fakestream;
-
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
-
-
-/**
- * Test implementation of {@link StreamLevelConsumer}
- * This is currently a no-op
- */
-public class FakeStreamLevelConsumer implements StreamLevelConsumer {
- @Override
- public void start()
- throws Exception {
- }
-
- @Override
- public GenericRow next(GenericRow destination) {
- return destination;
- }
-
- @Override
- public void commit() {
- }
-
- @Override
- public void shutdown()
- throws Exception {
- }
-}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index 2b164ac654..253d609d8f 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -196,7 +196,7 @@ public class StreamConfigTest {
Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
- consumerType = "lowLevel,highLevel";
+ consumerType = "lowLevel";
String offsetCriteria = "smallest";
String decoderProp1Key = "prop1";
String decoderProp1Value = "decoderValueString";
@@ -227,7 +227,6 @@ public class StreamConfigTest {
Assert.assertEquals(streamConfig.getType(), streamType);
Assert.assertEquals(streamConfig.getTopicName(), topic);
Assert.assertEquals(streamConfig.getConsumerTypes().get(0),
StreamConfig.ConsumerType.LOWLEVEL);
- Assert.assertEquals(streamConfig.getConsumerTypes().get(1),
StreamConfig.ConsumerType.HIGHLEVEL);
Assert.assertEquals(streamConfig.getConsumerFactoryClassName(),
consumerFactoryClass);
Assert.assertEquals(streamConfig.getDecoderClass(), decoderClass);
Assert.assertEquals(streamConfig.getDecoderProperties().size(), 1);
@@ -301,7 +300,7 @@ public class StreamConfigTest {
exception = false;
try {
streamConfig = new StreamConfig(tableName, streamConfigMap);
- } catch (IllegalArgumentException e) {
+ } catch (Exception e) {
exception = true;
}
Assert.assertTrue(exception);
@@ -486,33 +485,37 @@ public class StreamConfigTest {
Assert.assertTrue(streamConfig.hasLowLevelConsumerType());
Assert.assertFalse(streamConfig.hasHighLevelConsumerType());
- consumerType = "highLevel";
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType,
StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
- streamConfig = new StreamConfig(tableName, streamConfigMap);
- Assert.assertEquals(streamConfig.getConsumerTypes().get(0),
StreamConfig.ConsumerType.HIGHLEVEL);
- Assert.assertFalse(streamConfig.hasLowLevelConsumerType());
- Assert.assertTrue(streamConfig.hasHighLevelConsumerType());
- consumerType = "highLevel,simple";
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType,
StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
- streamConfig = new StreamConfig(tableName, streamConfigMap);
- Assert.assertEquals(streamConfig.getConsumerTypes().get(0),
StreamConfig.ConsumerType.HIGHLEVEL);
- Assert.assertEquals(streamConfig.getConsumerTypes().get(1),
StreamConfig.ConsumerType.LOWLEVEL);
- Assert.assertTrue(streamConfig.hasLowLevelConsumerType());
- Assert.assertTrue(streamConfig.hasHighLevelConsumerType());
+ try {
+ consumerType = "highLevel";
+ streamConfigMap
+ .put(StreamConfigProperties.constructStreamProperty(streamType,
StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
+ new StreamConfig(tableName, streamConfigMap);
+ Assert.fail("Invalid consumer type(s) " + consumerType + " in stream
config");
+ } catch (Exception e) {
+ // expected
+ }
- consumerType = "highLevel,lowlevel";
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType,
StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
- streamConfig = new StreamConfig(tableName, streamConfigMap);
- Assert.assertEquals(streamConfig.getConsumerTypes().get(0),
StreamConfig.ConsumerType.HIGHLEVEL);
- Assert.assertEquals(streamConfig.getConsumerTypes().get(1),
StreamConfig.ConsumerType.LOWLEVEL);
- Assert.assertTrue(streamConfig.hasLowLevelConsumerType());
- Assert.assertTrue(streamConfig.hasHighLevelConsumerType());
+ try {
+ consumerType = "highLevel,simple";
+ streamConfigMap
+ .put(StreamConfigProperties.constructStreamProperty(streamType,
StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
+ new StreamConfig(tableName, streamConfigMap);
+ Assert.fail("Invalid consumer type(s) " + consumerType + " in stream
config");
+ } catch (Exception e) {
+ // expected
+ }
+
+    try {
+      consumerType = "highLevel,lowlevel";
+      streamConfigMap
+          .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+              consumerType);
+      new StreamConfig(tableName, streamConfigMap);
+      Assert.fail("Invalid consumer type(s) " + consumerType + " in stream config");
+    } catch (Exception e) {
+      // expected
+    }
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
index 57f91aa954..fe38fe910a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
@@ -22,7 +22,9 @@ import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.segment.local.utils.SchemaUtils;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -136,13 +138,17 @@ public class SchemaUtilsTest {
schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
.addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH",
"1:HOURS").build();
tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+ new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setStreamConfigs(getStreamConfigs())
+ .setTimeColumnName(TIME_COLUMN).build();
SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
// schema doesn't have destination columns from transformConfigs
schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
.addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH",
"1:HOURS").build();
- tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setStreamConfigs(getStreamConfigs())
+ .setTimeColumnName(TIME_COLUMN)
.setIngestionConfig(ingestionConfig).build();
try {
SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
@@ -157,6 +163,16 @@ public class SchemaUtilsTest {
SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
}
+ private Map<String, String> getStreamConfigs() {
+ Map<String, String> streamConfigs = new HashMap<>();
+ streamConfigs.put("streamType", "kafka");
+ streamConfigs.put("stream.kafka.consumer.type", "lowlevel");
+ streamConfigs.put("stream.kafka.topic.name", "test");
+ streamConfigs.put("stream.kafka.decoder.class.name",
+ "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+ return streamConfigs;
+ }
+
/**
* TODO: transform functions have moved to tableConfig#ingestionConfig.
However, these tests remain to test
* backward compatibility/
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerHLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerHLCRealtimeClusterIntegrationTest.java
deleted file mode 100644
index e8318ab189..0000000000
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerHLCRealtimeClusterIntegrationTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import java.lang.reflect.Constructor;
-import java.util.Random;
-import java.util.Set;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.PartitionLevelConsumer;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamLevelConsumer;
-import org.apache.pinot.spi.stream.StreamMetadataProvider;
-import org.apache.pinot.tools.utils.KafkaStarterUtils;
-
-
-/**
- * Integration test that simulates a flaky Kafka consumer.
- */
-public class FlakyConsumerHLCRealtimeClusterIntegrationTest extends
HLCRealtimeClusterIntegrationTest {
-
- @Override
- protected String getStreamConsumerFactoryClassName() {
- return FlakyStreamFactory.class.getName();
- }
-
- public static class FlakyStreamLevelConsumer implements StreamLevelConsumer {
- private StreamLevelConsumer _streamLevelConsumer;
- private Random _random = new Random();
-
- public FlakyStreamLevelConsumer(String clientId, String tableName,
StreamConfig streamConfig,
- Set<String> fieldsToRead, String groupId) {
- try {
- final Constructor constructor =
Class.forName(KafkaStarterUtils.KAFKA_STREAM_LEVEL_CONSUMER_CLASS_NAME)
- .getConstructor(String.class, String.class, StreamConfig.class,
Set.class, String.class);
- _streamLevelConsumer =
- (StreamLevelConsumer) constructor.newInstance(clientId, tableName,
streamConfig, fieldsToRead, groupId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void start()
- throws Exception {
- _streamLevelConsumer.start();
- }
-
- @Override
- public GenericRow next(GenericRow destination) {
- // Return a null row every ~1/1000 rows and an exception every ~1/1000
rows
- int randomValue = _random.nextInt(1000);
-
- if (randomValue == 0) {
- return null;
- } else if (randomValue == 1) {
- throw new RuntimeException("Flaky stream level consumer exception");
- } else {
- return _streamLevelConsumer.next(destination);
- }
- }
-
- @Override
- public void commit() {
- // Fail to commit 50% of the time
- boolean failToCommit = _random.nextBoolean();
-
- if (failToCommit) {
- throw new RuntimeException("Flaky stream level consumer exception");
- } else {
- _streamLevelConsumer.commit();
- }
- }
-
- @Override
- public void shutdown()
- throws Exception {
- _streamLevelConsumer.shutdown();
- }
- }
-
- public static class FlakyStreamFactory extends StreamConsumerFactory {
-
- @Override
- public PartitionLevelConsumer createPartitionLevelConsumer(String
clientId, int partition) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public StreamLevelConsumer createStreamLevelConsumer(String clientId,
String tableName, Set<String> fieldsToRead,
- String groupId) {
- return new FlakyStreamLevelConsumer(clientId, tableName, _streamConfig,
fieldsToRead, groupId);
- }
-
- @Override
- public StreamMetadataProvider createPartitionMetadataProvider(String
clientId, int partition) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public StreamMetadataProvider createStreamMetadataProvider(String
clientId) {
- throw new UnsupportedOperationException();
- }
- }
-}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HLCRealtimeClusterIntegrationTest.java
deleted file mode 100644
index 0f57443677..0000000000
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HLCRealtimeClusterIntegrationTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-/**
- * Integration test for high-level Kafka consumer.
- */
-public class HLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegrationTest {
-
- @Override
- protected boolean useLlc() {
- return false;
- }
-}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
index f6e75e54cd..81df76f628 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
@@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory;
/**
* An implementation of a {@link StreamLevelConsumer} which consumes from the
kafka stream
*/
+// Pinot no longer supports the high level consumer model since v0.12.*
+@Deprecated
public class KafkaStreamLevelConsumer implements StreamLevelConsumer {
private final StreamMessageDecoder _messageDecoder;
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
index 82040f6de3..78835f492c 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
/**
* A {@link StreamLevelConsumer} implementation for the Pulsar stream
*/
+// Pinot no longer supports the high level consumer model since v0.12.*
+@Deprecated
public class PulsarStreamLevelConsumer implements StreamLevelConsumer {
private Logger _logger;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 0557d59219..0b246c95c1 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -137,10 +137,24 @@ public final class TableConfigUtils {
}
// Sanitize the table config before validation
sanitize(tableConfig);
+
// skip all validation if skip type ALL is selected.
if (!skipTypes.contains(ValidationType.ALL)) {
validateValidationConfig(tableConfig, schema);
+
+ StreamConfig streamConfig = null;
validateIngestionConfig(tableConfig, schema, disableGroovy);
+ // Only allow realtime tables with non-null stream.type and LLC
consumer.type
+ if (tableConfig.getTableType() == TableType.REALTIME) {
+ Map<String, String> streamConfigMap =
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+ try {
+ // Validate that StreamConfig can be created
+ streamConfig = new StreamConfig(tableConfig.getTableName(),
streamConfigMap);
+ } catch (Exception e) {
+ throw new IllegalStateException("Could not create StreamConfig using
the streamConfig map", e);
+ }
+ validateDecoder(streamConfig);
+ }
validateTierConfigList(tableConfig.getTierConfigsList());
validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
validateFieldConfigList(tableConfig.getFieldConfigList(),
tableConfig.getIndexingConfig(), schema);
@@ -309,21 +323,13 @@ public final class TableConfigUtils {
}
// Stream
+ // stream config map can either be in ingestion config or indexing
config. cannot be in both places
if (ingestionConfig.getStreamIngestionConfig() != null) {
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
Preconditions.checkState(indexingConfig == null ||
MapUtils.isEmpty(indexingConfig.getStreamConfigs()),
"Should not use indexingConfig#getStreamConfigs if
ingestionConfig#StreamIngestionConfig is provided");
List<Map<String, String>> streamConfigMaps =
ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream
is supported in REALTIME table");
-
- StreamConfig streamConfig;
- try {
- // Validate that StreamConfig can be created
- streamConfig = new StreamConfig(tableNameWithType,
streamConfigMaps.get(0));
- } catch (Exception e) {
- throw new IllegalStateException("Could not create StreamConfig using
the streamConfig map", e);
- }
- validateDecoder(streamConfig);
}
// Filter config
@@ -630,11 +636,6 @@ public final class TableConfigUtils {
// primary key exists
Preconditions.checkState(CollectionUtils.isNotEmpty(schema.getPrimaryKeyColumns()),
"Upsert/Dedup table must have primary key columns in the schema");
- // consumer type must be low-level
- Map<String, String> streamConfigsMap =
IngestionConfigUtils.getStreamConfigMap(tableConfig);
- StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(),
streamConfigsMap);
- Preconditions.checkState(streamConfig.hasLowLevelConsumerType() &&
!streamConfig.hasHighLevelConsumerType(),
- "Upsert/Dedup table must use low-level streaming consumer type");
// replica group is configured for routing
Preconditions.checkState(
tableConfig.getRoutingConfig() != null &&
isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 4ec499d58b..194ef34a49 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -57,7 +57,6 @@ import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
@@ -133,7 +132,9 @@ public class TableConfigUtilsTest {
schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
.addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+ new TableConfigBuilder(TableType.REALTIME)
+ .setStreamConfigs(getStreamConfigs())
+ .setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
TableConfigUtils.validate(tableConfig, schema);
// OFFLINE table
@@ -614,14 +615,6 @@ public class TableConfigUtilsTest {
ingestionConfig.setStreamIngestionConfig(new
StreamIngestionConfig(Collections.singletonList(streamConfigs)));
TableConfigUtils.validateIngestionConfig(tableConfig, null);
- streamConfigs.remove(StreamConfigProperties.STREAM_TYPE);
- try {
- TableConfigUtils.validateIngestionConfig(tableConfig, null);
- Assert.fail("Should fail for invalid stream configs map");
- } catch (IllegalStateException e) {
- // expected
- }
-
// validate the proto decoder
streamConfigs = getStreamConfigs();
streamConfigs.put("stream.kafka.decoder.class.name",
@@ -729,6 +722,7 @@ public class TableConfigUtilsTest {
//realtime table
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+ .setStreamConfigs(getStreamConfigs())
.setTierConfigList(Lists.newArrayList(
new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE,
"30d", null,
TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(),
"tier1_tag_OFFLINE", null, null),
@@ -1459,16 +1453,6 @@ public class TableConfigUtilsTest {
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
FieldSpec.DataType.STRING)
.setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
Map<String, String> streamConfigs = getStreamConfigs();
- tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setDedupConfig(new DedupConfig(true,
HashFunction.NONE)).setStreamConfigs(streamConfigs).build();
- try {
- TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
- Assert.fail();
- } catch (IllegalStateException e) {
- Assert.assertEquals(e.getMessage(), "Upsert/Dedup table must use
low-level streaming consumer type");
- }
-
- streamConfigs.put("stream.kafka.consumer.type", "simple");
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
.setDedupConfig(new DedupConfig(true,
HashFunction.NONE)).setStreamConfigs(streamConfigs).build();
try {
@@ -1527,25 +1511,7 @@ public class TableConfigUtilsTest {
schema =
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
FieldSpec.DataType.STRING)
.setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
- try {
- TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
- Assert.fail();
- } catch (IllegalStateException e) {
- Assert.assertEquals(e.getMessage(),
- "Could not find streamConfigs for REALTIME table: " + TABLE_NAME +
"_REALTIME");
- }
-
Map<String, String> streamConfigs = getStreamConfigs();
- tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
- .setStreamConfigs(streamConfigs).build();
- try {
- TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
- Assert.fail();
- } catch (IllegalStateException e) {
- Assert.assertEquals(e.getMessage(), "Upsert/Dedup table must use
low-level streaming consumer type");
- }
-
- streamConfigs.put("stream.kafka.consumer.type", "simple");
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
.setStreamConfigs(streamConfigs).build();
try {
@@ -1777,8 +1743,8 @@ public class TableConfigUtilsTest {
// invalid Upsert config with RealtimeToOfflineTask
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
- .setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).setTaskConfig(new TableTaskConfig(
- ImmutableMap.of("RealtimeToOfflineSegmentsTask",
realtimeToOfflineTaskConfig,
+ .setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(getStreamConfigs())
+ .setTaskConfig(new
TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask",
realtimeToOfflineTaskConfig,
"SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
TableConfigUtils.validateTaskConfigs(tableConfig, schema);
@@ -2069,7 +2035,7 @@ public class TableConfigUtilsTest {
private Map<String, String> getStreamConfigs() {
Map<String, String> streamConfigs = new HashMap<>();
streamConfigs.put("streamType", "kafka");
- streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+ streamConfigs.put("stream.kafka.consumer.type", "lowlevel");
streamConfigs.put("stream.kafka.topic.name", "test");
streamConfigs.put("stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 66c40636ec..54b0c7905b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -100,7 +100,7 @@ public class StreamConfig {
*/
public StreamConfig(String tableNameWithType, Map<String, String>
streamConfigMap) {
_type = streamConfigMap.get(StreamConfigProperties.STREAM_TYPE);
- Preconditions.checkNotNull(_type, "Stream type cannot be null");
+ Preconditions.checkNotNull(_type, StreamConfigProperties.STREAM_TYPE + "
cannot be null");
String topicNameKey =
StreamConfigProperties.constructStreamProperty(_type,
StreamConfigProperties.STREAM_TOPIC_NAME);
@@ -109,19 +109,8 @@ public class StreamConfig {
_tableNameWithType = tableNameWithType;
- String consumerTypesKey =
- StreamConfigProperties.constructStreamProperty(_type,
StreamConfigProperties.STREAM_CONSUMER_TYPES);
- String consumerTypes = streamConfigMap.get(consumerTypesKey);
- Preconditions.checkNotNull(consumerTypes, "Must specify at least one
consumer type " + consumerTypesKey);
- for (String consumerType : consumerTypes.split(",")) {
- if (consumerType.equals(
- SIMPLE_CONSUMER_TYPE_STRING)) { //For backward compatibility of
stream configs which referred to lowlevel
- // as simple
- _consumerTypes.add(ConsumerType.LOWLEVEL);
- continue;
- }
- _consumerTypes.add(ConsumerType.valueOf(consumerType.toUpperCase()));
- }
+ validateConsumerType(_type, streamConfigMap);
+ _consumerTypes.add(ConsumerType.LOWLEVEL);
String consumerFactoryClassKey =
StreamConfigProperties.constructStreamProperty(_type,
StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS);
@@ -223,6 +212,17 @@ public class StreamConfig {
_streamConfigMap.putAll(streamConfigMap);
}
+ public static void validateConsumerType(String streamType, Map<String,
String> streamConfigMap) {
+ String consumerTypesKey =
+ StreamConfigProperties.constructStreamProperty(streamType,
StreamConfigProperties.STREAM_CONSUMER_TYPES);
+ String consumerTypes = streamConfigMap.get(consumerTypesKey);
+ Preconditions.checkNotNull(consumerTypes, consumerTypesKey + " cannot be
null");
+ for (String consumerType : consumerTypes.split(",")) {
+
Preconditions.checkState(ConsumerType.LOWLEVEL.name().equalsIgnoreCase(consumerType)
+ || SIMPLE_CONSUMER_TYPE_STRING.equalsIgnoreCase(consumerType),
+ "Realtime tables with HLC consumer (consumer.type=highlevel) is no
longer supported in Apache Pinot");
+ }
+ }
public boolean isServerUploadToDeepStore() {
return _serverUploadToDeepStore;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamLevelConsumer.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamLevelConsumer.java
index 9557241262..87f1493df5 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamLevelConsumer.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamLevelConsumer.java
@@ -24,8 +24,10 @@ import org.apache.pinot.spi.data.readers.GenericRow;
/**
+ * DEPRECATED - since = "Pinot no longer supports the high level consumer model
since v0.12.*"
* Interface for a consumer that consumes at stream level and is unaware of
any partitions of the stream
*/
+@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface StreamLevelConsumer {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]