Jackie-Jiang commented on code in PR #10454: URL: https://github.com/apache/pinot/pull/10454#discussion_r1147118316
##########
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java:
##########
@@ -64,63 +65,50 @@ public void reduceAndSetResults(String tableName, DataSchema dataSchema,
       List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
       DataSchema selectionDataSchema = SelectionOperatorUtils.getResultTableDataSchema(dataSchema, selectionColumns);
       brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema, Collections.emptyList()));
-    } else {
-      // For data table map with more than one data tables, remove conflicting data tables
-      if (dataTableMap.size() > 1) {
-        List<ServerRoutingInstance> droppedServers = removeConflictingResponses(dataSchema, dataTableMap);
-        if (!droppedServers.isEmpty()) {
-          String errorMessage = QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName
-              + " from servers: " + droppedServers + " got dropped due to data schema inconsistency.";
-          LOGGER.warn(errorMessage);
-          if (brokerMetrics != null) {
-            brokerMetrics.addMeteredTableValue(TableNameBuilder.extractRawTableName(tableName),
-                BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
-          }
-          brokerResponseNative.addToExceptions(
-              new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
+      return;
+    }
+
+    // For data table map with more than one data tables, remove conflicting data tables
+    if (dataTableMap.size() > 1) {
+      DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+      List<ServerRoutingInstance> droppedServers = new ArrayList<>();
+      Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
+        DataSchema dataSchemaToCompare = entry.getValue().getDataSchema();
+        assert dataSchemaToCompare != null;
+        if (!Arrays.equals(columnDataTypes, dataSchemaToCompare.getColumnDataTypes())) {
+          droppedServers.add(entry.getKey());
+          iterator.remove();
         }
       }
-
-      int limit = _queryContext.getLimit();
-      if (limit > 0 && _queryContext.getOrderByExpressions() != null) {
-        // Selection order-by
-        SelectionOperatorService selectionService = new SelectionOperatorService(_queryContext, dataSchema);
-        selectionService.reduceWithOrdering(dataTableMap.values(), _queryContext.isNullHandlingEnabled());
-        brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering());
-      } else {
-        // Selection only
-        List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
-        List<Object[]> reducedRows = SelectionOperatorUtils.reduceWithoutOrdering(dataTableMap.values(), limit,
-            _queryContext.isNullHandlingEnabled());
-        brokerResponseNative.setResultTable(
-            SelectionOperatorUtils.renderResultTableWithoutOrdering(reducedRows, dataSchema, selectionColumns));
+      if (!droppedServers.isEmpty()) {
+        String errorMessage =
+            QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName + " from servers: "
+                + droppedServers + " got dropped due to data schema inconsistency.";

Review Comment:
No, we want to return the partial response with this error message

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org