yashmayya commented on code in PR #15773:
URL: https://github.com/apache/pinot/pull/15773#discussion_r2113360239


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -517,4 +519,42 @@ public void forceCommit(String tableNameWithType, Set<String> segmentNames) {
       });
     }
   }
+
+  // TODO: LogicalTableContext has to be cached. https://github.com/apache/pinot/issues/15859
+  @Nullable
+  @Override
+  public LogicalTableContext getLogicalTableContext(String logicalTableName) {
+    Schema schema = ZKMetadataProvider.getSchema(getPropertyStore(), logicalTableName);
+    if (schema == null) {
+      LOGGER.warn("Failed to find schema for logical table: {}, skipping", logicalTableName);
+      return null;
+    }
+    LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(getPropertyStore(),
+        logicalTableName);
+    if (logicalTableConfig == null) {
+      LOGGER.warn("Failed to find logical table config for logical table: {}, skipping", logicalTableName);
+      return null;
+    }
+
+    TableConfig offlineTableConfig = null;
+    if (logicalTableConfig.getRefOfflineTableName() != null) {
+      offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(getPropertyStore(),
+          logicalTableConfig.getRefOfflineTableName());
+      if (offlineTableConfig == null) {

Review Comment:
   All logical tables are expected to have a reference offline table AND a 
reference realtime table?
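If either reference is meant to be optional, the lookup could make that contract explicit. A minimal sketch, assuming a logical table must reference at least one physical table type and that a declared-but-missing config is an error; `LogicalRefs` and the `lookup` function (standing in for the `ZKMetadataProvider` calls) are hypothetical, and a `String` stands in for `TableConfig`.

```java
import java.util.function.Function;

// Hypothetical holder for the resolved reference configs; String stands in for TableConfig.
final class LogicalRefs {
  final String offlineConfig;   // null when no offline table is referenced
  final String realtimeConfig;  // null when no realtime table is referenced

  LogicalRefs(String offlineConfig, String realtimeConfig) {
    this.offlineConfig = offlineConfig;
    this.realtimeConfig = realtimeConfig;
  }

  // Resolves each reference only if it is declared. Assumed contract: a declared reference
  // whose config is missing is an error, and at least one reference must be present.
  static LogicalRefs resolve(String refOffline, String refRealtime, Function<String, String> lookup) {
    String offline = resolveOne(refOffline, lookup);
    String realtime = resolveOne(refRealtime, lookup);
    if (offline == null && realtime == null) {
      throw new IllegalStateException("Logical table must reference an offline or realtime table");
    }
    return new LogicalRefs(offline, realtime);
  }

  private static String resolveOne(String ref, Function<String, String> lookup) {
    if (ref == null) {
      return null; // reference not declared: allowed
    }
    String config = lookup.apply(ref);
    if (config == null) {
      throw new IllegalStateException("Referenced table config not found: " + ref);
    }
    return config;
  }
}
```

Throwing versus logging and returning `null` (as the method does for a missing schema) is a design choice; the point is only that "both refs required" versus "at least one ref" should be decided in one place.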



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -517,4 +519,42 @@ public void forceCommit(String tableNameWithType, Set<String> segmentNames) {
       });
     }
   }
+
+  // TODO: LogicalTableContext has to be cached. https://github.com/apache/pinot/issues/15859
+  @Nullable
+  @Override
+  public LogicalTableContext getLogicalTableContext(String logicalTableName) {

Review Comment:
   Isn't this a pretty major issue if each MSE query referencing a logical 
table requires 4 ZK calls? AFAIK we have been very careful about avoiding 
direct ZK access in any query path.
   
   cc - @Jackie-Jiang
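One way to address the TODO (tracked in https://github.com/apache/pinot/issues/15859) is a per-name cache in front of the ZK reads. A minimal sketch: the class name and the loader function (standing in for the schema, logical table config, and ref table config reads) are hypothetical, and invalidation would presumably be driven by ZK change notifications, which are not shown.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical per-logical-table cache; the loader stands in for the ZK reads.
final class LogicalTableContextCache<V> {
  private final ConcurrentMap<String, V> _cache = new ConcurrentHashMap<>();
  private final Function<String, V> _loader;

  LogicalTableContextCache(Function<String, V> loader) {
    _loader = loader;
  }

  // Loads on first access only. computeIfAbsent does not store a null result,
  // so a table that is missing now is re-checked on the next call.
  V get(String logicalTableName) {
    return _cache.computeIfAbsent(logicalTableName, _loader);
  }

  // Drops the entry so the next get() reloads it (e.g. after a config or schema change).
  void invalidate(String logicalTableName) {
    _cache.remove(logicalTableName);
  }
}
```

Note that `ConcurrentHashMap.computeIfAbsent` runs the loader while holding a lock on the key's bin, so a slow ZK read can briefly block other loads; refreshing from a change listener instead of loading lazily would keep ZK off the query path entirely.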



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java:
##########
@@ -86,8 +90,20 @@ public Table getTable(String name) {
    */
   @Override
   public Set<String> getTableNames() {
-    return _tableCache.getTableNameMap().keySet().stream().filter(n -> DatabaseUtils.isPartOfDatabase(n, _databaseName))
-        .collect(Collectors.toSet());
+    Set<String> result = new HashSet<>();
+    for (String tableName: _tableCache.getTableNameMap().keySet()) {
+      if (DatabaseUtils.isPartOfDatabase(tableName, _databaseName)) {
+        result.add(tableName);
+      }
+    }
+
+    for (String logicalTableName: _tableCache.getLogicalTableNameMap().keySet()) {
+      if (DatabaseUtils.isPartOfDatabase(logicalTableName, _databaseName)) {
+        result.add(logicalTableName);
+      }
+    }
+
+    return result;

Review Comment:
   nit: optional suggestion to continue using the functional approach:
   ```suggestion
       return Stream.concat(_tableCache.getTableNameMap().keySet().stream(),
               _tableCache.getLogicalTableNameMap().keySet().stream())
           .filter(n -> DatabaseUtils.isPartOfDatabase(n, _databaseName))
           .collect(Collectors.toSet());
   ```



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -392,7 +400,11 @@ private void assignWorkersToLeafFragment(PlanFragment fragment, DispatchablePlan
     if (Boolean.parseBoolean(tableOptions.get(PinotHintOptions.TableHintOptions.IS_REPLICATED))) {
       setSegmentsForReplicatedLeafFragment(metadata);
     } else {
-      assignWorkersToNonPartitionedLeafFragment(metadata, context);
+      if (metadata.getLogicalTableRouteInfo() != null) {

Review Comment:
   I presume we won't be supporting partitioned leaf stages for logical tables?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java:
##########
@@ -36,37 +37,42 @@
 @AutoService(TimeBoundaryStrategy.class)
 public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy {
 
-  public static final String INCLUDED_TABLES = "includedTables";
+  private static final String INCLUDED_TABLES = "includedTables";
+  Map<String, DateTimeFormatSpec> _dateTimeFormatSpecMap;
+
+  @Override
+  public void init(LogicalTableConfig logicalTableConfig, TableCache tableCache) {
+    Map<String, Object> parameters = logicalTableConfig.getTimeBoundaryConfig().getParameters();

Review Comment:
   Unused? (`parameters` isn't referenced anywhere else in `init`.)



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java:
##########
@@ -386,4 +409,69 @@ private static List<Expression> computeInOperands(List<Object[]> dataContainer,
     }
     return expressions;
   }
+
+  private static List<InstanceRequest> constructLogicalTableServerQueryRequests(
+      OpChainExecutionContext executionContext, PinotQuery pinotQuery, InstanceDataManager instanceDataManager) {
+    StageMetadata stageMetadata = executionContext.getStageMetadata();
+    String logicalTableName = stageMetadata.getTableName();
+    LogicalTableContext logicalTableContext = instanceDataManager.getLogicalTableContext(logicalTableName);
+    Preconditions.checkNotNull(logicalTableContext,
+        "LogicalTableManager not found for logical table name: " + logicalTableName);
+
+    Map<String, List<String>> logicalTableSegmentsMap =
+        executionContext.getWorkerMetadata().getLogicalTableSegmentsMap();
+    List<TableSegmentsInfo> offlineTableRouteInfoList = new ArrayList<>();
+    List<TableSegmentsInfo> realtimeTableRouteInfoList = new ArrayList<>();
+
+    Preconditions.checkNotNull(logicalTableSegmentsMap);
+    for (Map.Entry<String, List<String>> entry: logicalTableSegmentsMap.entrySet()) {
+      String physicalTableName = entry.getKey();
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(physicalTableName);
+      TableSegmentsInfo tableSegmentsInfo = new TableSegmentsInfo();
+      tableSegmentsInfo.setTableName(physicalTableName);
+      tableSegmentsInfo.setSegments(entry.getValue());
+      if (tableType == TableType.REALTIME) {
+        realtimeTableRouteInfoList.add(tableSegmentsInfo);
+      } else {
+        offlineTableRouteInfoList.add(tableSegmentsInfo);
+      }
+    }
+
+    TimeBoundaryInfo timeBoundaryInfo = stageMetadata.getTimeBoundary();
+
+    if (offlineTableRouteInfoList.isEmpty() || realtimeTableRouteInfoList.isEmpty()) {
+      List<TableSegmentsInfo> routeInfoList =
+          offlineTableRouteInfoList.isEmpty() ? realtimeTableRouteInfoList : offlineTableRouteInfoList;
+      String tableType = offlineTableRouteInfoList.isEmpty() ? TableType.REALTIME.name() : TableType.OFFLINE.name();
+      if (tableType.equals(TableType.OFFLINE.name())) {
+        Preconditions.checkNotNull(logicalTableContext.getRefOfflineTableConfig());
+        String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(logicalTableName);

Review Comment:
   Why do we need to suffix the logical table name with `OFFLINE`?
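For reference, a simplified stand-in for the `tableNameWithType` call in the hunk, assumed here to append the `_OFFLINE` suffix unless it is already present. `LogicalNameSketch` is hypothetical and ignores the real builder's validation.

```java
// Hypothetical, simplified stand-in for TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(name).
final class LogicalNameSketch {
  static String offlineTableNameWithType(String tableName) {
    // Physical offline tables carry an "_OFFLINE" suffix; append it unless already present.
    return tableName.endsWith("_OFFLINE") ? tableName : tableName + "_OFFLINE";
  }
}
```

With a hypothetical logical table `ordersLogical` whose `refOfflineTableName` is `orders_OFFLINE`, this yields `ordersLogical_OFFLINE`, which does not name the referenced physical table; that name would presumably come from `logicalTableContext.getRefOfflineTableConfig()` instead.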



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java:
##########
@@ -137,7 +140,21 @@ public Void visitSort(SortNode node, DispatchablePlanContext context) {
   @Override
   public Void visitTableScan(TableScanNode node, DispatchablePlanContext context) {
     DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context);
-    dispatchablePlanMetadata.addScannedTable(_tableCache.getActualTableName(node.getTableName()));
+
+    LogicalTableRouteInfo logicalTableRouteInfo = null;
+
+    String tableNameInNode = node.getTableName();
+    String tableName = _tableCache.getActualTableName(tableNameInNode);
+    if (tableName == null) {
+      tableName = _tableCache.getActualLogicalTableName(tableNameInNode);
+      Preconditions.checkNotNull(tableName, "Logical table config not found in table cache: " + tableNameInNode);
+      LogicalTableRouteProvider tableRouteProvider = new LogicalTableRouteProvider();
+      logicalTableRouteInfo = new LogicalTableRouteInfo();
+      tableRouteProvider.fillTableConfigMetadata(logicalTableRouteInfo, tableName, _tableCache);
+    }
+
+    dispatchablePlanMetadata.addScannedTable(tableName);
+    dispatchablePlanMetadata.setLogicalTableRouteInfo(logicalTableRouteInfo);

Review Comment:
   Could we move this, together with the `LogicalTableRouteInfo` instantiation and initialization, into the if block that handles everything logical-table related, so that it's neatly contained? (`dispatchablePlanMetadata.setLogicalTableRouteInfo(null)` isn't explicitly required for physical tables.)
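A minimal sketch of the suggested shape, using trimmed-down hypothetical stand-ins: `RouteInfo` for `LogicalTableRouteInfo`, `PlanMetadata` for `DispatchablePlanMetadata`, and two maps in place of `TableCache.getActualTableName` / `getActualLogicalTableName`. `Objects.requireNonNull` plays the role of `Preconditions.checkNotNull`, and the `fillTableConfigMetadata` call is folded into the `RouteInfo` constructor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class ScanSketch {
  // Hypothetical stand-in for LogicalTableRouteInfo.
  static final class RouteInfo {
    final String logicalTableName;

    RouteInfo(String logicalTableName) {
      this.logicalTableName = logicalTableName;
    }
  }

  // Hypothetical stand-in for DispatchablePlanMetadata.
  static final class PlanMetadata {
    final List<String> scannedTables = new ArrayList<>();
    RouteInfo logicalTableRouteInfo; // never touched for physical tables, so it stays null
  }

  static void visitTableScan(String tableNameInNode, Map<String, String> physicalNames,
      Map<String, String> logicalNames, PlanMetadata metadata) {
    String tableName = physicalNames.get(tableNameInNode);
    if (tableName == null) {
      // All logical-table handling lives in this branch.
      tableName = Objects.requireNonNull(logicalNames.get(tableNameInNode),
          "Logical table config not found in table cache: " + tableNameInNode);
      metadata.logicalTableRouteInfo = new RouteInfo(tableName);
    }
    metadata.scannedTables.add(tableName);
  }
}
```

Physical tables skip the branch entirely, so no explicit `null` assignment is needed.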



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java:
##########
@@ -80,6 +81,24 @@ public class DispatchablePlanMetadata implements Serializable {
   // Map from workerId -> {planFragmentId -> mailboxes}
   private final Map<Integer, Map<Integer, MailboxInfos>> _workerIdToMailboxesMap = new HashMap<>();
 
+  /**
+   * Map from workerId -> {tableType -> {tableName -> segments}} is required for logical tables.
+   * Raw definition of the map is:
+   * Map<String, Map<String, Map<String, List<String>>>>. Since this definition is hard to understand - specifically
+   * what do each of the string keys store, we define two classes:
+   * {@link TableTypeToSegmentsMap} and {@link TableTypeTableNameToSegmentsMap} to help read code more easily.
+   */
+  public static class TableTypeToSegmentsMap {

Review Comment:
   So just to clarify: both `workerIdToSegmentsMap` (physical tables only?) and `workerIdToTableSegmentsMap` (logical tables only?) are `Map<Integer, Map<String, List<String>>>`, but in the former the inner map's string key is simply `OFFLINE` or `REALTIME`, while in the latter it's a physical table name with type?
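Concretely, with made-up table and segment names and a single worker (id `0`), the reading in the question is the difference between these two values. This illustrates the question only, not what the PR actually stores.

```java
import java.util.List;
import java.util.Map;

final class SegmentsMapShapes {
  // Physical table (as read in the question): inner key is just the table type.
  static Map<Integer, Map<String, List<String>>> physicalExample() {
    return Map.of(0, Map.of("OFFLINE", List.of("seg_0", "seg_1")));
  }

  // Logical table (as read in the question): inner key is a physical table name with type.
  static Map<Integer, Map<String, List<String>>> logicalExample() {
    return Map.of(0, Map.of(
        "orders_OFFLINE", List.of("seg_0"),
        "orders_REALTIME", List.of("seg_rt_0")));
  }
}
```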



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java:
##########
@@ -233,7 +252,11 @@ private static InstanceRequest compileInstanceRequest(OpChainExecutionContext ex
     instanceRequest.setCid(QueryThreadContext.getCid());
     instanceRequest.setBrokerId("unknown");
     instanceRequest.setEnableTrace(executionContext.isTraceEnabled());
-    instanceRequest.setSearchSegments(segmentList);
+    if (segmentList != null) {
+      instanceRequest.setSearchSegments(segmentList);
+    } else {
+      instanceRequest.setTableSegmentsInfoList(tableRouteInfoList);
+    }

Review Comment:
   nit: A comment here about the physical vs. logical table difference might be useful.
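Something along these lines, perhaps. `InstanceRequestSketch` is a hypothetical stand-in for the two `InstanceRequest` setters in the hunk, with `List<String>` standing in for `List<TableSegmentsInfo>`; the comment wording is only a suggestion, based on how `constructLogicalTableServerQueryRequests` groups segments per physical table.

```java
import java.util.List;

// Hypothetical stand-in for the two InstanceRequest fields set in this hunk.
final class InstanceRequestSketch {
  List<String> searchSegments;
  List<String> tableSegmentsInfoList; // stands in for List<TableSegmentsInfo>

  void setSegments(List<String> segmentList, List<String> tableRouteInfoList) {
    if (segmentList != null) {
      // Physical table: the request targets a single table with type, so a flat segment list is enough.
      searchSegments = segmentList;
    } else {
      // Logical table: segments belong to several physical tables, so they are sent
      // grouped per physical table (one TableSegmentsInfo per table).
      tableSegmentsInfoList = tableRouteInfoList;
    }
  }
}
```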



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java:
##########
@@ -101,4 +103,30 @@ public void setTableSegmentsMap(Map<String, List<String>> tableSegmentsMap) {
     }
     _customProperties.put(TABLE_SEGMENTS_MAP_KEY, tableSegmentsMapStr);
   }
+
+  @Nullable
+  public Map<String, List<String>> getLogicalTableSegmentsMap() {
+    String logicalTableSegmentsMapStr = _customProperties.get(LOGICAL_TABLE_SEGMENTS_MAP_KEY);
+    if (logicalTableSegmentsMapStr != null) {
+      try {
+        return JsonUtils.stringToObject(logicalTableSegmentsMapStr,
+            new TypeReference<Map<String, List<String>>>() {
+            });
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to deserialize table segments map: " + logicalTableSegmentsMapStr, e);
+      }
+    } else {
+      return null;
+    }
+  }

Review Comment:
   This is basically identical to `getTableSegmentsMap`, the only difference being the map key. Can we extract it into a common method?
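A minimal sketch of the extraction. To keep it runnable without Jackson, the `JsonUtils.stringToObject` call is hidden behind a hypothetical `SegmentsMapParser` interface; the string values given to `TABLE_SEGMENTS_MAP_KEY` / `LOGICAL_TABLE_SEGMENTS_MAP_KEY` are placeholders, and `@Nullable` is left off to avoid a non-JDK dependency.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

final class WorkerMetadataSketch {
  // Hypothetical stand-in for the JsonUtils.stringToObject(..., TypeReference) call.
  @FunctionalInterface
  interface SegmentsMapParser {
    Map<String, List<String>> parse(String json) throws IOException;
  }

  static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap";                // placeholder value
  static final String LOGICAL_TABLE_SEGMENTS_MAP_KEY = "logicalTableSegmentsMap"; // placeholder value

  private final Map<String, String> _customProperties;
  private final SegmentsMapParser _parser;

  WorkerMetadataSketch(Map<String, String> customProperties, SegmentsMapParser parser) {
    _customProperties = customProperties;
    _parser = parser;
  }

  Map<String, List<String>> getTableSegmentsMap() {
    return getSegmentsMap(TABLE_SEGMENTS_MAP_KEY);
  }

  Map<String, List<String>> getLogicalTableSegmentsMap() {
    return getSegmentsMap(LOGICAL_TABLE_SEGMENTS_MAP_KEY);
  }

  // Shared helper: returns null when the property is absent, otherwise deserializes it.
  private Map<String, List<String>> getSegmentsMap(String key) {
    String segmentsMapStr = _customProperties.get(key);
    if (segmentsMapStr == null) {
      return null;
    }
    try {
      return _parser.parse(segmentsMapStr);
    } catch (IOException e) {
      throw new RuntimeException("Unable to deserialize segments map for key " + key + ": " + segmentsMapStr, e);
    }
  }
}
```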



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java:
##########
@@ -36,37 +37,42 @@
 @AutoService(TimeBoundaryStrategy.class)
 public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy {
 
-  public static final String INCLUDED_TABLES = "includedTables";
+  private static final String INCLUDED_TABLES = "includedTables";
+  Map<String, DateTimeFormatSpec> _dateTimeFormatSpecMap;
+
+  @Override
+  public void init(LogicalTableConfig logicalTableConfig, TableCache tableCache) {
+    Map<String, Object> parameters = logicalTableConfig.getTimeBoundaryConfig().getParameters();
+    List<String> includedTables = getTimeBoundaryTableNames(logicalTableConfig);
+    _dateTimeFormatSpecMap = new HashMap<>(includedTables.size());
+    for (String physicalTableName : includedTables) {
+      String rawTableName = TableNameBuilder.extractRawTableName(physicalTableName);
+      Schema schema = tableCache.getSchema(rawTableName);
+      TableConfig tableConfig = tableCache.getTableConfig(physicalTableName);
+      Preconditions.checkArgument(tableConfig != null, "Table config not found for table: %s", physicalTableName);
+      Preconditions.checkArgument(schema != null, "Schema not found for table: %s", physicalTableName);
+      String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
+      DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
+      Preconditions.checkArgument(dateTimeFieldSpec != null, "Time column not found in schema for table: %s",
+          physicalTableName);
+      DateTimeFormatSpec specFormatSpec = dateTimeFieldSpec.getFormatSpec();
+      _dateTimeFormatSpecMap.put(physicalTableName, specFormatSpec);
+    }
+  }
 
   @Override
   public String getName() {
     return "min";
   }
 
   @Override
-  public TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, TableCache tableCache,
-      RoutingManager routingManager) {
+  public TimeBoundaryInfo computeTimeBoundary(RoutingManager routingManager) {

Review Comment:
   Are these changes related to support for logical tables in MSE or an 
unrelated refactor? 
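For context, the new shape is the usual precompute-once pattern: config-derived work (resolving each included table's time format) moves to `init`, and only per-query work stays in `computeTimeBoundary`. A minimal sketch with hypothetical names: `ToLongFunction<String>` stands in for `DateTimeFormatSpec`, per-table boundaries are passed in rather than read through `RoutingManager`, and it assumes, as the name "min" suggests, that the smallest boundary across the included tables wins.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.Function;
import java.util.function.ToLongFunction;

final class MinBoundarySketch {
  // Per-table converter from a raw time value to epoch millis; stands in for DateTimeFormatSpec.
  private final Map<String, ToLongFunction<String>> _formatByTable = new HashMap<>();

  // Runs once per logical table config: resolve and validate every included table's time format.
  void init(List<String> includedTables, Function<String, ToLongFunction<String>> formatLookup) {
    for (String table : includedTables) {
      ToLongFunction<String> format = formatLookup.apply(table);
      if (format == null) {
        throw new IllegalArgumentException("Time column format not found for table: " + table);
      }
      _formatByTable.put(table, format);
    }
  }

  // Runs per query: convert each table's raw boundary to millis and keep the smallest.
  OptionalLong computeTimeBoundaryMillis(Map<String, String> rawBoundaryByTable) {
    OptionalLong min = OptionalLong.empty();
    for (Map.Entry<String, ToLongFunction<String>> entry : _formatByTable.entrySet()) {
      String raw = rawBoundaryByTable.get(entry.getKey());
      if (raw == null) {
        continue; // no boundary available for this table
      }
      long millis = entry.getValue().applyAsLong(raw);
      if (!min.isPresent() || millis < min.getAsLong()) {
        min = OptionalLong.of(millis);
      }
    }
    return min;
  }
}
```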



-- 
This is an automated message from the Apache Git Service.