snleee commented on code in PR #8550:
URL: https://github.com/apache/pinot/pull/8550#discussion_r851439939
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -363,7 +363,13 @@ private BrokerResponseNative handleSQLRequest(long
requestId, String query, Json
// Validate the request
try {
- validateRequest(pinotQuery, _queryResponseLimit);
+ int numReplicas = 1;
+ if (offlineTableConfig != null) {
+ numReplicas =
offlineTableConfig.getValidationConfig().getReplicationNumber();
+ } else if (realtimeTableConfig != null) {
+ numReplicas =
realtimeTableConfig.getValidationConfig().getReplicationNumber();
Review Comment:
For realtime, `getReplicasPerPartitionNumber()` is the correct indicator for
the replication factor.
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java:
##########
@@ -49,15 +50,26 @@ public ReplicaGroupInstanceSelector(String
tableNameWithType, BrokerMetrics brok
@Override
Map<String, String> select(List<String> segments, int requestId,
- Map<String, List<String>> segmentToEnabledInstancesMap) {
+ Map<String, List<String>> segmentToEnabledInstancesMap, Map<String,
String> queryOptions) {
Map<String, String> segmentToSelectedInstanceMap = new
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+ // validate queryOptions to get query
+ String replicaGroup =
queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS);
+ int replicaGroupNum = 1;
+ int currentRequest = 0;
+ if (replicaGroup != null) {
+ replicaGroupNum = Integer.parseInt(replicaGroup);
Review Comment:
`replicaGroupNum`->`numReplicaGroup`
##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java:
##########
@@ -581,6 +585,132 @@ public void testInstanceSelector() {
assertTrue(selectionResult.getUnavailableSegments().isEmpty());
}
+ @Test
+ public void testReplicaGroupInstanceSelectorNumReplicaGroups() {
+ String offlineTableName = "testTable_OFFLINE";
+ BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+ BrokerRequest brokerRequest = mock(BrokerRequest.class);
+ PinotQuery pinotQuery = mock(PinotQuery.class);
+ Map<String, String> queryOptions = new HashMap<>();
+ queryOptions.put("numReplicaGroups", "2");
Review Comment:
Can you also cover the case with `numReplicaGroups=4`?
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java:
##########
@@ -49,15 +50,26 @@ public ReplicaGroupInstanceSelector(String
tableNameWithType, BrokerMetrics brok
@Override
Map<String, String> select(List<String> segments, int requestId,
- Map<String, List<String>> segmentToEnabledInstancesMap) {
+ Map<String, List<String>> segmentToEnabledInstancesMap, Map<String,
String> queryOptions) {
Map<String, String> segmentToSelectedInstanceMap = new
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+ // validate queryOptions to get query
+ String replicaGroup =
queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS);
Review Comment:
Instead of re-writing the option based on the table config, you can use
`enabledInstances` + `numReplicaGroup`.
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java:
##########
@@ -49,15 +50,26 @@ public ReplicaGroupInstanceSelector(String
tableNameWithType, BrokerMetrics brok
@Override
Map<String, String> select(List<String> segments, int requestId,
- Map<String, List<String>> segmentToEnabledInstancesMap) {
+ Map<String, List<String>> segmentToEnabledInstancesMap, Map<String,
String> queryOptions) {
Map<String, String> segmentToSelectedInstanceMap = new
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+ // validate queryOptions to get query
+ String replicaGroup =
queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS);
+ int replicaGroupNum = 1;
+ int currentRequest = 0;
+ if (replicaGroup != null) {
+ replicaGroupNum = Integer.parseInt(replicaGroup);
+ }
for (String segment : segments) {
List<String> enabledInstances =
segmentToEnabledInstancesMap.get(segment);
// NOTE: enabledInstances can be null when there is no enabled instances
for the segment, or the instance selector
// has not been updated (we update all components for routing in
sequence)
if (enabledInstances != null) {
int numEnabledInstances = enabledInstances.size();
- segmentToSelectedInstanceMap.put(segment,
enabledInstances.get(requestId % numEnabledInstances));
+ int instanceToSelect = (requestId + currentRequest++) %
numEnabledInstances;
Review Comment:
Can we add the comment in the header about how the behavior changes when the
option is given?
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -2152,8 +2159,16 @@ static void validateRequest(PinotQuery pinotQuery, int
queryResponseLimit) {
throw new IllegalStateException("SQL query should always have response
format and group-by mode set to SQL");
}
+ // throw errors if options is less than 1, rectify if larger that current
replicas
+ if (queryOptions.get(Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS) !=
null) {
Review Comment:
I'm not sure if the `validateRequest()` is the correct place to update value
for `queryOptions`. Why don't we process this in `select()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]