Jackie-Jiang commented on code in PR #13790:
URL: https://github.com/apache/pinot/pull/13790#discussion_r1849147395
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -75,6 +84,103 @@ public static Map<String, String>
getStreamConfigMap(TableConfig tableConfig) {
return streamConfigMap;
}
+ /**
+ * Fetches the streamConfig from the given realtime table.
+ * First, the ingestionConfigs->stream->streamConfigs will be checked.
+ * If not found, the indexingConfig->streamConfigs will be checked (which is
deprecated).
+ * @param tableConfig realtime table config
+ * @return streamConfigs List of maps
+ */
+ public static List<Map<String, String>> getStreamConfigMaps(TableConfig
tableConfig) {
Review Comment:
Add deprecated annotation to the old `getStreamConfigMap()` if we are not
removing it
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -75,6 +84,103 @@ public static Map<String, String>
getStreamConfigMap(TableConfig tableConfig) {
return streamConfigMap;
}
+ /**
+ * Fetches the streamConfig from the given realtime table.
+ * First, the ingestionConfigs->stream->streamConfigs will be checked.
+ * If not found, the indexingConfig->streamConfigs will be checked (which is
deprecated).
+ * @param tableConfig realtime table config
+ * @return streamConfigs List of maps
+ */
+ public static List<Map<String, String>> getStreamConfigMaps(TableConfig
tableConfig) {
+ String tableNameWithType = tableConfig.getTableName();
+ Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+ "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
+ if (tableConfig.getIngestionConfig() != null
+ && tableConfig.getIngestionConfig().getStreamIngestionConfig() !=
null) {
+ List<Map<String, String>> streamConfigMaps =
+
tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
+ Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have
at least 1 stream");
+ // For now, with multiple topics, we only support same type of stream
(e.g. Kafka)
Review Comment:
What is the reason for this limitation? Some comments explaining this would
be good.
Only apply this check when there are multiple streams to match the current
behavior
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -75,6 +84,103 @@ public static Map<String, String>
getStreamConfigMap(TableConfig tableConfig) {
return streamConfigMap;
}
+ /**
+ * Fetches the streamConfig from the given realtime table.
+ * First, the ingestionConfigs->stream->streamConfigs will be checked.
+ * If not found, the indexingConfig->streamConfigs will be checked (which is
deprecated).
+ * @param tableConfig realtime table config
+ * @return streamConfigs List of maps
+ */
+ public static List<Map<String, String>> getStreamConfigMaps(TableConfig
tableConfig) {
+ String tableNameWithType = tableConfig.getTableName();
+ Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+ "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
+ if (tableConfig.getIngestionConfig() != null
+ && tableConfig.getIngestionConfig().getStreamIngestionConfig() !=
null) {
+ List<Map<String, String>> streamConfigMaps =
+
tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
+ Preconditions.checkState(streamConfigMaps.size() > 0, "Table must have
at least 1 stream");
Review Comment:
(nit)
```suggestion
Preconditions.checkState(!streamConfigMaps.isEmpty(), "Table must have
at least 1 stream");
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java:
##########
@@ -99,4 +99,28 @@ public static List<PartitionGroupMetadata>
getPartitionGroupMetadataList(StreamC
throw new RuntimeException(fetcherException);
}
}
+
+ /**
+ * Fetches the list of {@link PartitionGroupMetadata} for the new partition
groups for the stream,
+ * with the help of the {@link PartitionGroupConsumptionStatus} of the
current partitionGroups.
+ * In particular, this method is used to fetch from multiple stream topics.
+ * @param streamConfigs
+ * @param partitionGroupConsumptionStatusList
+ * @return
+ */
+ public static List<PartitionGroupMetadata>
getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
Review Comment:
Deprecate the old method or remove it. Please also clean up all usages of
the old one
##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -569,10 +569,13 @@ protected void configure() {
_helixResourceManager.getAllRealtimeTables().forEach(rt -> {
TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
if (tableConfig != null) {
- Map<String, String> streamConfigMap =
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+ List<Map<String, String>> streamConfigMaps =
IngestionConfigUtils.getStreamConfigMaps(tableConfig);
try {
-
StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE,
"kafka"),
- streamConfigMap);
+ for (Map<String, String> streamConfigMap : streamConfigMaps) {
+ StreamConfig.validateConsumerType(
+
streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"),
Review Comment:
(format) Not comply to [Pinot
Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#set-up-ide)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]