yashmayya commented on code in PR #15773:
URL: https://github.com/apache/pinot/pull/15773#discussion_r2113360239
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -517,4 +519,42 @@ public void forceCommit(String tableNameWithType, Set<String> segmentNames) {
});
}
}
+
+ // TODO: LogicalTableContext has to be cached. https://github.com/apache/pinot/issues/15859
+ @Nullable
+ @Override
+ public LogicalTableContext getLogicalTableContext(String logicalTableName) {
+ Schema schema = ZKMetadataProvider.getSchema(getPropertyStore(), logicalTableName);
+ if (schema == null) {
+ LOGGER.warn("Failed to find schema for logical table: {}, skipping", logicalTableName);
+ return null;
+ }
+ LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(getPropertyStore(),
+ logicalTableName);
+ if (logicalTableConfig == null) {
+ LOGGER.warn("Failed to find logical table config for logical table: {}, skipping", logicalTableName);
+ return null;
+ }
+
+ TableConfig offlineTableConfig = null;
+ if (logicalTableConfig.getRefOfflineTableName() != null) {
+ offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(getPropertyStore(),
+ logicalTableConfig.getRefOfflineTableName());
+ if (offlineTableConfig == null) {
Review Comment:
Are all logical tables expected to have both a reference offline table AND a reference realtime table?
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -517,4 +519,42 @@ public void forceCommit(String tableNameWithType, Set<String> segmentNames) {
});
}
}
+
+ // TODO: LogicalTableContext has to be cached. https://github.com/apache/pinot/issues/15859
+ @Nullable
+ @Override
+ public LogicalTableContext getLogicalTableContext(String logicalTableName) {
Review Comment:
Isn't this a pretty major issue if each MSE query referencing a logical
table requires 4 ZK calls? AFAIK we have been very careful about avoiding
direct ZK access in any query path.
cc - @Jackie-Jiang
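One possible shape for the caching that the TODO calls for, so that the ZK reads happen once per logical table rather than once per query: a minimal sketch, assuming a hypothetical `LogicalTableContextCache` (not an existing Pinot class) with a plain `Function` standing in for the ZK reads done in `getLogicalTableContext`. A real version would need to hook `invalidate` into the server's existing ZK/Helix change notifications, which is not shown here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for Pinot's LogicalTableContext (real fields elided).
final class LogicalTableContextStub {
  final String logicalTableName;

  LogicalTableContextStub(String logicalTableName) {
    this.logicalTableName = logicalTableName;
  }
}

// Hypothetical cache: the loader (standing in for the ZK reads in getLogicalTableContext)
// runs once per logical table until that entry is invalidated.
final class LogicalTableContextCache {
  private final Map<String, LogicalTableContextStub> _cache = new ConcurrentHashMap<>();
  private final Function<String, LogicalTableContextStub> _loader;

  LogicalTableContextCache(Function<String, LogicalTableContextStub> loader) {
    _loader = loader;
  }

  // Returns null when the loader returns null (e.g. schema or config missing);
  // computeIfAbsent does not store null results, so such lookups are retried next time.
  LogicalTableContextStub get(String logicalTableName) {
    return _cache.computeIfAbsent(logicalTableName, _loader);
  }

  // Intended to be called from a ZK change listener when the schema, the logical table
  // config, or a referenced physical table config changes.
  void invalidate(String logicalTableName) {
    _cache.remove(logicalTableName);
  }
}
```

Because `computeIfAbsent` does not cache a `null` result, a logical table with a missing schema or config would still hit ZK on every query; a negative cache may be worth considering if that path can be hot.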
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java:
##########
@@ -86,8 +90,20 @@ public Table getTable(String name) {
*/
@Override
public Set<String> getTableNames() {
- return _tableCache.getTableNameMap().keySet().stream().filter(n -> DatabaseUtils.isPartOfDatabase(n, _databaseName))
- .collect(Collectors.toSet());
+ Set<String> result = new HashSet<>();
+ for (String tableName: _tableCache.getTableNameMap().keySet()) {
+ if (DatabaseUtils.isPartOfDatabase(tableName, _databaseName)) {
+ result.add(tableName);
+ }
+ }
+
+ for (String logicalTableName: _tableCache.getLogicalTableNameMap().keySet()) {
+ if (DatabaseUtils.isPartOfDatabase(logicalTableName, _databaseName)) {
+ result.add(logicalTableName);
+ }
+ }
+
+ return result;
Review Comment:
nit: optional suggestion to continue using the functional approach:
```suggestion
return Stream.concat(_tableCache.getTableNameMap().keySet().stream(),
_tableCache.getLogicalTableNameMap().keySet().stream())
.filter(n -> DatabaseUtils.isPartOfDatabase(n, _databaseName))
.collect(Collectors.toSet());
```
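For reference, a self-contained version of the suggested `Stream.concat` pattern, with plain sets standing in for the two `TableCache` key sets. The `isPartOfDatabase` predicate here is a hypothetical simplification (a `db.table` name belongs to `db`, an unqualified name to `default`), not the real `DatabaseUtils` logic.

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class TableNameFilter {
  // Hypothetical simplification of DatabaseUtils.isPartOfDatabase: a name "db.table"
  // belongs to database "db", and an unqualified name belongs to "default".
  static boolean isPartOfDatabase(String tableName, String databaseName) {
    int dot = tableName.indexOf('.');
    String db = dot < 0 ? "default" : tableName.substring(0, dot);
    return db.equals(databaseName);
  }

  // Mirrors the suggestion: concatenate physical and logical table names, then filter once.
  static Set<String> getTableNames(Set<String> physicalTableNames, Set<String> logicalTableNames,
      String databaseName) {
    return Stream.concat(physicalTableNames.stream(), logicalTableNames.stream())
        .filter(n -> isPartOfDatabase(n, databaseName))
        .collect(Collectors.toSet());
  }
}
```

Since `Collectors.toSet()` deduplicates, a name that appears in both maps is returned once, which matches the behavior of the loop-based version that adds both into a single `HashSet`.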
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -392,7 +400,11 @@ private void assignWorkersToLeafFragment(PlanFragment fragment, DispatchablePlan
if (Boolean.parseBoolean(tableOptions.get(PinotHintOptions.TableHintOptions.IS_REPLICATED))) {
setSegmentsForReplicatedLeafFragment(metadata);
} else {
- assignWorkersToNonPartitionedLeafFragment(metadata, context);
+ if (metadata.getLogicalTableRouteInfo() != null) {
Review Comment:
I presume we won't be supporting partitioned leaf stage for logical tables?
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java:
##########
@@ -36,37 +37,42 @@
@AutoService(TimeBoundaryStrategy.class)
public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy {
- public static final String INCLUDED_TABLES = "includedTables";
+ private static final String INCLUDED_TABLES = "includedTables";
+ Map<String, DateTimeFormatSpec> _dateTimeFormatSpecMap;
+
+ @Override
+ public void init(LogicalTableConfig logicalTableConfig, TableCache tableCache) {
+ Map<String, Object> parameters = logicalTableConfig.getTimeBoundaryConfig().getParameters();
Review Comment:
Is `parameters` unused?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java:
##########
@@ -386,4 +409,69 @@ private static List<Expression> computeInOperands(List<Object[]> dataContainer,
}
return expressions;
}
+
+ private static List<InstanceRequest> constructLogicalTableServerQueryRequests(
+ OpChainExecutionContext executionContext, PinotQuery pinotQuery, InstanceDataManager instanceDataManager) {
+ StageMetadata stageMetadata = executionContext.getStageMetadata();
+ String logicalTableName = stageMetadata.getTableName();
+ LogicalTableContext logicalTableContext = instanceDataManager.getLogicalTableContext(logicalTableName);
+ Preconditions.checkNotNull(logicalTableContext,
+ "LogicalTableManager not found for logical table name: " + logicalTableName);
+
+ Map<String, List<String>> logicalTableSegmentsMap =
+ executionContext.getWorkerMetadata().getLogicalTableSegmentsMap();
+ List<TableSegmentsInfo> offlineTableRouteInfoList = new ArrayList<>();
+ List<TableSegmentsInfo> realtimeTableRouteInfoList = new ArrayList<>();
+
+ Preconditions.checkNotNull(logicalTableSegmentsMap);
+ for (Map.Entry<String, List<String>> entry: logicalTableSegmentsMap.entrySet()) {
+ String physicalTableName = entry.getKey();
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(physicalTableName);
+ TableSegmentsInfo tableSegmentsInfo = new TableSegmentsInfo();
+ tableSegmentsInfo.setTableName(physicalTableName);
+ tableSegmentsInfo.setSegments(entry.getValue());
+ if (tableType == TableType.REALTIME) {
+ realtimeTableRouteInfoList.add(tableSegmentsInfo);
+ } else {
+ offlineTableRouteInfoList.add(tableSegmentsInfo);
+ }
+ }
+
+ TimeBoundaryInfo timeBoundaryInfo = stageMetadata.getTimeBoundary();
+
+ if (offlineTableRouteInfoList.isEmpty() || realtimeTableRouteInfoList.isEmpty()) {
+ List<TableSegmentsInfo> routeInfoList =
+ offlineTableRouteInfoList.isEmpty() ? realtimeTableRouteInfoList : offlineTableRouteInfoList;
+ String tableType = offlineTableRouteInfoList.isEmpty() ? TableType.REALTIME.name() : TableType.OFFLINE.name();
+ if (tableType.equals(TableType.OFFLINE.name())) {
+ Preconditions.checkNotNull(logicalTableContext.getRefOfflineTableConfig());
+ String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(logicalTableName);
Review Comment:
Why do we need to suffix the logical table name with `OFFLINE`?
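To make the concern concrete: Pinot physical table names with type are the raw name plus `_OFFLINE` or `_REALTIME`, so applying the offline suffix to a logical table name yields a name that is not one of the logical table's physical tables. The helper below is a hypothetical simplification of that naming rule, not Pinot's `TableNameBuilder`.

```java
final class TableNameSketch {
  // Hypothetical simplification of the naming rule: a physical table name with type
  // is the raw table name plus "_OFFLINE" (or "_REALTIME").
  static String offlineTableNameWithType(String tableName) {
    return tableName.endsWith("_OFFLINE") ? tableName : tableName + "_OFFLINE";
  }
}
```

For a logical table `myLogicalTable` backed by, say, `orders_OFFLINE`, the derived name `myLogicalTable_OFFLINE` matches neither, so the name of the reference offline table itself (e.g. taken from `getRefOfflineTableConfig()`) may be what is intended here.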
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java:
##########
@@ -137,7 +140,21 @@ public Void visitSort(SortNode node, DispatchablePlanContext context) {
@Override
public Void visitTableScan(TableScanNode node, DispatchablePlanContext context) {
DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context);
- dispatchablePlanMetadata.addScannedTable(_tableCache.getActualTableName(node.getTableName()));
+
+ LogicalTableRouteInfo logicalTableRouteInfo = null;
+
+ String tableNameInNode = node.getTableName();
+ String tableName = _tableCache.getActualTableName(tableNameInNode);
+ if (tableName == null) {
+ tableName = _tableCache.getActualLogicalTableName(tableNameInNode);
+ Preconditions.checkNotNull(tableName, "Logical table config not found in table cache: " + tableNameInNode);
+ LogicalTableRouteProvider tableRouteProvider = new LogicalTableRouteProvider();
+ logicalTableRouteInfo = new LogicalTableRouteInfo();
+ tableRouteProvider.fillTableConfigMetadata(logicalTableRouteInfo, tableName, _tableCache);
+ }
+
+ dispatchablePlanMetadata.addScannedTable(tableName);
+ dispatchablePlanMetadata.setLogicalTableRouteInfo(logicalTableRouteInfo);
Review Comment:
Could we move this call, together with the `LogicalTableRouteInfo` instantiation and initialization, into the if block that handles everything logical-table related, so that it's neatly contained? Calling `dispatchablePlanMetadata.setLogicalTableRouteInfo(null)` isn't required for physical tables.
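A rough sketch of the restructuring being suggested, with hypothetical stand-in types (`RouteInfoStub`, `PlanMetadataStub`) and maps in place of the `TableCache` lookups; only the control flow is meant to mirror the diff. `Objects.requireNonNull` is used because, like Guava's `Preconditions.checkNotNull`, it throws a `NullPointerException` with the given message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for LogicalTableRouteInfo.
final class RouteInfoStub {
}

// Hypothetical stand-in for DispatchablePlanMetadata, reduced to the two calls in the diff.
final class PlanMetadataStub {
  final List<String> scannedTables = new ArrayList<>();
  RouteInfoStub logicalTableRouteInfo; // stays null for physical tables

  void addScannedTable(String tableName) {
    scannedTables.add(tableName);
  }

  void setLogicalTableRouteInfo(RouteInfoStub routeInfo) {
    logicalTableRouteInfo = routeInfo;
  }
}

final class TableScanSketch {
  // The two maps stand in for TableCache.getActualTableName and getActualLogicalTableName.
  static void visitTableScan(String tableNameInNode, Map<String, String> physicalTableNames,
      Map<String, String> logicalTableNames, PlanMetadataStub metadata) {
    String tableName = physicalTableNames.get(tableNameInNode);
    if (tableName == null) {
      // All logical-table handling is contained in this block.
      tableName = Objects.requireNonNull(logicalTableNames.get(tableNameInNode),
          "Logical table config not found in table cache: " + tableNameInNode);
      RouteInfoStub logicalTableRouteInfo = new RouteInfoStub();
      // The real code would call fillTableConfigMetadata(logicalTableRouteInfo, tableName, ...) here.
      metadata.setLogicalTableRouteInfo(logicalTableRouteInfo);
    }
    metadata.addScannedTable(tableName);
  }
}
```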
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java:
##########
@@ -80,6 +81,24 @@ public class DispatchablePlanMetadata implements Serializable {
// Map from workerId -> {planFragmentId -> mailboxes}
private final Map<Integer, Map<Integer, MailboxInfos>> _workerIdToMailboxesMap = new HashMap<>();
+ /**
+ * Map from workerId -> {tableType -> {tableName -> segments}} is required for logical tables.
+ * Raw definition of the map is:
+ * Map<String, Map<String, Map<String, List<String>>>>. Since this definition is hard to understand - specifically
+ * what do each of the string keys store, we define two classes:
+ * {@link TableTypeToSegmentsMap} and {@link TableTypeTableNameToSegmentsMap} to help read code more easily.
+ */
+ public static class TableTypeToSegmentsMap {
Review Comment:
So just to clarify: both `workerIdToSegmentsMap` (physical tables only?) and `workerIdToTableSegmentsMap` (logical tables only?) are `Map<Integer, Map<String, List<String>>>`, but in the former the inner map's string key is simply `OFFLINE` or `REALTIME`, while in the latter it's a physical table name with type?
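To illustrate the distinction the question is asking about, under the reading it proposes (not confirmed by the diff, whose Javadoc describes a three-level map): both maps share one Java type, and only the meaning of the inner key differs. All table and segment names below are made up.

```java
import java.util.List;
import java.util.Map;

final class SegmentsMapShapes {
  // Physical table: the inner key is just the table type.
  static final Map<Integer, Map<String, List<String>>> WORKER_ID_TO_SEGMENTS_MAP = Map.of(
      0, Map.of("OFFLINE", List.of("segment_0", "segment_1")),
      1, Map.of("REALTIME", List.of("segment_2")));

  // Logical table: the inner key is a physical table name with type, so one worker can
  // hold segments from several physical tables of the same type.
  static final Map<Integer, Map<String, List<String>>> WORKER_ID_TO_TABLE_SEGMENTS_MAP = Map.of(
      0, Map.of("ordersUS_OFFLINE", List.of("segment_0"),
          "ordersEU_OFFLINE", List.of("segment_1")),
      1, Map.of("ordersUS_REALTIME", List.of("segment_2")));
}
```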
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java:
##########
@@ -233,7 +252,11 @@ private static InstanceRequest compileInstanceRequest(OpChainExecutionContext ex
instanceRequest.setCid(QueryThreadContext.getCid());
instanceRequest.setBrokerId("unknown");
instanceRequest.setEnableTrace(executionContext.isTraceEnabled());
- instanceRequest.setSearchSegments(segmentList);
+ if (segmentList != null) {
+ instanceRequest.setSearchSegments(segmentList);
+ } else {
+ instanceRequest.setTableSegmentsInfoList(tableRouteInfoList);
+ }
Review Comment:
nit: A comment here about the physical vs. logical table difference might be useful.
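A sketch of the kind of comment the nit asks for, using a hypothetical `InstanceRequestStub` and plain strings in place of `TableSegmentsInfo`. The comment wording is only a suggestion, and its explanation assumes what the rest of this diff shows: that a logical table's segments are grouped per physical table.

```java
import java.util.List;

// Hypothetical stand-in for InstanceRequest; strings replace TableSegmentsInfo.
final class InstanceRequestStub {
  List<String> searchSegments;
  List<String> tableSegmentsInfoList;

  void setSearchSegments(List<String> segments) {
    searchSegments = segments;
  }

  void setTableSegmentsInfoList(List<String> tableSegmentsInfoList) {
    this.tableSegmentsInfoList = tableSegmentsInfoList;
  }
}

final class CompileRequestSketch {
  static void setSegments(InstanceRequestStub instanceRequest, List<String> segmentList,
      List<String> tableRouteInfoList) {
    if (segmentList != null) {
      // Physical table: the request targets a single table, so a flat segment list suffices.
      instanceRequest.setSearchSegments(segmentList);
    } else {
      // Logical table: segments are grouped per physical table (with type), because one
      // logical table can span multiple OFFLINE and REALTIME physical tables.
      instanceRequest.setTableSegmentsInfoList(tableRouteInfoList);
    }
  }
}
```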
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java:
##########
@@ -101,4 +103,30 @@ public void setTableSegmentsMap(Map<String, List<String>> tableSegmentsMap) {
}
_customProperties.put(TABLE_SEGMENTS_MAP_KEY, tableSegmentsMapStr);
}
+
+ @Nullable
+ public Map<String, List<String>> getLogicalTableSegmentsMap() {
+ String logicalTableSegmentsMapStr = _customProperties.get(LOGICAL_TABLE_SEGMENTS_MAP_KEY);
+ if (logicalTableSegmentsMapStr != null) {
+ try {
+ return JsonUtils.stringToObject(logicalTableSegmentsMapStr,
+ new TypeReference<Map<String, List<String>>>() {
+ });
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to deserialize table segments map: " + logicalTableSegmentsMapStr, e);
+ }
+ } else {
+ return null;
+ }
+ }
Review Comment:
This is basically identical to `getTableSegmentsMap`; the only difference is the map key. Can we extract it into a common method?
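One way the extraction could look: both getters delegate to a single helper parameterized by the custom property key. This is a sketch with hypothetical names; the key string values are made up, and the `SegmentsMapDeserializer` interface stands in for the `JsonUtils.stringToObject(..., new TypeReference<Map<String, List<String>>>() {})` call so the example runs without Jackson.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WorkerMetadataSketch {
  // Hypothetical key values; the real constants live in WorkerMetadata.
  static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap";
  static final String LOGICAL_TABLE_SEGMENTS_MAP_KEY = "logicalTableSegmentsMap";

  // Stand-in for the JsonUtils.stringToObject call with a TypeReference.
  interface SegmentsMapDeserializer {
    Map<String, List<String>> deserialize(String value) throws IOException;
  }

  private final Map<String, String> _customProperties = new HashMap<>();
  private final SegmentsMapDeserializer _deserializer;

  WorkerMetadataSketch(SegmentsMapDeserializer deserializer) {
    _deserializer = deserializer;
  }

  void putProperty(String key, String value) {
    _customProperties.put(key, value);
  }

  Map<String, List<String>> getTableSegmentsMap() {
    return getSegmentsMap(TABLE_SEGMENTS_MAP_KEY);
  }

  Map<String, List<String>> getLogicalTableSegmentsMap() {
    return getSegmentsMap(LOGICAL_TABLE_SEGMENTS_MAP_KEY);
  }

  // Shared helper: the two getters differ only in the custom property key.
  private Map<String, List<String>> getSegmentsMap(String key) {
    String segmentsMapStr = _customProperties.get(key);
    if (segmentsMapStr == null) {
      return null;
    }
    try {
      return _deserializer.deserialize(segmentsMapStr);
    } catch (IOException e) {
      throw new RuntimeException("Unable to deserialize segments map for key " + key + ": " + segmentsMapStr, e);
    }
  }
}
```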
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java:
##########
@@ -36,37 +37,42 @@
@AutoService(TimeBoundaryStrategy.class)
public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy {
- public static final String INCLUDED_TABLES = "includedTables";
+ private static final String INCLUDED_TABLES = "includedTables";
+ Map<String, DateTimeFormatSpec> _dateTimeFormatSpecMap;
+
+ @Override
+ public void init(LogicalTableConfig logicalTableConfig, TableCache tableCache) {
+ Map<String, Object> parameters = logicalTableConfig.getTimeBoundaryConfig().getParameters();
+ List<String> includedTables = getTimeBoundaryTableNames(logicalTableConfig);
+ _dateTimeFormatSpecMap = new HashMap<>(includedTables.size());
+ for (String physicalTableName : includedTables) {
+ String rawTableName = TableNameBuilder.extractRawTableName(physicalTableName);
+ Schema schema = tableCache.getSchema(rawTableName);
+ TableConfig tableConfig = tableCache.getTableConfig(physicalTableName);
+ Preconditions.checkArgument(tableConfig != null, "Table config not found for table: %s", physicalTableName);
+ Preconditions.checkArgument(schema != null, "Schema not found for table: %s", physicalTableName);
+ String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
+ DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
+ Preconditions.checkArgument(dateTimeFieldSpec != null, "Time column not found in schema for table: %s",
+ physicalTableName);
+ DateTimeFormatSpec specFormatSpec = dateTimeFieldSpec.getFormatSpec();
+ _dateTimeFormatSpecMap.put(physicalTableName, specFormatSpec);
+ }
+ }
@Override
public String getName() {
return "min";
}
@Override
- public TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, TableCache tableCache,
- RoutingManager routingManager) {
+ public TimeBoundaryInfo computeTimeBoundary(RoutingManager routingManager) {
Review Comment:
Are these changes related to support for logical tables in MSE or an
unrelated refactor?
--
This is an automated message from the Apache Git Service.