itschrispeck commented on code in PR #13790:
URL: https://github.com/apache/pinot/pull/13790#discussion_r1849154551
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1294,7 +1315,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig
tableConfig, StreamConfig st
selectStartOffset(offsetCriteria, partitionId,
partitionIdToStartOffset,
partitionIdToSmallestOffset, tableConfig.getTableName(),
offsetFactory,
latestSegmentZKMetadata.getEndOffset());
- createNewConsumingSegment(tableConfig, streamConfig,
latestSegmentZKMetadata, currentTimeMs,
+ createNewConsumingSegment(tableConfig, streamConfigs.get(0),
latestSegmentZKMetadata, currentTimeMs,
Review Comment:
Can we use the partitionId to choose the correct streamConfig here?
Otherwise we'd need to document that segment flush settings are only read
from the first streamConfig in the table config's list (though I expect
different flush settings per stream to become a requirement eventually).
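A minimal sketch of the partitionId-based lookup, assuming the Pinot partition
ID encodes the index of its stream as a fixed padding offset. The class name,
method names, and the `PARTITION_PADDING_OFFSET` value are hypothetical and not
taken from this PR; the real mapping may differ.

```java
import java.util.List;

final class StreamConfigSelector {
  // Assumption for illustration: partition IDs 0..9999 belong to stream 0,
  // 10000..19999 to stream 1, and so on.
  static final int PARTITION_PADDING_OFFSET = 10_000;

  private StreamConfigSelector() {
  }

  // Derives the index into the stream config list from a padded partition ID.
  static int streamConfigIndex(int pinotPartitionId) {
    if (pinotPartitionId < 0) {
      throw new IllegalArgumentException("Negative partition id: " + pinotPartitionId);
    }
    return pinotPartitionId / PARTITION_PADDING_OFFSET;
  }

  // Returns the config that owns the partition instead of always using get(0).
  static <T> T selectStreamConfig(List<T> streamConfigs, int pinotPartitionId) {
    int index = streamConfigIndex(pinotPartitionId);
    if (index >= streamConfigs.size()) {
      throw new IllegalArgumentException(
          "Partition id " + pinotPartitionId + " maps to stream index " + index
              + ", but only " + streamConfigs.size() + " stream configs exist");
    }
    return streamConfigs.get(index);
  }
}
```

With something like this, the call could pass
`selectStreamConfig(streamConfigs, partitionId)` rather than
`streamConfigs.get(0)`, so flush settings would come from the stream that
actually owns the partition.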
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -929,15 +949,16 @@ public void ensureAllPartitionsConsuming(TableConfig
tableConfig, StreamConfig s
List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
offsetsHaveToChange
? Collections.emptyList() // offsets from metadata are not
valid anymore; fetch for all partitions
- : getPartitionGroupConsumptionStatusList(idealState,
streamConfig);
- OffsetCriteria originalOffsetCriteria =
streamConfig.getOffsetCriteria();
+ : getPartitionGroupConsumptionStatusList(idealState,
streamConfigs);
+ // FIXME: Right now, we assume topics are sharing same offset criteria
Review Comment:
Does it make sense to add a precondition to check this?
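Something along these lines is what I have in mind: a rough sketch of a
fail-fast check that every stream config yields the same value for a given
property. The helper class and method names are hypothetical, and it uses plain
Java rather than any existing Pinot or Guava utility.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

final class StreamConfigPreconditions {
  private StreamConfigPreconditions() {
  }

  // Returns the shared value if all configs agree on it, otherwise throws.
  static <T, V> V requireSameValue(List<T> configs, Function<T, V> getter, String description) {
    if (configs.isEmpty()) {
      throw new IllegalStateException("No stream configs to check for " + description);
    }
    V first = getter.apply(configs.get(0));
    for (int i = 1; i < configs.size(); i++) {
      V other = getter.apply(configs.get(i));
      if (!Objects.equals(first, other)) {
        throw new IllegalStateException(String.format(
            "Stream configs must share the same %s, but index 0 has %s and index %d has %s",
            description, first, i, other));
      }
    }
    return first;
  }
}
```

The call site could then read
`requireSameValue(streamConfigs, StreamConfig::getOffsetCriteria, "offset criteria")`.
This relies on the compared type having a meaningful `equals`, which I have not
verified for `OffsetCriteria`.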
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java:
##########
@@ -75,6 +84,103 @@ public static Map<String, String>
getStreamConfigMap(TableConfig tableConfig) {
return streamConfigMap;
}
+ /**
+ * Fetches the streamConfig from the given realtime table.
+ * First, the ingestionConfigs->stream->streamConfigs will be checked.
+ * If not found, the indexingConfig->streamConfigs will be checked (which is
deprecated).
+ * @param tableConfig realtime table config
+ * @return streamConfigs List of maps
+ */
+ public static List<Map<String, String>> getStreamConfigMaps(TableConfig
tableConfig) {
Review Comment:
Can we remove the old method if it is no longer used?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -255,12 +255,12 @@ public List<PartitionGroupConsumptionStatus>
getPartitionGroupConsumptionStatusL
// Create a {@link PartitionGroupConsumptionStatus} for each latest segment
StreamPartitionMsgOffsetFactory offsetFactory =
-
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+
StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
for (Map.Entry<Integer, LLCSegmentName> entry :
partitionGroupIdToLatestSegment.entrySet()) {
int partitionGroupId = entry.getKey();
LLCSegmentName llcSegmentName = entry.getValue();
SegmentZKMetadata segmentZKMetadata =
- getSegmentZKMetadata(streamConfig.getTableNameWithType(),
llcSegmentName.getSegmentName());
+ getSegmentZKMetadata(streamConfigs.get(0).getTableNameWithType(),
llcSegmentName.getSegmentName());
Review Comment:
nit: use `idealState.getId()` for the table name instead of going through
`streamConfigs.get(0)`?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java:
##########
@@ -87,6 +89,33 @@ public MissingConsumingSegmentFinder(String
realtimeTableName, ZkHelixPropertySt
}
}
+ public MissingConsumingSegmentFinder(String realtimeTableName,
ZkHelixPropertyStore<ZNRecord> propertyStore,
+ ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs) {
+ _realtimeTableName = realtimeTableName;
+ _controllerMetrics = controllerMetrics;
+ _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore,
controllerMetrics);
+ _streamPartitionMsgOffsetFactory =
+
StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
Review Comment:
I think this breaks when a table mixes streams that do not use the same
offset factory type, e.g. Kinesis and Kafka. (The same `streamConfigs.get(0)`
pattern for the offset factory appears in many places; I won't mark them all.)
We could add a unit test, or should we add a TODO for these, since we can't
easily test this end to end internally?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java:
##########
@@ -87,6 +89,33 @@ public MissingConsumingSegmentFinder(String
realtimeTableName, ZkHelixPropertySt
}
}
+ public MissingConsumingSegmentFinder(String realtimeTableName,
ZkHelixPropertyStore<ZNRecord> propertyStore,
Review Comment:
The old constructor is no longer used. Can we remove it and update the
tests?
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -18,33 +18,44 @@
*/
package org.apache.pinot.spi.stream;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Fetches the list of {@link PartitionGroupMetadata} for all partition groups
of the stream,
+ * Fetches the list of {@link PartitionGroupMetadata} for all partition groups
of the streams,
* using the {@link StreamMetadataProvider}
*/
public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
private List<PartitionGroupMetadata> _newPartitionGroupMetadataList;
- private final StreamConfig _streamConfig;
+ private final List<StreamConfig> _streamConfigs;
private final List<PartitionGroupConsumptionStatus>
_partitionGroupConsumptionStatusList;
- private final StreamConsumerFactory _streamConsumerFactory;
private Exception _exception;
- private final String _topicName;
+ private final List<String> _topicNames;
+
+ public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
+ List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList) {
+ _topicNames =
streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList());
+ _streamConfigs = streamConfigs;
+ _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
+ _newPartitionGroupMetadataList = new ArrayList<>();
+ }
public PartitionGroupMetadataFetcher(StreamConfig streamConfig,
Review Comment:
Same here: let's remove the unused constructor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]