This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ba99dc4310 [Logical Table] Support logical tables in MSE. (#15773)
ba99dc4310 is described below
commit ba99dc4310615e4a299c278bf336a74d79dc71ce
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Fri May 30 19:24:31 2025 +0530
[Logical Table] Support logical tables in MSE. (#15773)
---
.../api/resources/PinotQueryResource.java | 54 +++++++--
.../helix/core/PinotHelixResourceManager.java | 11 ++
.../core/data/manager/InstanceDataManager.java | 6 +
.../core/data/manager/LogicalTableContext.java | 55 +++++++++
.../BaseLogicalTableIntegrationTest.java | 115 +++++++++++++++++--
.../apache/pinot/query/catalog/PinotCatalog.java | 11 +-
.../planner/physical/DispatchablePlanContext.java | 7 ++
.../planner/physical/DispatchablePlanMetadata.java | 26 +++++
.../planner/physical/DispatchablePlanVisitor.java | 17 ++-
.../apache/pinot/query/routing/WorkerManager.java | 96 +++++++++++++++-
.../apache/pinot/query/routing/WorkerMetadata.java | 27 ++++-
.../table/ImplicitHybridTableRouteProvider.java | 17 ++-
.../query/routing/table/LogicalTableRouteInfo.java | 53 +++++----
.../routing/table/LogicalTableRouteProvider.java | 94 ++++++++++------
.../routing/table/PhysicalTableRouteProvider.java | 1 -
.../timeboundary/MinTimeBoundaryStrategy.java | 47 ++++----
.../query/timeboundary/TimeBoundaryStrategy.java | 12 +-
.../query/routing/table/BaseTableRouteTest.java | 2 +-
...HybridTableRouteProviderCalculateRouteTest.java | 17 ++-
...tHybridTableRouteProviderGetTableRouteTest.java | 65 +++--------
.../timeboundary/MinTimeBoundaryStrategyTest.java | 5 +-
.../plan/server/ServerPlanRequestUtils.java | 124 ++++++++++++++++++---
.../starter/helix/HelixInstanceDataManager.java | 40 +++++++
23 files changed, 719 insertions(+), 183 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 73d880570e..ceaaa8bf07 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -69,6 +69,7 @@ import
org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.parser.utils.ParserUtils;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.exception.DatabaseConflictException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryErrorMessage;
@@ -240,12 +241,30 @@ public class PinotQueryResource {
List<String> instanceIds;
if (!tableNames.isEmpty()) {
List<TableConfig> tableConfigList = getListTableConfigs(tableNames,
database);
- if (tableConfigList == null || tableConfigList.isEmpty()) {
- throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException("Unable to find
table in cluster, table does not exist");
+ List<LogicalTableConfig> logicalTableConfigList = null;
+ // First check for table configs, if not found, check for logical table
configs.
+ if (tableConfigList.size() != tableNames.size()) {
+ logicalTableConfigList = getListLogicalTableConfigs(tableNames,
database);
+ // If config is not found for all tables, then find the tables that
are not found.
+ if ((tableConfigList.size() + logicalTableConfigList.size()) !=
tableNames.size()) {
+ Set<String> tableNamesFoundSet = new HashSet<>();
+ for (TableConfig tableConfig : tableConfigList) {
+ tableNamesFoundSet.add(tableConfig.getTableName());
+ }
+ for (LogicalTableConfig logicalTableConfig : logicalTableConfigList)
{
+ tableNamesFoundSet.add(logicalTableConfig.getTableName());
+ }
+
+ List<String> tablesNotFound = tableNames.stream().filter(name ->
!tableNamesFoundSet.contains(name))
+ .collect(Collectors.toList());
+
+ throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException(
+ "Unable to find table in cluster, table does not exist for
tables: " + tablesNotFound);
+ }
}
// find the unions of all the broker tenant tags of the queried tables.
- Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList);
+ Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList,
logicalTableConfigList);
if (brokerTenantsUnion.isEmpty()) {
throw QueryErrorCode.BROKER_REQUEST_SEND.asException("Unable to find
broker tenant for tables: " + tableNames);
}
@@ -324,14 +343,28 @@ public class PinotQueryResource {
if (_pinotHelixResourceManager.hasOfflineTable(actualTableName)) {
tableConfigList.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(actualTableName)));
}
- if (tableConfigList.isEmpty()) {
- return null;
+ // If no table configs found for the table, skip it.
+ if (!tableConfigList.isEmpty()) {
+ allTableConfigList.addAll(tableConfigList);
}
- allTableConfigList.addAll(tableConfigList);
}
return allTableConfigList;
}
+ private List<LogicalTableConfig> getListLogicalTableConfigs(List<String>
tableNames, String database) {
+ List<LogicalTableConfig> allLogicalTableConfigList = new ArrayList<>();
+ for (String tableName : tableNames) {
+ String actualTableName =
_pinotHelixResourceManager.getActualLogicalTableName(tableName, database);
+ LogicalTableConfig logicalTableConfig =
+ _pinotHelixResourceManager.getLogicalTableConfig(actualTableName);
+ if (logicalTableConfig != null) {
+ allLogicalTableConfigList.add(logicalTableConfig);
+ }
+ }
+ return allLogicalTableConfigList;
+ }
+
+
private String selectRandomInstanceId(List<String> instanceIds) {
if (instanceIds.isEmpty()) {
throw QueryErrorCode.BROKER_RESOURCE_MISSING.asException("No broker
found for query");
@@ -356,11 +389,18 @@ public class PinotQueryResource {
}
// return the union of brokerTenants from the tables list.
- private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList)
{
+ private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList,
+ @Nullable List<LogicalTableConfig> logicalTableConfigList) {
Set<String> tableBrokerTenants = new HashSet<>();
for (TableConfig tableConfig : tableConfigList) {
tableBrokerTenants.add(tableConfig.getTenantConfig().getBroker());
}
+
+ if (logicalTableConfigList != null) {
+ for (LogicalTableConfig logicalTableConfig : logicalTableConfigList) {
+ tableBrokerTenants.add(logicalTableConfig.getBrokerTenant());
+ }
+ }
return tableBrokerTenants;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 0a4b2d37f3..ef823d55bf 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -906,6 +906,17 @@ public class PinotHelixResourceManager {
return actualTableName != null ? actualTableName : tableName;
}
+ /**
+ * Given a logical table name in any case, returns the logical table name as
defined in Helix/Segment/Schema
+ * @param logicalTableName logical tableName in any case.
+ * @return logicalTableName actually defined in Pinot (matches case) if it
exists; otherwise, returns the input value
+ */
+ public String getActualLogicalTableName(String logicalTableName, @Nullable
String databaseName) {
+ logicalTableName = DatabaseUtils.translateTableName(logicalTableName,
databaseName, _tableCache.isIgnoreCase());
+ String actualTableName =
_tableCache.getActualLogicalTableName(logicalTableName);
+ return actualTableName != null ? actualTableName : logicalTableName;
+ }
+
/**
* Table related APIs
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index d5572dce52..15ef79f465 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -201,4 +201,10 @@ public interface InstanceDataManager {
* Returns the instance data directory
*/
String getInstanceDataDir();
+
+ /**
+ * Returns the logical table config and schema for the given logical table
name.
+ */
+ @Nullable
+ LogicalTableContext getLogicalTableContext(String logicalTableName);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/LogicalTableContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/LogicalTableContext.java
new file mode 100644
index 0000000000..86592f2db7
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/LogicalTableContext.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager;
+
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class LogicalTableContext {
+ private final LogicalTableConfig _logicalTableConfig;
+ private final Schema _logicalTableSchema;
+ private final TableConfig _refOfflineTableConfig;
+ private final TableConfig _refRealtimeTableConfig;
+
+ public LogicalTableContext(LogicalTableConfig logicalTableConfig, Schema
logicalTableSchema,
+ TableConfig refOfflineTableConfig, TableConfig refRealtimeTableConfig) {
+ _logicalTableConfig = logicalTableConfig;
+ _logicalTableSchema = logicalTableSchema;
+ _refOfflineTableConfig = refOfflineTableConfig;
+ _refRealtimeTableConfig = refRealtimeTableConfig;
+ }
+
+ public LogicalTableConfig getLogicalTableConfig() {
+ return _logicalTableConfig;
+ }
+
+ public Schema getLogicalTableSchema() {
+ return _logicalTableSchema;
+ }
+
+ public TableConfig getRefOfflineTableConfig() {
+ return _refOfflineTableConfig;
+ }
+
+ public TableConfig getRefRealtimeTableConfig() {
+ return _refRealtimeTableConfig;
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
index 7fb4802cc1..0dcbb3a9f3 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -34,6 +34,7 @@ import
org.apache.pinot.controller.helix.ControllerRequestClient;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.QueryAssert;
import org.apache.pinot.integration.tests.QueryGenerator;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -48,6 +49,7 @@ import
org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
+import org.intellij.lang.annotations.Language;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -67,6 +69,7 @@ public abstract class BaseLogicalTableIntegrationTest extends
BaseClusterIntegra
private static final String DEFAULT_TENANT = "DefaultTenant";
private static final String DEFAULT_LOGICAL_TABLE_NAME = "mytable";
protected static final String DEFAULT_TABLE_NAME = "physicalTable";
+ protected static final String EMPTY_OFFLINE_TABLE_NAME = "empty_o";
protected static BaseLogicalTableIntegrationTest _sharedClusterTestSuite =
null;
protected List<File> _avroFiles;
@@ -111,6 +114,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
_controllerRequestURLBuilder =
_sharedClusterTestSuite._controllerRequestURLBuilder;
_helixResourceManager = _sharedClusterTestSuite._helixResourceManager;
_kafkaStarters = _sharedClusterTestSuite._kafkaStarters;
+ _controllerBaseApiUrl = _sharedClusterTestSuite._controllerBaseApiUrl;
}
_avroFiles = getAllAvroFiles();
@@ -164,6 +168,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
+ createLogicalTableWithEmptyOfflineTable();
}
@AfterClass
@@ -250,6 +255,13 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
return DEFAULT_TENANT;
}
+ // Setup H2 table with the same name as the logical table.
+ protected void setUpH2Connection(List<File> avroFiles)
+ throws Exception {
+ setUpH2Connection();
+ ClusterIntegrationTestUtils.setUpH2TableWithAvro(avroFiles,
getLogicalTableName(), _h2Connection);
+ }
+
/**
* Creates a new OFFLINE table config.
*/
@@ -324,12 +336,35 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
return LogicalTableConfig.fromString(resp);
}
- protected void deleteLogicalTable()
+ private void createLogicalTableWithEmptyOfflineTable()
throws IOException {
- String deleteLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableDelete(getLogicalTableName());
- // delete logical table
- String deleteResponse =
ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
- assertEquals(deleteResponse, "{\"status\":\"" + getLogicalTableName() + "
logical table successfully deleted.\"}");
+ Schema schema = createSchema(getSchemaFileName());
+
schema.setSchemaName(TableNameBuilder.extractRawTableName(EMPTY_OFFLINE_TABLE_NAME));
+ addSchema(schema);
+
+ Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+ TableConfig offlineTableConfig =
createOfflineTableConfig(EMPTY_OFFLINE_TABLE_NAME);
+ addTableConfig(offlineTableConfig);
+
physicalTableConfigMap.put(TableNameBuilder.OFFLINE.tableNameWithType(EMPTY_OFFLINE_TABLE_NAME),
+ new PhysicalTableConfig());
+ String refOfflineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(EMPTY_OFFLINE_TABLE_NAME);
+
+ String logicalTableName = EMPTY_OFFLINE_TABLE_NAME + "_logical";
+
+ String addLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableCreate();
+ Schema logicalTableSchema = createSchema(getSchemaFileName());
+ logicalTableSchema.setSchemaName(logicalTableName);
+ addSchema(logicalTableSchema);
+ LogicalTableConfigBuilder builder =
+ new LogicalTableConfigBuilder().setTableName(logicalTableName)
+ .setBrokerTenant(DEFAULT_TENANT)
+ .setRefOfflineTableName(refOfflineTableName)
+ .setPhysicalTableConfigMap(physicalTableConfigMap);
+
+ String resp =
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
builder.build().toSingleLineJsonString(), getHeaders());
+ assertEquals(resp, "{\"unrecognizedProperties\":{},\"status\":\"" +
logicalTableName
+ + " logical table successfully added.\"}");
}
@Override
@@ -414,22 +449,24 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
assertEquals(new HashSet<>(getPhysicalTableNames()),
logicalTableConfig.getPhysicalTableConfigMap().keySet());
}
- @Test
- public void testHardcodedQueries()
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testHardcodedQueries(boolean useMultiStageQueryEngine)
throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
super.testHardcodedQueries();
}
- @Test
public void testQueriesFromQueryFile()
throws Exception {
+ setUseMultiStageQueryEngine(false);
super.testQueriesFromQueryFile();
}
- @Test
- public void testGeneratedQueries()
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testGeneratedQueries(boolean useMultiStageQueryEngine)
throws Exception {
- super.testGeneratedQueries(true, false);
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ super.testGeneratedQueries(true, useMultiStageQueryEngine);
}
@Test
@@ -558,4 +595,60 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
exceptions = response.get("exceptions");
assertTrue(exceptions.isEmpty(), "Query should not throw exception");
}
+
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testLogicalTableWithEmptyOfflineTable(boolean
useMultiStageQueryEngine)
+ throws Exception {
+
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+ String logicalTableName = EMPTY_OFFLINE_TABLE_NAME + "_logical";
+ // Query should return empty result
+ JsonNode queryResponse = postQuery("SELECT count(*) FROM " +
logicalTableName);
+ assertEquals(queryResponse.get("numDocsScanned").asInt(), 0);
+ assertEquals(queryResponse.get("numServersQueried").asInt(),
useMultiStageQueryEngine ? 1 : 0);
+ assertTrue(queryResponse.get("exceptions").isEmpty());
+ }
+
+ @Test(dataProvider = "useBothQueryEngines")
+ void testControllerQuerySubmit(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ @Language("sql")
+ String query = "SELECT count(*) FROM " + getLogicalTableName();
+ JsonNode response = postQueryToController(query);
+ assertNoError(response);
+
+ query = "SELECT count(*) FROM " + getOfflineTableNames().get(0);
+ response = postQueryToController(query);
+ assertNoError(response);
+
+ query = "SELECT count(*) FROM unknown";
+ response = postQueryToController(query);
+
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
+ .containsMessage("TableDoesNotExistError");
+ }
+
+ @Test
+ void testControllerJoinQuerySubmit()
+ throws Exception {
+ setUseMultiStageQueryEngine(true);
+ @Language("sql")
+ String query = "SELECT count(*) FROM " + getLogicalTableName() + " JOIN "
+ getPhysicalTableNames().get(0)
+ + " ON " + getLogicalTableName() + ".FlightNum = " +
getPhysicalTableNames().get(0) + ".FlightNum";
+ JsonNode response = postQueryToController(query);
+ assertNoError(response);
+
+ query = "SELECT count(*) FROM unknown JOIN " +
getPhysicalTableNames().get(0)
+ + " ON unknown.FlightNum = " + getPhysicalTableNames().get(0) +
".FlightNum";
+ response = postQueryToController(query);
+
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
+ .containsMessage("TableDoesNotExistError");
+
+ query = "SELECT count(*) FROM " + getLogicalTableName() + " JOIN known ON
"
+ + getLogicalTableName() + ".FlightNum = unknown.FlightNum";
+ response = postQueryToController(query);
+
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
+ .containsMessage("TableDoesNotExistError");
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
index da40d6c48c..c4ba328cee 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.catalog;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
@@ -68,10 +69,14 @@ public class PinotCatalog implements Schema {
String rawTableName = TableNameBuilder.extractRawTableName(name);
String physicalTableName = DatabaseUtils.translateTableName(rawTableName,
_databaseName);
String tableName = _tableCache.getActualTableName(physicalTableName);
+
if (tableName == null) {
- return null;
+ tableName = _tableCache.getActualLogicalTableName(physicalTableName);
}
+ if (tableName == null) {
+ return null;
+ }
org.apache.pinot.spi.data.Schema schema = _tableCache.getSchema(tableName);
if (schema == null) {
return null;
@@ -86,7 +91,9 @@ public class PinotCatalog implements Schema {
*/
@Override
public Set<String> getTableNames() {
- return _tableCache.getTableNameMap().keySet().stream().filter(n ->
DatabaseUtils.isPartOfDatabase(n, _databaseName))
+ return Stream.concat(_tableCache.getTableNameMap().keySet().stream(),
+ _tableCache.getLogicalTableNameMap().keySet().stream())
+ .filter(n -> DatabaseUtils.isPartOfDatabase(n, _databaseName))
.collect(Collectors.toSet());
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index 5aaf277858..fbace37191 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -103,8 +103,12 @@ public class DispatchablePlanContext {
dispatchablePlanMetadata.getWorkerIdToServerInstanceMap();
Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap =
dispatchablePlanMetadata.getWorkerIdToSegmentsMap();
+ Map<Integer, Map<String, List<String>>> workerIdToTableNameSegmentsMap =
+ dispatchablePlanMetadata.getWorkerIdToTableSegmentsMap();
Map<Integer, Map<Integer, MailboxInfos>> workerIdToMailboxesMap =
dispatchablePlanMetadata.getWorkerIdToMailboxesMap();
+ Preconditions.checkArgument(workerIdToSegmentsMap == null ||
workerIdToTableNameSegmentsMap == null,
+ "Both workerIdToSegmentsMap and workerIdToTableNameSegmentsMap
cannot be set at the same time");
Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdsMap =
new HashMap<>();
WorkerMetadata[] workerMetadataArray = new
WorkerMetadata[workerIdToServerInstanceMap.size()];
for (Map.Entry<Integer, QueryServerInstance> serverEntry :
workerIdToServerInstanceMap.entrySet()) {
@@ -115,6 +119,9 @@ public class DispatchablePlanContext {
if (workerIdToSegmentsMap != null) {
workerMetadata.setTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
}
+ if (workerIdToTableNameSegmentsMap != null) {
+
workerMetadata.setLogicalTableSegmentsMap(workerIdToTableNameSegmentsMap.get(workerId));
+ }
workerMetadataArray[workerId] = workerMetadata;
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index 0c27906ee6..ec88a8e4f4 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.routing.MailboxInfos;
import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.routing.table.LogicalTableRouteInfo;
/**
@@ -80,6 +81,12 @@ public class DispatchablePlanMetadata implements
Serializable {
// Map from workerId -> {planFragmentId -> mailboxes}
private final Map<Integer, Map<Integer, MailboxInfos>>
_workerIdToMailboxesMap = new HashMap<>();
+ /**
+ * Map from workerId -> {physicalTableName -> segments} is required for
logical tables.
+ */
+ private Map<Integer, Map<String, List<String>>> _workerIdToTableSegmentsMap;
+ private LogicalTableRouteInfo _logicalTableRouteInfo;
+
public List<String> getScannedTables() {
return _scannedTables;
}
@@ -178,4 +185,23 @@ public class DispatchablePlanMetadata implements
Serializable {
public void addUnavailableSegments(String tableName, Collection<String>
unavailableSegments) {
_tableToUnavailableSegmentsMap.computeIfAbsent(tableName, k -> new
HashSet<>()).addAll(unavailableSegments);
}
+
+ @Nullable
+ public LogicalTableRouteInfo getLogicalTableRouteInfo() {
+ return _logicalTableRouteInfo;
+ }
+
+ public void setLogicalTableRouteInfo(LogicalTableRouteInfo
logicalTableRouteInfo) {
+ _logicalTableRouteInfo = logicalTableRouteInfo;
+ }
+
+ @Nullable
+ public Map<Integer, Map<String, List<String>>>
getWorkerIdToTableSegmentsMap() {
+ return _workerIdToTableSegmentsMap;
+ }
+
+ public void setWorkerIdToTableSegmentsMap(
+ Map<Integer, Map<String, List<String>>> workerIdToTableSegmentsMap) {
+ _workerIdToTableSegmentsMap = workerIdToTableSegmentsMap;
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index c79a6bec68..848bea8fbc 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.planner.physical;
+import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
@@ -38,6 +39,8 @@ import org.apache.pinot.query.planner.plannode.SortNode;
import org.apache.pinot.query.planner.plannode.TableScanNode;
import org.apache.pinot.query.planner.plannode.ValueNode;
import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.apache.pinot.query.routing.table.LogicalTableRouteInfo;
+import org.apache.pinot.query.routing.table.LogicalTableRouteProvider;
public class DispatchablePlanVisitor implements PlanNodeVisitor<Void,
DispatchablePlanContext> {
@@ -137,7 +140,19 @@ public class DispatchablePlanVisitor implements
PlanNodeVisitor<Void, Dispatchab
@Override
public Void visitTableScan(TableScanNode node, DispatchablePlanContext
context) {
DispatchablePlanMetadata dispatchablePlanMetadata =
getOrCreateDispatchablePlanMetadata(node, context);
-
dispatchablePlanMetadata.addScannedTable(_tableCache.getActualTableName(node.getTableName()));
+
+ String tableNameInNode = node.getTableName();
+ String tableName = _tableCache.getActualTableName(tableNameInNode);
+ if (tableName == null) {
+ tableName = _tableCache.getActualLogicalTableName(tableNameInNode);
+ Preconditions.checkNotNull(tableName, "Logical table config not found in
table cache: " + tableNameInNode);
+ LogicalTableRouteProvider tableRouteProvider = new
LogicalTableRouteProvider();
+ LogicalTableRouteInfo logicalTableRouteInfo = new
LogicalTableRouteInfo();
+ tableRouteProvider.fillTableConfigMetadata(logicalTableRouteInfo,
tableName, _tableCache);
+ dispatchablePlanMetadata.setLogicalTableRouteInfo(logicalTableRouteInfo);
+ }
+
+ dispatchablePlanMetadata.addScannedTable(tableName);
dispatchablePlanMetadata.setTableOptions(
node.getNodeHint().getHintOptions().get(PinotHintOptions.TABLE_HINT_OPTIONS));
return null;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index b474b06b4e..4a28f11632 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -35,18 +35,22 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.rules.ImmutableTableOptions;
import org.apache.pinot.calcite.rel.rules.TableOptions;
+import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.ServerRouteInfo;
import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.core.transport.TableRouteInfo;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.physical.DispatchablePlanContext;
import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.routing.table.LogicalTableRouteInfo;
+import org.apache.pinot.query.routing.table.LogicalTableRouteProvider;
import org.apache.pinot.spi.config.table.TableType;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -373,7 +377,11 @@ public class WorkerManager {
DispatchablePlanMetadata metadata =
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
Map<String, String> tableOptions = metadata.getTableOptions();
if (tableOptions == null) {
- assignWorkersToNonPartitionedLeafFragment(metadata, context);
+ if (metadata.getLogicalTableRouteInfo() != null) {
+ assignWorkersToNonPartitionedLeafFragmentForLogicalTable(metadata,
context);
+ } else {
+ assignWorkersToNonPartitionedLeafFragment(metadata, context);
+ }
return;
}
@@ -392,7 +400,11 @@ public class WorkerManager {
if
(Boolean.parseBoolean(tableOptions.get(PinotHintOptions.TableHintOptions.IS_REPLICATED)))
{
setSegmentsForReplicatedLeafFragment(metadata);
} else {
- assignWorkersToNonPartitionedLeafFragment(metadata, context);
+ if (metadata.getLogicalTableRouteInfo() != null) {
+ assignWorkersToNonPartitionedLeafFragmentForLogicalTable(metadata,
context);
+ } else {
+ assignWorkersToNonPartitionedLeafFragment(metadata, context);
+ }
}
}
@@ -539,6 +551,86 @@ public class WorkerManager {
CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" +
tableNameWithType + "\""));
}
+ /**
+ * Assigns workers to a non-partitioned leaf fragment that reads a logical
+ * table.
+ * <p>Fills the route metadata of the fragment's logical table route info,
+ * copies its time boundary (when present) onto the plan metadata, computes
+ * routes with one "SELECT *" broker request per available table type, and
+ * finally maps the routed segments to workers.
+ *
+ * @param metadata plan metadata of the leaf fragment; its logical table
+ * route info must not be null
+ * @param context plan context that supplies the request id
+ */
+ private void
assignWorkersToNonPartitionedLeafFragmentForLogicalTable(DispatchablePlanMetadata
metadata,
+ DispatchablePlanContext context) {
+ LogicalTableRouteInfo logicalTableRouteInfo =
metadata.getLogicalTableRouteInfo();
+ Preconditions.checkNotNull(logicalTableRouteInfo);
+ LogicalTableRouteProvider tableRouteProvider = new
LogicalTableRouteProvider();
+ tableRouteProvider.fillRouteMetadata(logicalTableRouteInfo,
_routingManager);
+ // Only overwrite the time boundary on the metadata when one was computed.
+ if (logicalTableRouteInfo.getTimeBoundaryInfo() != null) {
+
metadata.setTimeBoundaryInfo(logicalTableRouteInfo.getTimeBoundaryInfo());
+ }
+ // A request stays null when the logical table has no table of that type.
+ BrokerRequest offlineBrokerRequest = null;
+ BrokerRequest realtimeBrokerRequest = null;
+
+ if (logicalTableRouteInfo.hasOffline()) {
+ offlineBrokerRequest = CalciteSqlCompiler.compileToBrokerRequest(
+ "SELECT * FROM \"" + logicalTableRouteInfo.getOfflineTableName() +
"\"");
+ }
+
+ if (logicalTableRouteInfo.hasRealtime()) {
+ realtimeBrokerRequest = CalciteSqlCompiler.compileToBrokerRequest(
+ "SELECT * FROM \"" + logicalTableRouteInfo.getRealtimeTableName() +
"\"");
+ }
+
+ tableRouteProvider.calculateRoutes(logicalTableRouteInfo, _routingManager,
offlineBrokerRequest,
+ realtimeBrokerRequest, context.getRequestId());
+
+ assignTableSegmentsToWorkers(logicalTableRouteInfo, metadata);
+ }
+
+ /**
+ * Converts the per-physical-table routing tables of a logical table into
+ * worker assignments.
+ * <p>Segments are first grouped by server and physical table name. Each
+ * server then gets one worker id, starting at 0 and following the iteration
+ * order of the intermediate map, and the resulting worker-to-server and
+ * worker-to-segments maps are stored on the plan metadata.
+ *
+ * @param logicalTableRouteInfo route info whose physical table routes have
+ * already been calculated
+ * @param metadata plan metadata that receives the worker maps
+ */
+ private static void assignTableSegmentsToWorkers(LogicalTableRouteInfo
logicalTableRouteInfo,
+ DispatchablePlanMetadata metadata) {
+ Map<ServerInstance, Map<String, List<String>>>
serverInstanceToLogicalSegmentsMap =
+ new HashMap<>();
+
+ if (logicalTableRouteInfo.getOfflineTables() != null) {
+ for (TableRouteInfo physicalTableRoute :
logicalTableRouteInfo.getOfflineTables()) {
+ // Routing table may be null if no routing table is found OR there are
no segments.
+ if (physicalTableRoute.getOfflineRoutingTable() != null) {
+
transferToServerInstanceLogicalSegmentsMap(physicalTableRoute.getOfflineTableName(),
+ physicalTableRoute.getOfflineRoutingTable(),
serverInstanceToLogicalSegmentsMap);
+ }
+ }
+ }
+
+ if (logicalTableRouteInfo.getRealtimeTables() != null) {
+ for (TableRouteInfo physicalTableRoute :
logicalTableRouteInfo.getRealtimeTables()) {
+ // Routing table may be null if no routing table is found OR there are
no segments.
+ if (physicalTableRoute.getRealtimeRoutingTable() != null) {
+
transferToServerInstanceLogicalSegmentsMap(physicalTableRoute.getRealtimeTableName(),
+ physicalTableRoute.getRealtimeRoutingTable(),
serverInstanceToLogicalSegmentsMap);
+ }
+ }
+ }
+
+ // One worker per server that holds at least one routed segment.
+ int workerId = 0;
+ Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new
HashMap<>();
+ Map<Integer, Map<String, List<String>>> workerIdToLogicalTableSegmentsMap
= new HashMap<>();
+ for (Map.Entry<ServerInstance, Map<String, List<String>>> entry
+ : serverInstanceToLogicalSegmentsMap.entrySet()) {
+ workerIdToServerInstanceMap.put(workerId, new
QueryServerInstance(entry.getKey()));
+ workerIdToLogicalTableSegmentsMap.put(workerId, entry.getValue());
+ workerId++;
+ }
+
+ metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
+ metadata.setWorkerIdToTableSegmentsMap(workerIdToLogicalTableSegmentsMap);
+ }
+
+ /**
+ * Adds the segments routed to each server for one physical table to the
+ * per-server map of physical table name to segments.
+ *
+ * @param physicalTableName physical table (with type) the routing table
+ * belongs to
+ * @param segmentsMap routing table of that physical table, keyed by server
+ * @param serverInstanceToLogicalSegmentsMap accumulator that is updated in
+ * place; a per-server map is created on first use
+ * @throws IllegalStateException if a server already has an entry for the
+ * given physical table
+ */
+ private static void transferToServerInstanceLogicalSegmentsMap(String
physicalTableName,
+ Map<ServerInstance, ServerRouteInfo> segmentsMap,
+ Map<ServerInstance, Map<String, List<String>>>
serverInstanceToLogicalSegmentsMap) {
+ for (Map.Entry<ServerInstance, ServerRouteInfo> serverEntry :
segmentsMap.entrySet()) {
+ Map<String, List<String>> tableNameToSegmentsMap =
+
serverInstanceToLogicalSegmentsMap.computeIfAbsent(serverEntry.getKey(), k ->
new HashMap<>());
+ // TODO: support optional segments for multi-stage engine.
+ // Guava Preconditions only substitutes %s placeholders, not {}.
+ Preconditions.checkState(
+ tableNameToSegmentsMap.put(physicalTableName,
serverEntry.getValue().getSegments()) == null,
+ "Entry for server %s and physical table: %s already exist!",
serverEntry.getKey(), physicalTableName);
+ }
+ }
+
// --------------------------------------------------------------------------
// Partitioned leaf stage assignment
// --------------------------------------------------------------------------
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
index ab1226862e..b00a31294e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
@@ -43,6 +43,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
*/
public class WorkerMetadata {
public static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap";
+ public static final String LOGICAL_TABLE_SEGMENTS_MAP_KEY =
"logicalTableSegmentsMap";
private final int _workerId;
private final Map<Integer, MailboxInfos> _mailboxInfosMap;
@@ -75,13 +76,17 @@ public class WorkerMetadata {
@Nullable
public Map<String, List<String>> getTableSegmentsMap() {
- String tableSegmentsMapStr = _customProperties.get(TABLE_SEGMENTS_MAP_KEY);
+ return deserializeStringSegmentListMap(TABLE_SEGMENTS_MAP_KEY);
+ }
+
+ private Map<String, List<String>> deserializeStringSegmentListMap(String
propertyKey) {
+ String tableSegmentsMapStr = _customProperties.get(propertyKey);
if (tableSegmentsMapStr != null) {
try {
return JsonUtils.stringToObject(tableSegmentsMapStr, new
TypeReference<Map<String, List<String>>>() {
});
} catch (IOException e) {
- throw new RuntimeException("Unable to deserialize table segments map:
" + tableSegmentsMapStr, e);
+ throw new RuntimeException("Unable to deserialize " + propertyKey + "
: " + tableSegmentsMapStr, e);
}
} else {
return null;
@@ -89,7 +94,8 @@ public class WorkerMetadata {
}
public boolean isLeafStageWorker() {
- return _customProperties.containsKey(TABLE_SEGMENTS_MAP_KEY);
+ return _customProperties.containsKey(TABLE_SEGMENTS_MAP_KEY)
+ || _customProperties.containsKey(LOGICAL_TABLE_SEGMENTS_MAP_KEY);
}
public void setTableSegmentsMap(Map<String, List<String>> tableSegmentsMap) {
@@ -101,4 +107,19 @@ public class WorkerMetadata {
}
_customProperties.put(TABLE_SEGMENTS_MAP_KEY, tableSegmentsMapStr);
}
+
+ /**
+ * Returns the map stored under {@link #LOGICAL_TABLE_SEGMENTS_MAP_KEY} in
+ * the custom properties, deserialized from JSON.
+ *
+ * @return the deserialized map, or {@code null} when the property is absent
+ * @throws RuntimeException if the stored value cannot be deserialized
+ */
+ @Nullable
+ public Map<String, List<String>> getLogicalTableSegmentsMap() {
+ return deserializeStringSegmentListMap(LOGICAL_TABLE_SEGMENTS_MAP_KEY);
+ }
+
+ /**
+ * Serializes the given map to JSON and stores it in the custom properties
+ * under {@link #LOGICAL_TABLE_SEGMENTS_MAP_KEY}.
+ *
+ * @param logicalTableSegmentsMap map to store
+ * @throws RuntimeException if the map cannot be serialized to JSON
+ */
+ public void setLogicalTableSegmentsMap(Map<String, List<String>>
logicalTableSegmentsMap) {
+ String logicalTableSegmentsMapStr;
+ try {
+ logicalTableSegmentsMapStr =
JsonUtils.objectToString(logicalTableSegmentsMap);
+ } catch (JsonProcessingException e) {
+ // Name the logical map so this failure is not mistaken for one raised
+ // by setTableSegmentsMap.
+ throw new RuntimeException("Unable to serialize logical table segments map: " +
logicalTableSegmentsMap, e);
+ }
+ _customProperties.put(LOGICAL_TABLE_SEGMENTS_MAP_KEY,
logicalTableSegmentsMapStr);
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProvider.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProvider.java
index 63543c41ce..48553c328d 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProvider.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProvider.java
@@ -46,10 +46,18 @@ public class ImplicitHybridTableRouteProvider implements
TableRouteProvider {
private static final Logger LOGGER =
LoggerFactory.getLogger(ImplicitHybridTableRouteProvider.class);
+ /**
+ * Creates an {@link ImplicitHybridTableRouteInfo} for the given table and
+ * fills it in two steps: table config metadata from the table cache, then
+ * route metadata from the routing manager.
+ */
@Override
- public ImplicitHybridTableRouteInfo getTableRouteInfo(String tableName,
TableCache tableCache,
+ public TableRouteInfo getTableRouteInfo(String tableName, TableCache
tableCache,
RoutingManager routingManager) {
ImplicitHybridTableRouteInfo tableRouteInfo = new
ImplicitHybridTableRouteInfo();
+ fillTableConfigMetadata(tableRouteInfo, tableName, tableCache);
+ fillRouteMetadata(tableRouteInfo, routingManager);
+
+ return tableRouteInfo;
+ }
+
+ public void fillTableConfigMetadata(ImplicitHybridTableRouteInfo
tableRouteInfo,
+ String tableName, TableCache tableCache) {
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType == TableType.OFFLINE) {
@@ -71,19 +79,24 @@ public class ImplicitHybridTableRouteProvider implements
TableRouteProvider {
if (realtimeTableName != null) {
tableRouteInfo.setRealtimeTableConfig(tableCache.getTableConfig(realtimeTableName));
}
+ }
+ public void fillRouteMetadata(ImplicitHybridTableRouteInfo tableRouteInfo,
RoutingManager routingManager) {
if (tableRouteInfo.hasOffline()) {
+ String offlineTableName = tableRouteInfo.getOfflineTableName();
tableRouteInfo.setOfflineRouteExists(routingManager.routingExists(offlineTableName));
tableRouteInfo.setOfflineTableDisabled(routingManager.isTableDisabled(offlineTableName));
}
if (tableRouteInfo.hasRealtime()) {
+ String realtimeTableName = tableRouteInfo.getRealtimeTableName();
tableRouteInfo.setRealtimeRouteExists(routingManager.routingExists(realtimeTableName));
tableRouteInfo.setRealtimeTableDisabled(routingManager.isTableDisabled(realtimeTableName));
}
// Get TimeBoundaryInfo. If there is no time boundary, then do not
consider the offline table.
if (tableRouteInfo.isHybrid()) {
+ String offlineTableName = tableRouteInfo.getOfflineTableName();
// Time boundary info might be null when there is no segment in the
offline table, query real-time side only
TimeBoundaryInfo timeBoundaryInfo =
routingManager.getTimeBoundaryInfo(offlineTableName);
if (timeBoundaryInfo == null) {
@@ -95,8 +108,6 @@ public class ImplicitHybridTableRouteProvider implements
TableRouteProvider {
tableRouteInfo.setTimeBoundaryInfo(timeBoundaryInfo);
}
}
-
- return tableRouteInfo;
}
@Override
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java
index 04d3b76bed..612f8e79cd 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteInfo.java
@@ -32,22 +32,23 @@ import org.apache.pinot.common.request.TableSegmentsInfo;
import org.apache.pinot.core.routing.ServerRouteInfo;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.BaseTableRouteInfo;
+import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.core.transport.TableRouteInfo;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
public class LogicalTableRouteInfo extends BaseTableRouteInfo {
- private final LogicalTableConfig _logicalTable;
- private List<TableRouteInfo> _offlineTables;
- private List<TableRouteInfo> _realtimeTables;
+ private String _logicalTableName;
+ private List<ImplicitHybridTableRouteInfo> _offlineTables;
+ private List<ImplicitHybridTableRouteInfo> _realtimeTables;
private TableConfig _offlineTableConfig;
private TableConfig _realtimeTableConfig;
private QueryConfig _queryConfig;
@@ -56,15 +57,9 @@ public class LogicalTableRouteInfo extends
BaseTableRouteInfo {
private BrokerRequest _offlineBrokerRequest;
private BrokerRequest _realtimeBrokerRequest;
- private TimeBoundaryInfo _timeBoundaryInfo;
-
- LogicalTableRouteInfo() {
- _logicalTable = null;
- }
- public LogicalTableRouteInfo(LogicalTableConfig logicalTable) {
- _logicalTable = logicalTable;
- }
+ private TimeBoundaryStrategy _timeBoundaryStrategy;
+ private TimeBoundaryInfo _timeBoundaryInfo;
@Override
public Map<ServerRoutingInstance, InstanceRequest> getRequestMap(long
requestId, String brokerId, boolean preferTls) {
@@ -139,6 +134,15 @@ public class LogicalTableRouteInfo extends
BaseTableRouteInfo {
return instanceRequest;
}
+ /**
+ * Sets the logical table name from which {@link #getOfflineTableName()} and
+ * {@link #getRealtimeTableName()} derive their type-suffixed names.
+ */
+ public void setLogicalTableName(String logicalTableName) {
+ _logicalTableName = logicalTableName;
+ }
+
+ /**
+ * Returns the logical table name, or {@code null} if it has not been set.
+ */
+ @Nullable
+ public String getLogicalTableName() {
+ return _logicalTableName;
+ }
+
@Nullable
@Override
public TableConfig getOfflineTableConfig() {
@@ -234,15 +238,15 @@ public class LogicalTableRouteInfo extends
BaseTableRouteInfo {
@Nullable
@Override
public String getOfflineTableName() {
- return hasOffline() && _logicalTable != null ?
TableNameBuilder.OFFLINE.tableNameWithType(
- _logicalTable.getTableName()) : null;
+ return hasOffline() && _logicalTableName != null ?
TableNameBuilder.OFFLINE.tableNameWithType(_logicalTableName)
+ : null;
}
@Nullable
@Override
public String getRealtimeTableName() {
- return hasRealtime() && _logicalTable != null ?
TableNameBuilder.REALTIME.tableNameWithType(
- _logicalTable.getTableName()) : null;
+ return hasRealtime() && _logicalTableName != null ?
TableNameBuilder.REALTIME.tableNameWithType(_logicalTableName)
+ : null;
}
@Nullable
@@ -356,20 +360,20 @@ public class LogicalTableRouteInfo extends
BaseTableRouteInfo {
}
@Nullable
- public List<TableRouteInfo> getOfflineTables() {
+ public List<ImplicitHybridTableRouteInfo> getOfflineTables() {
return _offlineTables;
}
- public void setOfflineTables(List<TableRouteInfo> offlineTables) {
+ public void setOfflineTables(List<ImplicitHybridTableRouteInfo>
offlineTables) {
_offlineTables = offlineTables;
}
@Nullable
- public List<TableRouteInfo> getRealtimeTables() {
+ public List<ImplicitHybridTableRouteInfo> getRealtimeTables() {
return _realtimeTables;
}
- public void setRealtimeTables(List<TableRouteInfo> realtimeTables) {
+ public void setRealtimeTables(List<ImplicitHybridTableRouteInfo>
realtimeTables) {
_realtimeTables = realtimeTables;
}
@@ -388,4 +392,13 @@ public class LogicalTableRouteInfo extends
BaseTableRouteInfo {
public void setRealtimeBrokerRequest(BrokerRequest realtimeBrokerRequest) {
_realtimeBrokerRequest = realtimeBrokerRequest;
}
+
+ /**
+ * Returns the time boundary strategy attached to this route info, or
+ * {@code null} if none has been set.
+ */
+ @Nullable
+ public TimeBoundaryStrategy getTimeBoundaryStrategy() {
+ return _timeBoundaryStrategy;
+ }
+
+ /**
+ * Attaches the time boundary strategy to this route info.
+ */
+ public void setTimeBoundaryStrategy(TimeBoundaryStrategy
timeBoundaryStrategy) {
+ _timeBoundaryStrategy = timeBoundaryStrategy;
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
index fe3937b8b0..bdce62a938 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo;
import org.apache.pinot.core.transport.TableRouteInfo;
import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy;
import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService;
@@ -41,20 +42,29 @@ public class LogicalTableRouteProvider implements
TableRouteProvider {
@Override
public TableRouteInfo getTableRouteInfo(String tableName, TableCache
tableCache, RoutingManager routingManager) {
- LogicalTableConfig logicalTable =
tableCache.getLogicalTableConfig(tableName);
- if (logicalTable == null) {
- return new LogicalTableRouteInfo();
- }
+ LogicalTableRouteInfo logicalTableRouteInfo = new LogicalTableRouteInfo();
+ fillTableConfigMetadata(logicalTableRouteInfo, tableName, tableCache);
+ fillRouteMetadata(logicalTableRouteInfo, routingManager);
+ return logicalTableRouteInfo;
+ }
+ public void fillTableConfigMetadata(LogicalTableRouteInfo
logicalTableRouteInfo, String tableName,
+ TableCache tableCache) {
+ LogicalTableConfig logicalTableConfig =
tableCache.getLogicalTableConfig(tableName);
+ if (logicalTableConfig == null) {
+ return;
+ }
+ logicalTableRouteInfo.setLogicalTableName(tableName);
PhysicalTableRouteProvider routeProvider = new
PhysicalTableRouteProvider();
- List<TableRouteInfo> offlineTables = new ArrayList<>();
- List<TableRouteInfo> realtimeTables = new ArrayList<>();
- for (String physicalTableName :
logicalTable.getPhysicalTableConfigMap().keySet()) {
+ List<ImplicitHybridTableRouteInfo> offlineTables = new ArrayList<>();
+ List<ImplicitHybridTableRouteInfo> realtimeTables = new ArrayList<>();
+ for (String physicalTableName :
logicalTableConfig.getPhysicalTableConfigMap().keySet()) {
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(physicalTableName);
Preconditions.checkNotNull(tableType);
- TableRouteInfo physicalTableInfo =
- routeProvider.getTableRouteInfo(physicalTableName, tableCache,
routingManager);
+ ImplicitHybridTableRouteInfo physicalTableInfo = new
ImplicitHybridTableRouteInfo();
+ routeProvider.fillTableConfigMetadata(physicalTableInfo,
physicalTableName, tableCache);
+
if (physicalTableInfo.isExists()) {
if (tableType == TableType.OFFLINE) {
offlineTables.add(physicalTableInfo);
@@ -64,37 +74,59 @@ public class LogicalTableRouteProvider implements
TableRouteProvider {
}
}
- LogicalTableRouteInfo routeInfo = new LogicalTableRouteInfo(logicalTable);
if (!offlineTables.isEmpty()) {
- TableConfig offlineTableConfig =
tableCache.getTableConfig(logicalTable.getRefOfflineTableName());
+ TableConfig offlineTableConfig =
tableCache.getTableConfig(logicalTableConfig.getRefOfflineTableName());
Preconditions.checkNotNull(offlineTableConfig,
- "Offline table config not found: " +
logicalTable.getRefOfflineTableName());
- routeInfo.setOfflineTables(offlineTables);
- routeInfo.setOfflineTableConfig(offlineTableConfig);
+ "Offline table config not found: " +
logicalTableConfig.getRefOfflineTableName());
+ logicalTableRouteInfo.setOfflineTables(offlineTables);
+ logicalTableRouteInfo.setOfflineTableConfig(offlineTableConfig);
}
+
if (!realtimeTables.isEmpty()) {
- TableConfig realtimeTableConfig =
tableCache.getTableConfig(logicalTable.getRefRealtimeTableName());
+ TableConfig realtimeTableConfig =
tableCache.getTableConfig(logicalTableConfig.getRefRealtimeTableName());
Preconditions.checkNotNull(realtimeTableConfig,
- "Realtime table config not found: " +
logicalTable.getRefRealtimeTableName());
- routeInfo.setRealtimeTables(realtimeTables);
- routeInfo.setRealtimeTableConfig(realtimeTableConfig);
+ "Realtime table config not found: " +
logicalTableConfig.getRefRealtimeTableName());
+ logicalTableRouteInfo.setRealtimeTables(realtimeTables);
+ logicalTableRouteInfo.setRealtimeTableConfig(realtimeTableConfig);
}
- routeInfo.setQueryConfig(logicalTable.getQueryConfig());
- TimeBoundaryInfo timeBoundaryInfo;
if (!offlineTables.isEmpty() && !realtimeTables.isEmpty()) {
- String boundaryStrategy =
logicalTable.getTimeBoundaryConfig().getBoundaryStrategy();
+ String boundaryStrategy =
logicalTableConfig.getTimeBoundaryConfig().getBoundaryStrategy();
TimeBoundaryStrategy timeBoundaryStrategy =
TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy(boundaryStrategy);
- timeBoundaryInfo =
timeBoundaryStrategy.computeTimeBoundary(logicalTable, tableCache,
routingManager);
- if (timeBoundaryInfo == null) {
- LOGGER.info("No time boundary info found for logical hybrid table: ");
- routeInfo.setOfflineTables(null);
- } else {
- routeInfo.setTimeBoundaryInfo(timeBoundaryInfo);
+ timeBoundaryStrategy.init(logicalTableConfig, tableCache);
+ logicalTableRouteInfo.setTimeBoundaryStrategy(timeBoundaryStrategy);
+ }
+
+ logicalTableRouteInfo.setQueryConfig(logicalTableConfig.getQueryConfig());
+ }
+
+ /**
+ * Fills the routing-dependent metadata of a logical table route.
+ * <p>Route metadata is filled for every offline and realtime physical
+ * table. For a hybrid logical table that has a time boundary strategy, the
+ * time boundary is then computed; when none is available the offline tables
+ * are cleared (set to {@code null}).
+ *
+ * @param logicalTableRouteInfo route info that is updated in place
+ * @param routingManager routing manager used for route and time boundary
+ * lookups
+ */
+ public void fillRouteMetadata(LogicalTableRouteInfo logicalTableRouteInfo,
RoutingManager routingManager) {
+ ImplicitHybridTableRouteProvider tableRouteProvider = new
ImplicitHybridTableRouteProvider();
+ if (logicalTableRouteInfo.getOfflineTables() != null) {
+ for (ImplicitHybridTableRouteInfo routeInfo :
logicalTableRouteInfo.getOfflineTables()) {
+ tableRouteProvider.fillRouteMetadata(routeInfo, routingManager);
+ }
+ }
+
+ if (logicalTableRouteInfo.getRealtimeTables() != null) {
+ for (ImplicitHybridTableRouteInfo routeInfo :
logicalTableRouteInfo.getRealtimeTables()) {
+ tableRouteProvider.fillRouteMetadata(routeInfo, routingManager);
+ }
+ }
+
+ if (logicalTableRouteInfo.isHybrid()) {
+ TimeBoundaryStrategy timeBoundaryStrategy =
logicalTableRouteInfo.getTimeBoundaryStrategy();
+ if (timeBoundaryStrategy != null) {
+ TimeBoundaryInfo timeBoundaryInfo =
timeBoundaryStrategy.computeTimeBoundary(routingManager);
+ if (timeBoundaryInfo == null) {
+ // Include the table name so the log line identifies the affected
+ // table.
+ LOGGER.info("No time boundary info found for logical hybrid table: "
+ + logicalTableRouteInfo.getLogicalTableName());
+ logicalTableRouteInfo.setOfflineTables(null);
+ } else {
+ logicalTableRouteInfo.setTimeBoundaryInfo(timeBoundaryInfo);
+ }
}
}
- return routeInfo;
}
@Override
@@ -107,8 +139,7 @@ public class LogicalTableRouteProvider implements
TableRouteProvider {
if (routeInfo.getOfflineTables() != null) {
for (TableRouteInfo physicalTableInfo : routeInfo.getOfflineTables()) {
- routeProvider.calculateRoutes(physicalTableInfo, routingManager,
offlineBrokerRequest, null,
- requestId);
+ routeProvider.calculateRoutes(physicalTableInfo, routingManager,
offlineBrokerRequest, null, requestId);
numPrunedSegments += physicalTableInfo.getNumPrunedSegmentsTotal();
if (physicalTableInfo.getUnavailableSegments() != null) {
unavailableSegments.addAll(physicalTableInfo.getUnavailableSegments());
@@ -118,8 +149,7 @@ public class LogicalTableRouteProvider implements
TableRouteProvider {
if (routeInfo.getRealtimeTables() != null) {
for (TableRouteInfo physicalTableInfo : routeInfo.getRealtimeTables()) {
- routeProvider.calculateRoutes(physicalTableInfo, routingManager, null,
realtimeBrokerRequest,
- requestId);
+ routeProvider.calculateRoutes(physicalTableInfo, routingManager, null,
realtimeBrokerRequest, requestId);
numPrunedSegments += physicalTableInfo.getNumPrunedSegmentsTotal();
if (physicalTableInfo.getUnavailableSegments() != null) {
unavailableSegments.addAll(physicalTableInfo.getUnavailableSegments());
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java
index 287afcdd7b..04d09fd634 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/PhysicalTableRouteProvider.java
@@ -45,7 +45,6 @@ public class PhysicalTableRouteProvider extends
ImplicitHybridTableRouteProvider
@Override
public void calculateRoutes(TableRouteInfo tableRouteInfo, RoutingManager
routingManager,
BrokerRequest offlineBrokerRequest, BrokerRequest realtimeBrokerRequest,
long requestId) {
- assert (tableRouteInfo.isExists());
String offlineTableName = tableRouteInfo.getOfflineTableName();
String realtimeTableName = tableRouteInfo.getRealtimeTableName();
Map<ServerInstance, ServerRouteInfo> offlineRoutingTable = null;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
index 04be7d3cdf..e204c917da 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.timeboundary;
import com.google.auto.service.AutoService;
import com.google.common.base.Preconditions;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.config.provider.TableCache;
@@ -36,7 +37,27 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@AutoService(TimeBoundaryStrategy.class)
public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy {
- public static final String INCLUDED_TABLES = "includedTables";
+ private static final String INCLUDED_TABLES = "includedTables";
+ Map<String, DateTimeFormatSpec> _dateTimeFormatSpecMap;
+
+ /**
+ * Resolves and caches the time column format spec of every table that takes
+ * part in the time boundary computation.
+ * <p>For each table returned by {@code getTimeBoundaryTableNames}, the
+ * schema and table config are looked up in the table cache and the format
+ * spec of the configured time column is stored, keyed by physical table
+ * name.
+ *
+ * @throws IllegalArgumentException if a table config, schema, or time column
+ * spec cannot be found for an included table
+ */
+ @Override
+ public void init(LogicalTableConfig logicalTableConfig, TableCache
tableCache) {
+ List<String> includedTables =
getTimeBoundaryTableNames(logicalTableConfig);
+ _dateTimeFormatSpecMap = new HashMap<>(includedTables.size());
+ for (String physicalTableName : includedTables) {
+ // Schemas are looked up by raw table name, table configs by the
+ // type-suffixed physical table name.
+ String rawTableName =
TableNameBuilder.extractRawTableName(physicalTableName);
+ Schema schema = tableCache.getSchema(rawTableName);
+ TableConfig tableConfig = tableCache.getTableConfig(physicalTableName);
+ Preconditions.checkArgument(tableConfig != null, "Table config not found
for table: %s", physicalTableName);
+ Preconditions.checkArgument(schema != null, "Schema not found for table:
%s", physicalTableName);
+ String timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
+ DateTimeFieldSpec dateTimeFieldSpec =
schema.getSpecForTimeColumn(timeColumnName);
+ Preconditions.checkArgument(dateTimeFieldSpec != null, "Time column not
found in schema for table: %s",
+ physicalTableName);
+ DateTimeFormatSpec specFormatSpec = dateTimeFieldSpec.getFormatSpec();
+ _dateTimeFormatSpecMap.put(physicalTableName, specFormatSpec);
+ }
+ }
@Override
public String getName() {
@@ -44,29 +65,13 @@ public class MinTimeBoundaryStrategy implements
TimeBoundaryStrategy {
}
@Override
- public TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig
logicalTableConfig, TableCache tableCache,
- RoutingManager routingManager) {
+ public TimeBoundaryInfo computeTimeBoundary(RoutingManager routingManager) {
TimeBoundaryInfo minTimeBoundaryInfo = null;
long minTimeBoundary = Long.MAX_VALUE;
- Map<String, Object> parameters =
logicalTableConfig.getTimeBoundaryConfig().getParameters();
- List<String> includedTables =
- parameters != null ? (List) parameters.getOrDefault("includedTables",
List.of()) : List.of();
- for (String physicalTableName : includedTables) {
- TimeBoundaryInfo current =
routingManager.getTimeBoundaryInfo(physicalTableName);
+ for (Map.Entry<String, DateTimeFormatSpec> entry :
_dateTimeFormatSpecMap.entrySet()) {
+ TimeBoundaryInfo current =
routingManager.getTimeBoundaryInfo(entry.getKey());
if (current != null) {
- String rawTableName =
TableNameBuilder.extractRawTableName(physicalTableName);
- Schema schema = tableCache.getSchema(rawTableName);
- TableConfig tableConfig = tableCache.getTableConfig(physicalTableName);
- Preconditions.checkArgument(tableConfig != null,
- "Table config not found for table: %s", physicalTableName);
- Preconditions.checkArgument(schema != null,
- "Schema not found for table: %s", physicalTableName);
- String timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
- DateTimeFieldSpec dateTimeFieldSpec =
schema.getSpecForTimeColumn(timeColumnName);
- Preconditions.checkArgument(dateTimeFieldSpec != null,
- "Time column not found in schema for table: %s",
physicalTableName);
- DateTimeFormatSpec specFormatSpec = dateTimeFieldSpec.getFormatSpec();
- long currentTimeBoundaryMillis =
specFormatSpec.fromFormatToMillis(current.getTimeValue());
+ long currentTimeBoundaryMillis =
entry.getValue().fromFormatToMillis(current.getTimeValue());
if (minTimeBoundaryInfo == null) {
minTimeBoundaryInfo = current;
minTimeBoundary = currentTimeBoundaryMillis;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
index 7a4ee21794..f2215b29b2 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
@@ -34,16 +34,20 @@ public interface TimeBoundaryStrategy {
*/
String getName();
+ /**
+ * Initializes the time boundary strategy with the given logical table
configuration and table cache.
+ * Intended to be called before {@link #computeTimeBoundary(RoutingManager)},
+ * which receives only the routing manager.
+ *
+ * @param logicalTableConfig The logical table configuration to use for
initialization.
+ * @param tableCache The table cache to use for initialization.
+ */
+ void init(LogicalTableConfig logicalTableConfig, TableCache tableCache);
+
/**
* Computes the time boundary for the given physical table names.
*
- * @param logicalTableConfig The logical table configuration.
- * @param tableCache The table cache to use for fetching table metadata.
* @param routingManager The routing manager to use for computing the time
boundary.
* @return The computed time boundary information.
*/
- TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig,
TableCache tableCache,
- RoutingManager routingManager);
+ TimeBoundaryInfo computeTimeBoundary(RoutingManager routingManager);
/**
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
index 1c08dca4ae..71901f8432 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
@@ -170,7 +170,7 @@ public class BaseTableRouteTest {
TimeBoundaryStrategyService mockService =
mock(TimeBoundaryStrategyService.class);
when(TimeBoundaryStrategyService.getInstance()).thenReturn(mockService);
when(mockService.getTimeBoundaryStrategy(any())).thenReturn(_timeBoundaryStrategy);
- when(_timeBoundaryStrategy.computeTimeBoundary(any(), any(),
any())).thenReturn(mock(TimeBoundaryInfo.class));
+
when(_timeBoundaryStrategy.computeTimeBoundary(any())).thenReturn(mock(TimeBoundaryInfo.class));
}
@AfterClass
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java
index 6beb5d3ec6..be8606d068 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderCalculateRouteTest.java
@@ -27,9 +27,9 @@ import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.ServerRouteInfo;
-import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.transport.TableRouteInfo;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -59,9 +59,8 @@ public class
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
}
}
- private ImplicitHybridTableRouteInfo getImplicitHybridTableRouteInfo(String
tableName) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache,
_routingManager);
+ private TableRouteInfo getImplicitHybridTableRouteInfo(String tableName) {
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache,
_routingManager);
BrokerRequestPair brokerRequestPair =
getBrokerRequestPair(tableName, routeInfo.hasOffline(),
routeInfo.hasRealtime(),
routeInfo.getOfflineTableName(), routeInfo.getRealtimeTableName());
@@ -73,7 +72,7 @@ public class
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
private void assertTableRoute(String tableName, Map<String, Set<String>>
expectedOfflineRoutingTable,
Map<String, Set<String>> expectedRealtimeRoutingTable, boolean
isOfflineExpected, boolean isRealtimeExpected) {
- ImplicitHybridTableRouteInfo routeInfo =
getImplicitHybridTableRouteInfo(tableName);
+ TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName);
// If a routing table for offline table is expected, then compare it with
the expected routing table.
if (isOfflineExpected) {
@@ -281,7 +280,7 @@ public class
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
private void assertEqualsTableRouteInfoGetTableRouteResult(String tableName,
Map<String, Set<String>> expectedOfflineRoutingTable,
Map<String, Set<String>> expectedRealtimeRoutingTable, boolean
isOfflineExpected, boolean isRealtimeExpected) {
- ImplicitHybridTableRouteInfo routeInfo =
getImplicitHybridTableRouteInfo(tableName);
+ TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName);
GetTableRouteResult expectedTableRoute = getTableRouting(tableName,
_routingManager);
if (isOfflineExpected) {
@@ -332,7 +331,7 @@ public class
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
@Test(dataProvider = "routeNotExistsProvider")
void testTableRoutingForRouteNotExists(String tableName) {
- ImplicitHybridTableRouteInfo routeInfo =
getImplicitHybridTableRouteInfo(tableName);
+ TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName);
GetTableRouteResult expectedTableRoute = getTableRouting(tableName,
_routingManager);
assertNull(expectedTableRoute._offlineRoutingTable);
@@ -348,7 +347,7 @@ public class
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
@Test(dataProvider = "partiallyDisabledTableAndRouteProvider")
void testTableRoutingForPartiallyDisabledTable(String tableName, Map<String,
Set<String>> expectedOfflineRoutingTable,
Map<String, Set<String>> expectedRealtimeRoutingTable) {
- ImplicitHybridTableRouteInfo routeInfo =
getImplicitHybridTableRouteInfo(tableName);
+ TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName);
GetTableRouteResult expectedTableRoute = getTableRouting(tableName,
_routingManager);
if (expectedOfflineRoutingTable == null) {
@@ -376,7 +375,7 @@ public class
ImplicitHybridTableRouteProviderCalculateRouteTest extends BaseTabl
@Test(dataProvider = "disabledTableProvider")
void testTableRoutingForDisabledTable(String tableName) {
- ImplicitHybridTableRouteInfo routeInfo =
getImplicitHybridTableRouteInfo(tableName);
+ TableRouteInfo routeInfo = getImplicitHybridTableRouteInfo(tableName);
GetTableRouteResult expectedTableRoute = getTableRouting(tableName,
_routingManager);
if (expectedTableRoute._offlineTableDisabled) {
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java
index 2a5e046f49..ddd34ec9da 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/ImplicitHybridTableRouteProviderGetTableRouteTest.java
@@ -21,7 +21,7 @@ package org.apache.pinot.query.routing.table;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
-import org.apache.pinot.core.transport.ImplicitHybridTableRouteInfo;
+import org.apache.pinot.core.transport.TableRouteInfo;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.exception.QueryErrorCode;
@@ -41,26 +41,21 @@ import static org.testng.Assert.assertTrue;
public class ImplicitHybridTableRouteProviderGetTableRouteTest extends
BaseTableRouteTest {
@Test(dataProvider = "offlineTableProvider")
public void testOfflineTable(String parameter) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
-
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
assertTrue(routeInfo.isExists(), "The table should exist");
assertTrue(routeInfo.isOffline(), "The table should be offline");
}
@Test(dataProvider = "realtimeTableProvider")
public void testRealtimeTable(String parameter) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
-
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
assertTrue(routeInfo.isExists(), "The table should exist");
assertTrue(routeInfo.isRealtime(), "The table should be realtime");
}
@Test(dataProvider = "hybridTableProvider")
public void testHybridTable(String parameter) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
assertTrue(routeInfo.isExists(), "The table should exist");
assertTrue(routeInfo.isHybrid(), "The table should be hybrid");
@@ -71,62 +66,48 @@ public class
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
*/
@Test
public void testWithNoTimeBoundary() {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo("b", _tableCache,
_routingManager);
-
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo("b", _tableCache, _routingManager);
assertTrue(routeInfo.isExists(), "The table should exist");
assertTrue(routeInfo.isRealtime(), "The table should be realtime");
}
@Test(dataProvider = "nonExistentTableProvider")
public void testNonExistentTableName(String parameter) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
-
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
assertFalse(routeInfo.isExists(), "The table should not exist");
}
@Test(dataProvider = "routeExistsProvider")
public void testRouteExists(String parameter) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
-
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
assertTrue(routeInfo.isExists(), "The table should exist");
assertTrue(routeInfo.isRouteExists(), "The table should have route");
}
@Test(dataProvider = "routeNotExistsProvider")
public void testRouteNotExists(String parameter) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
-
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
assertTrue(routeInfo.isExists(), "The table should exist");
assertFalse(routeInfo.isRouteExists(), "The table should not have route");
}
@Test(dataProvider = "notDisabledTableProvider")
public void testNotDisabledTable(String parameter) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
-
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
assertTrue(routeInfo.isExists(), "The table should exist");
assertFalse(routeInfo.isDisabled(), "The table should not be disabled");
}
@Test(dataProvider = "partiallyDisabledTableProvider")
public void testPartiallyDisabledTable(String parameter) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
-
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
assertTrue(routeInfo.isExists(), "The table should exist");
assertFalse(routeInfo.isDisabled(), "The table should be disabled");
}
@Test(dataProvider = "disabledTableProvider")
public void testDisabledTable(String parameter) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
-
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(parameter, _tableCache,
_routingManager);
assertTrue(routeInfo.isExists(), "The table should exist");
assertTrue(routeInfo.isDisabled(), "The table should not have route");
}
@@ -158,10 +139,8 @@ public class
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
/**
* Similar if offlineTableName is not null and there is a route in the
routing routeInfo. Same for
* realtimeTableName.
- * @param routeInfo
- * @return
*/
- boolean similar(ImplicitHybridTableRouteInfo routeInfo) {
+ boolean similar(TableRouteInfo routeInfo) {
boolean isEquals = true;
if (_offlineTableName != null) {
@@ -272,13 +251,11 @@ public class
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
/**
* This test checks tables that have a table config and an entry in routing
manager.
* It makes sure that getTableNameAndConfig() behaves the same way as
ImplicitTableRouteComputer.
- * @param tableName
*/
@Test(dataProvider = "tableNameAndConfigSuccessProvider")
public void testTableNameAndConfigSuccess(String tableName) {
TableNameAndConfig tableNameAndConfig = getTableNameAndConfig(tableName);
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache,
_routingManager);
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache,
_routingManager);
assertTrue(tableNameAndConfig.similar(routeInfo), "The table name and
config should match the hybrid table");
}
@@ -303,13 +280,11 @@ public class
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
/**
* This test checks tables that do not have a table config or an entry in
routing manager.
- * @param tableName
*/
@Test(dataProvider = "tableNameAndConfigFailureProvider")
public void testTableNameAndConfigFailure(String tableName) {
TableNameAndConfig tableNameAndConfig = getTableNameAndConfig(tableName);
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache,
_routingManager);
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache,
_routingManager);
// getTableNameAndConfig() returns an error as a BrokerResponse with the
right error code.
assertNotNull(tableNameAndConfig._brokerResponse);
@@ -373,12 +348,10 @@ public class
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
/**
* If a table is not disabled, then checkTableDisabled() should return null.
* ImplicitTableRouteComputer should not be disabled.
- * @param tableName
*/
@Test(dataProvider = "notDisabledTableProvider")
public void testNotDisabledWithCheckDisabled(String tableName) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache,
_routingManager);
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache,
_routingManager);
ExceptionOrResponse exceptionOrResponse =
checkTableDisabled(routeInfo.hasOffline() &&
routeInfo.isOfflineTableDisabled(),
@@ -393,12 +366,10 @@ public class
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
/**
* In a hybrid table, if one of the tables is disabled, then
checkTableDisabled() should return an exception.
* ImplicitTableRouteComputer should not be disabled.
- * @param tableName
*/
@Test(dataProvider = "partiallyDisabledTableProvider")
public void testPartiallyDisabledWithCheckDisabled(String tableName) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache,
_routingManager);
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache,
_routingManager);
ExceptionOrResponse exceptionOrResponse =
checkTableDisabled(routeInfo.hasOffline() &&
routeInfo.isOfflineTableDisabled(),
@@ -416,12 +387,10 @@ public class
ImplicitHybridTableRouteProviderGetTableRouteTest extends BaseTable
* If a table is disabled, then checkTableDisabled() should return a broker
response with error code
* TABLE_IS_DISABLED.
* ImplicitTableRouteComputer should be disabled.
- * @param tableName
*/
@Test(dataProvider = "disabledTableProvider")
public void testDisabledWithCheckDisabled(String tableName) {
- ImplicitHybridTableRouteInfo routeInfo =
- _hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache,
_routingManager);
+ TableRouteInfo routeInfo =
_hybridTableRouteProvider.getTableRouteInfo(tableName, _tableCache,
_routingManager);
ExceptionOrResponse exceptionOrResponse =
checkTableDisabled(routeInfo.hasOffline() &&
routeInfo.isOfflineTableDisabled(),
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
index 464ff805d9..7e0ce2d2fe 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
@@ -84,7 +84,6 @@ public class MinTimeBoundaryStrategyTest {
return new Object[][]{
{timeBoundaryInfoMap, List.of("table3_OFFLINE"), "table3_OFFLINE"},
- {timeBoundaryInfoMap, List.of("Invalid_OFFLINE"), "Invalid_OFFLINE"},
{timeBoundaryInfoMap, List.of("table2_OFFLINE", "table3_OFFLINE"),
"table2_OFFLINE"},
{timeBoundaryInfoMap, List.of("table3_OFFLINE", "table2_OFFLINE",
"table4_OFFLINE"), "table2_OFFLINE"},
{timeBoundaryInfoMap, List.of(), "empty_includedTables_OFFLINE"}
@@ -102,8 +101,8 @@ public class MinTimeBoundaryStrategyTest {
private void testComputeTimeBoundary(Map<String, TimeBoundaryInfo>
timeBoundaryInfoMap, String expectedTableName,
Map<String, Object> parameters) {
setupMocks(timeBoundaryInfoMap);
- TimeBoundaryInfo timeBoundaryInfo =
_minTimeBoundaryStrategy.computeTimeBoundary(
- createLogicalTableConfig(parameters), _mockTableCache,
_mockRoutingManager);
+ _minTimeBoundaryStrategy.init(createLogicalTableConfig(parameters),
_mockTableCache);
+ TimeBoundaryInfo timeBoundaryInfo =
_minTimeBoundaryStrategy.computeTimeBoundary(_mockRoutingManager);
assertSame(timeBoundaryInfo, timeBoundaryInfoMap.get(expectedTableName));
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 8bd40d51c0..b0eb2e12d0 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -36,10 +36,12 @@ import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.common.request.TableSegmentsInfo;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.LogicalTableContext;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.optimizer.QueryOptimizer;
import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -112,8 +114,14 @@ public class ServerPlanRequestUtils {
// 2. Convert PinotQuery into InstanceRequest list (one for each physical
table)
PinotQuery pinotQuery = serverContext.getPinotQuery();
pinotQuery.setExplain(explain);
- List<InstanceRequest> instanceRequests =
- constructServerQueryRequests(executionContext, pinotQuery,
leafQueryExecutor.getInstanceDataManager());
+ List<InstanceRequest> instanceRequests;
+ if (executionContext.getWorkerMetadata().getLogicalTableSegmentsMap() !=
null) {
+ instanceRequests =
constructLogicalTableServerQueryRequests(executionContext, pinotQuery,
+ leafQueryExecutor.getInstanceDataManager());
+ } else {
+ instanceRequests =
+ constructServerQueryRequests(executionContext, pinotQuery,
leafQueryExecutor.getInstanceDataManager());
+ }
int numRequests = instanceRequests.size();
List<ServerQueryRequest> serverQueryRequests = new
ArrayList<>(numRequests);
for (InstanceRequest instanceRequest : instanceRequests) {
@@ -163,16 +171,20 @@ public class ServerPlanRequestUtils {
TableDataManager tableDataManager =
instanceDataManager.getTableDataManager(offlineTableName);
Preconditions.checkState(tableDataManager != null, "Failed to find
data manager for table: %s",
offlineTableName);
- return List.of(compileInstanceRequest(executionContext, pinotQuery,
timeBoundary, TableType.OFFLINE, segments,
- tableDataManager));
+ Pair<TableConfig, Schema> tableConfigAndSchema =
tableDataManager.getCachedTableConfigAndSchema();
+ return List.of(compileInstanceRequest(executionContext, pinotQuery,
timeBoundary, TableType.OFFLINE,
+ tableDataManager.getTableName(), tableConfigAndSchema.getLeft(),
tableConfigAndSchema.getRight(), segments,
+ null));
} else {
assert tableType.equals(TableType.REALTIME.name());
String realtimeTableName =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
TableDataManager tableDataManager =
instanceDataManager.getTableDataManager(realtimeTableName);
Preconditions.checkState(tableDataManager != null, "Failed to find
data manager for table: %s",
realtimeTableName);
- return List.of(compileInstanceRequest(executionContext, pinotQuery,
timeBoundary, TableType.REALTIME, segments,
- tableDataManager));
+ Pair<TableConfig, Schema> tableConfigAndSchema =
tableDataManager.getCachedTableConfigAndSchema();
+ return List.of(compileInstanceRequest(executionContext, pinotQuery,
timeBoundary, TableType.REALTIME,
+ tableDataManager.getTableName(), tableConfigAndSchema.getLeft(),
tableConfigAndSchema.getRight(), segments,
+ null));
}
} else {
assert numRequests == 2;
@@ -183,16 +195,21 @@ public class ServerPlanRequestUtils {
TableDataManager offlineTableDataManager =
instanceDataManager.getTableDataManager(offlineTableName);
Preconditions.checkState(offlineTableDataManager != null, "Failed to
find data manager for table: %s",
offlineTableName);
+ Pair<TableConfig, Schema> offlineTableConfigAndSchema =
offlineTableDataManager.getCachedTableConfigAndSchema();
String realtimeTableName =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName);
TableDataManager realtimeTableDataManager =
instanceDataManager.getTableDataManager(realtimeTableName);
Preconditions.checkState(realtimeTableDataManager != null, "Failed to
find data manager for table: %s",
realtimeTableName);
+ Pair<TableConfig, Schema> realtimeTableConfigAndSchema =
+ realtimeTableDataManager.getCachedTableConfigAndSchema();
// NOTE: Make a deep copy of PinotQuery for OFFLINE request.
return List.of(
compileInstanceRequest(executionContext, new PinotQuery(pinotQuery),
timeBoundary, TableType.OFFLINE,
- offlineSegments, offlineTableDataManager),
- compileInstanceRequest(executionContext, pinotQuery, timeBoundary,
TableType.REALTIME, realtimeSegments,
- realtimeTableDataManager));
+ offlineTableDataManager.getTableName(),
offlineTableConfigAndSchema.getLeft(),
+ offlineTableConfigAndSchema.getRight(), offlineSegments, null),
+ compileInstanceRequest(executionContext, pinotQuery, timeBoundary,
TableType.REALTIME,
+ realtimeTableDataManager.getTableName(),
realtimeTableConfigAndSchema.getLeft(),
+ realtimeTableConfigAndSchema.getRight(), realtimeSegments,
null));
}
}
@@ -200,13 +217,16 @@ public class ServerPlanRequestUtils {
* Convert {@link PinotQuery} into an {@link InstanceRequest}.
*/
private static InstanceRequest
compileInstanceRequest(OpChainExecutionContext executionContext, PinotQuery
pinotQuery,
- @Nullable TimeBoundaryInfo timeBoundaryInfo, TableType tableType,
List<String> segmentList,
- TableDataManager tableDataManager) {
+ @Nullable TimeBoundaryInfo timeBoundaryInfo, TableType tableType,
+ String tableNameWithType, TableConfig tableConfig, Schema schema,
@Nullable List<String> segmentList,
+ @Nullable List<TableSegmentsInfo> tableRouteInfoList) {
+ Preconditions.checkArgument(segmentList == null || tableRouteInfoList ==
null,
+ "Either segmentList OR tableRouteInfoList should be set");
+
// Making a unique requestId for leaf stages otherwise it causes problem
on stats/metrics/tracing.
long requestId = (executionContext.getRequestId() << 16) + ((long)
executionContext.getStageId() << 8) + (
tableType == TableType.REALTIME ? 1 : 0);
// 1. Modify the PinotQuery
- String tableNameWithType = tableDataManager.getTableName();
pinotQuery.getDataSource().setTableName(tableNameWithType);
if (timeBoundaryInfo != null) {
attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType ==
TableType.OFFLINE);
@@ -214,8 +234,7 @@ public class ServerPlanRequestUtils {
for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
pinotQuery = queryRewriter.rewrite(pinotQuery);
}
- Pair<TableConfig, Schema> tableConfigAndSchema =
tableDataManager.getCachedTableConfigAndSchema();
- QUERY_OPTIMIZER.optimize(pinotQuery, tableConfigAndSchema.getLeft(),
tableConfigAndSchema.getRight());
+ QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
// 2. Update query options according to requestMetadataMap
updateQueryOptions(pinotQuery, executionContext);
@@ -233,7 +252,17 @@ public class ServerPlanRequestUtils {
instanceRequest.setCid(QueryThreadContext.getCid());
instanceRequest.setBrokerId("unknown");
instanceRequest.setEnableTrace(executionContext.isTraceEnabled());
- instanceRequest.setSearchSegments(segmentList);
+ /*
+ * If segmentList is not null, it means that the query is for a single
table and we can directly set the segments.
+ * If segmentList is null, it means that the query is for a logical table
and we need to set TableSegmentInfoList
+ *
+ * Either one of segmentList or tableRouteInfoList has to be set, but not
both.
+ */
+ if (segmentList != null) {
+ instanceRequest.setSearchSegments(segmentList);
+ } else {
+ instanceRequest.setTableSegmentsInfoList(tableRouteInfoList);
+ }
instanceRequest.setQuery(brokerRequest);
return instanceRequest;
@@ -386,4 +415,69 @@ public class ServerPlanRequestUtils {
}
return expressions;
}
+
+ private static List<InstanceRequest>
constructLogicalTableServerQueryRequests(
+ OpChainExecutionContext executionContext, PinotQuery pinotQuery,
InstanceDataManager instanceDataManager) {
+ StageMetadata stageMetadata = executionContext.getStageMetadata();
+ String logicalTableName = stageMetadata.getTableName();
+ LogicalTableContext logicalTableContext =
instanceDataManager.getLogicalTableContext(logicalTableName);
+ Preconditions.checkNotNull(logicalTableContext,
+ "LogicalTableManager not found for logical table name: " +
logicalTableName);
+
+ Map<String, List<String>> logicalTableSegmentsMap =
+ executionContext.getWorkerMetadata().getLogicalTableSegmentsMap();
+ List<TableSegmentsInfo> offlineTableRouteInfoList = new ArrayList<>();
+ List<TableSegmentsInfo> realtimeTableRouteInfoList = new ArrayList<>();
+
+ Preconditions.checkNotNull(logicalTableSegmentsMap);
+ for (Map.Entry<String, List<String>> entry:
logicalTableSegmentsMap.entrySet()) {
+ String physicalTableName = entry.getKey();
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(physicalTableName);
+ TableSegmentsInfo tableSegmentsInfo = new TableSegmentsInfo();
+ tableSegmentsInfo.setTableName(physicalTableName);
+ tableSegmentsInfo.setSegments(entry.getValue());
+ if (tableType == TableType.REALTIME) {
+ realtimeTableRouteInfoList.add(tableSegmentsInfo);
+ } else {
+ offlineTableRouteInfoList.add(tableSegmentsInfo);
+ }
+ }
+
+ TimeBoundaryInfo timeBoundaryInfo = stageMetadata.getTimeBoundary();
+
+ if (offlineTableRouteInfoList.isEmpty() ||
realtimeTableRouteInfoList.isEmpty()) {
+ List<TableSegmentsInfo> routeInfoList =
+ offlineTableRouteInfoList.isEmpty() ? realtimeTableRouteInfoList :
offlineTableRouteInfoList;
+ String tableType = offlineTableRouteInfoList.isEmpty() ?
TableType.REALTIME.name() : TableType.OFFLINE.name();
+ if (tableType.equals(TableType.OFFLINE.name())) {
+
Preconditions.checkNotNull(logicalTableContext.getRefOfflineTableConfig());
+ String offlineTableName =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(logicalTableName);
+ return List.of(
+ compileInstanceRequest(executionContext, pinotQuery,
timeBoundaryInfo, TableType.OFFLINE, offlineTableName,
+ logicalTableContext.getRefOfflineTableConfig(),
logicalTableContext.getLogicalTableSchema(), null,
+ routeInfoList));
+ } else {
+
Preconditions.checkNotNull(logicalTableContext.getRefRealtimeTableConfig());
+ String realtimeTableName =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(logicalTableName);
+ return List.of(
+ compileInstanceRequest(executionContext, pinotQuery,
timeBoundaryInfo, TableType.REALTIME,
+ realtimeTableName,
logicalTableContext.getRefRealtimeTableConfig(),
+ logicalTableContext.getLogicalTableSchema(), null,
routeInfoList));
+ }
+ } else {
+
Preconditions.checkNotNull(logicalTableContext.getRefOfflineTableConfig());
+
Preconditions.checkNotNull(logicalTableContext.getRefRealtimeTableConfig());
+ String offlineTableName =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(logicalTableName);
+ String realtimeTableName =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(logicalTableName);
+ PinotQuery offlinePinotQuery = pinotQuery.deepCopy();
+ PinotQuery realtimePinotQuery = pinotQuery.deepCopy();
+ return List.of(
+ compileInstanceRequest(executionContext, offlinePinotQuery,
timeBoundaryInfo, TableType.OFFLINE,
+ offlineTableName, logicalTableContext.getRefOfflineTableConfig(),
+ logicalTableContext.getLogicalTableSchema(), null,
offlineTableRouteInfoList),
+ compileInstanceRequest(executionContext, realtimePinotQuery,
timeBoundaryInfo, TableType.REALTIME,
+ realtimeTableName,
logicalTableContext.getRefRealtimeTableConfig(),
+ logicalTableContext.getLogicalTableSchema(), null,
realtimeTableRouteInfoList));
+ }
+ }
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 292833ff85..c71321b6d1 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -47,6 +47,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.LogicalTableContext;
import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
@@ -64,6 +65,7 @@ import
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -517,4 +519,42 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
});
}
}
+
+ // TODO: LogicalTableContext has to be cached.
https://github.com/apache/pinot/issues/15859
+ @Nullable
+ @Override
+ public LogicalTableContext getLogicalTableContext(String logicalTableName) {
+ Schema schema = ZKMetadataProvider.getSchema(getPropertyStore(),
logicalTableName);
+ if (schema == null) {
+ LOGGER.warn("Failed to find schema for logical table: {}, skipping",
logicalTableName);
+ return null;
+ }
+ LogicalTableConfig logicalTableConfig =
ZKMetadataProvider.getLogicalTableConfig(getPropertyStore(),
+ logicalTableName);
+ if (logicalTableConfig == null) {
+ LOGGER.warn("Failed to find logical table config for logical table: {},
skipping", logicalTableName);
+ return null;
+ }
+
+ TableConfig offlineTableConfig = null;
+ if (logicalTableConfig.getRefOfflineTableName() != null) {
+ offlineTableConfig =
ZKMetadataProvider.getOfflineTableConfig(getPropertyStore(),
+ logicalTableConfig.getRefOfflineTableName());
+ if (offlineTableConfig == null) {
+ LOGGER.warn("Failed to find offline table config for logical table:
{}, skipping", logicalTableName);
+ return null;
+ }
+ }
+
+ TableConfig realtimeTableConfig = null;
+ if (logicalTableConfig.getRefRealtimeTableName() != null) {
+ realtimeTableConfig =
ZKMetadataProvider.getRealtimeTableConfig(getPropertyStore(),
+ logicalTableConfig.getRefRealtimeTableName());
+ if (realtimeTableConfig == null) {
+ LOGGER.warn("Failed to find realtime table config for logical table:
{}, skipping", logicalTableName);
+ return null;
+ }
+ }
+ return new LogicalTableContext(logicalTableConfig, schema,
offlineTableConfig, realtimeTableConfig);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]