This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c9f0c47d0a Logical table time boundary (#15776)
c9f0c47d0a is described below
commit c9f0c47d0ad96607760b706a79802d1598222ef3
Author: Abhishek Bafna <[email protected]>
AuthorDate: Tue May 20 11:55:29 2025 +0530
Logical table time boundary (#15776)
* Time boundary computation for SSE.
* Fix checkstyle
* Fix checkstyle
* Enhancing validations and tests for time boundary.
* Addressing review comments.
* Addressing review comments.
* Addressing review comments.
---------
Co-authored-by: abhishekbafna <[email protected]>
---
...okerResourceOnlineOfflineStateModelFactory.java | 22 ++--
.../BrokerUserDefinedMessageHandlerFactory.java | 27 +++++
.../pinot/broker/routing/BrokerRoutingManager.java | 71 ++++++++++--
.../messages/LogicalTableConfigRefreshMessage.java | 65 +++++++++++
.../pinot/common/metadata/ZKMetadataProvider.java | 4 +
.../common/utils/LogicalTableConfigUtils.java | 29 +++++
.../helix/core/PinotHelixResourceManager.java | 31 +++++
.../resources/PinotLogicalTableResourceTest.java | 54 +++++++++
.../pinot/controller/helix/ControllerTest.java | 2 +
.../BaseLogicalTableIntegrationTest.java | 127 +++++++++++++++++----
...hOneOfflineOneRealtimeTableIntegrationTest.java | 44 +++++++
...alTableWithOneRealtimeTableIntegrationTest.java | 29 +++++
...elveOfflineOneRealtimeTableIntegrationTest.java | 43 +++++++
...hTwoOfflineOneRealtimeTableIntegrationTest.java | 77 +++++++++++++
.../routing/table/LogicalTableRouteProvider.java | 27 +++--
.../timeboundary/MinTimeBoundaryStrategy.java | 79 +++++++++++++
.../query/timeboundary/TimeBoundaryStrategy.java | 46 ++++++++
.../timeboundary/TimeBoundaryStrategyService.java | 71 ++++++++++++
.../query/routing/table/BaseTableRouteTest.java | 26 ++++-
...ogicalTableRouteProviderCalculateRouteTest.java | 14 +++
.../LogicalTableRouteProviderGetRouteTest.java | 7 ++
.../timeboundary/MinTimeBoundaryStrategyTest.java | 122 ++++++++++++++++++++
.../TimeBoundaryStrategyServiceTest.java | 40 +++++++
.../apache/pinot/spi/data/LogicalTableConfig.java | 14 +++
.../apache/pinot/spi/data/TimeBoundaryConfig.java | 52 +++++++++
.../utils/builder/ControllerRequestURLBuilder.java | 8 ++
.../utils/builder/LogicalTableConfigBuilder.java | 8 ++
27 files changed, 1087 insertions(+), 52 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index 41f9e9ef87..dcf8a667e1 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -75,18 +75,22 @@ public class BrokerResourceOnlineOfflineStateModelFactory
extends StateModelFact
@Transition(from = "OFFLINE", to = "ONLINE")
public void onBecomeOnlineFromOffline(Message message, NotificationContext
context) {
- String tableNameWithType = message.getPartitionName();
- LOGGER.info("Processing transition from OFFLINE to ONLINE for table:
{}", tableNameWithType);
+ String physicalOrLogicalTable = message.getPartitionName();
+ LOGGER.info("Processing transition from OFFLINE to ONLINE for table:
{}", physicalOrLogicalTable);
try {
- _routingManager.buildRouting(tableNameWithType);
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
- _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
-
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
- _queryQuotaManager.createDatabaseRateLimiter(
-
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableNameWithType));
+ if (ZKMetadataProvider.isLogicalTableExists(_propertyStore,
physicalOrLogicalTable)) {
+ _routingManager.buildRoutingForLogicalTable(physicalOrLogicalTable);
+ } else {
+ _routingManager.buildRouting(physicalOrLogicalTable);
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, physicalOrLogicalTable);
+ _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
+
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
+ _queryQuotaManager.createDatabaseRateLimiter(
+
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(physicalOrLogicalTable));
+ }
} catch (Exception e) {
LOGGER.error("Caught exception while processing transition from
OFFLINE to ONLINE for table: {}",
- tableNameWithType, e);
+ physicalOrLogicalTable, e);
throw e;
}
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index f4da13621e..033a126ea6 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -27,6 +27,7 @@ import
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManage
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage;
import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
+import org.apache.pinot.common.messages.LogicalTableConfigRefreshMessage;
import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.TableConfigRefreshMessage;
@@ -62,6 +63,8 @@ public class BrokerUserDefinedMessageHandlerFactory
implements MessageHandlerFac
return new RefreshSegmentMessageHandler(new
SegmentRefreshMessage(message), context);
case TableConfigRefreshMessage.REFRESH_TABLE_CONFIG_MSG_SUB_TYPE:
return new RefreshTableConfigMessageHandler(new
TableConfigRefreshMessage(message), context);
+ case
LogicalTableConfigRefreshMessage.REFRESH_LOGICAL_TABLE_CONFIG_MSG_SUB_TYPE:
+ return new RefreshLogicalTableConfigMessageHandler(new
LogicalTableConfigRefreshMessage(message), context);
case RoutingTableRebuildMessage.REBUILD_ROUTING_TABLE_MSG_SUB_TYPE:
return new RebuildRoutingTableMessageHandler(new
RoutingTableRebuildMessage(message), context);
case DatabaseConfigRefreshMessage.REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE:
@@ -139,6 +142,30 @@ public class BrokerUserDefinedMessageHandlerFactory
implements MessageHandlerFac
}
}
+ private class RefreshLogicalTableConfigMessageHandler extends MessageHandler
{
+ final String _logicalTableName;
+
+ RefreshLogicalTableConfigMessageHandler(LogicalTableConfigRefreshMessage
refreshMessage,
+ NotificationContext context) {
+ super(refreshMessage, context);
+ _logicalTableName = refreshMessage.getLogicalTableName();
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() {
+ _routingManager.buildRoutingForLogicalTable(_logicalTableName);
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type) {
+ LOGGER.error("Got error while refreshing logical table config for table:
{} (error code: {}, error type: {})",
+ _logicalTableName, code, type, e);
+ }
+ }
+
private class RefreshDatabaseConfigMessageHandler extends MessageHandler {
final String _databaseName;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 9c54f57618..801b39055b 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -70,6 +70,8 @@ import
org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
@@ -418,17 +420,72 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
}
/**
- * Builds/rebuilds the routing for the physical table, for logical tables it
is skipped.
- * @param physicalOrLogicalTable a physical table with type or logical table
name
+ * Builds the routing for a logical table: for a hybrid logical table, sets up
time boundary managers on the offline tables listed under "includedTables" in
its time boundary config; non-hybrid logical tables are skipped. This method is
called when a logical table is created or updated.
+ * @param logicalTableName the name of the logical table
*/
- public synchronized void buildRouting(String physicalOrLogicalTable) {
- // skip route building for logical tables
- if (ZKMetadataProvider.isLogicalTableExists(_propertyStore,
physicalOrLogicalTable)) {
- LOGGER.info("Skipping route building for logical table: {}",
physicalOrLogicalTable);
+ public synchronized void buildRoutingForLogicalTable(String
logicalTableName) {
+ LogicalTableConfig logicalTableConfig =
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName);
+ Preconditions.checkState(logicalTableConfig != null, "Failed to find
logical table config for: %s",
+ logicalTableName); // report the requested name; the config itself is null when this check fails
+ if (!logicalTableConfig.isHybridLogicalTable()) {
+ LOGGER.info("Skip time boundary manager setting for non hybrid logical
table: {}", logicalTableName);
return;
}
- String tableNameWithType = physicalOrLogicalTable;
+ LOGGER.info("Setting time boundary manager for logical table: {}",
logicalTableName);
+
+ TimeBoundaryConfig timeBoundaryConfig =
logicalTableConfig.getTimeBoundaryConfig();
+
Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"),
+ "Invalid time boundary strategy: %s",
timeBoundaryConfig.getBoundaryStrategy());
+ List<String> includedTables =
+ (List<String>)
timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of());
+
+ for (String tableNameWithType : includedTables) {
+
Preconditions.checkArgument(TableNameBuilder.isOfflineTableResource(tableNameWithType),
+ "Invalid table in the time boundary config: %s", tableNameWithType);
+ try {
+ // build routing if it does not exist for the offline table
+ if (!_routingEntryMap.containsKey(tableNameWithType)) {
+ buildRouting(tableNameWithType);
+ }
+
+ if (_routingEntryMap.get(tableNameWithType).getTimeBoundaryManager()
!= null) {
+ LOGGER.info("Skip time boundary manager init for table: {}",
tableNameWithType);
+ continue;
+ }
+
+ // init time boundary manager for the table
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ Preconditions.checkState(tableConfig != null, "Failed to find table
config for table: %s", tableNameWithType);
+
+ String idealStatePath = getIdealStatePath(tableNameWithType);
+ IdealState idealState = getIdealState(idealStatePath);
+ Preconditions.checkState(idealState != null, "Failed to find ideal
state for table: %s", tableNameWithType);
+
+ String externalViewPath = getExternalViewPath(tableNameWithType);
+ ExternalView externalView = getExternalView(externalViewPath);
+
+ Set<String> onlineSegments = getOnlineSegments(idealState);
+ SegmentPreSelector segmentPreSelector =
+ SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig,
_propertyStore);
+ Set<String> preSelectedOnlineSegments =
segmentPreSelector.preSelect(onlineSegments);
+
+ TimeBoundaryManager timeBoundaryManager = new
TimeBoundaryManager(tableConfig, _propertyStore, _brokerMetrics);
+ timeBoundaryManager.init(idealState, externalView,
preSelectedOnlineSegments);
+
+
_routingEntryMap.get(tableNameWithType).setTimeBoundaryManager(timeBoundaryManager);
+ } catch (Exception e) {
+ LOGGER.error("Caught unexpected exception while setting time boundary
manager for table: {}", tableNameWithType,
+ e);
+ }
+ }
+ }
+
+ /**
+ * Builds the routing for a table.
+ * @param tableNameWithType the name of the table
+ */
+ public synchronized void buildRouting(String tableNameWithType) {
LOGGER.info("Building routing for table: {}", tableNameWithType);
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/messages/LogicalTableConfigRefreshMessage.java
b/pinot-common/src/main/java/org/apache/pinot/common/messages/LogicalTableConfigRefreshMessage.java
new file mode 100644
index 0000000000..dc16baa4ee
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/messages/LogicalTableConfigRefreshMessage.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import org.apache.helix.model.Message;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+
+
+/**
+ * This (Helix) message is sent from the controller to brokers when a request
is received to update the logical table
+ * config. The logical table name travels in the record's "logicalTableName"
simple field.
+ *
+ * NOTE: We keep the table name as a separate key instead of using the Helix
PARTITION_NAME so that this message can be
+ * used for any resource.
+ */
+public class LogicalTableConfigRefreshMessage extends Message {
+ public static final String REFRESH_LOGICAL_TABLE_CONFIG_MSG_SUB_TYPE =
"REFRESH_LOGICAL_TABLE_CONFIG";
+
+ private static final String TABLE_NAME_KEY = "logicalTableName";
+
+ /**
+ * Constructor for the sender. Creates a USER_DEFINE_MSG with a random UUID
as message id, tags it with the
+ * refresh sub-type, disables the execution timeout and stores the table name.
+ * @param logicalTableName name of the logical table, stored as a simple
field on the message record
+ */
+ public LogicalTableConfigRefreshMessage(String logicalTableName) {
+ super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+ setMsgSubType(REFRESH_LOGICAL_TABLE_CONFIG_MSG_SUB_TYPE);
+ // Give it infinite time to process the message, as long as session is
alive
+ setExecutionTimeout(-1);
+ // Set the Pinot specific fields
+ // NOTE: DO NOT use Helix field "PARTITION_NAME" because it can be
overridden by Helix while sending the message
+ ZNRecord znRecord = getRecord();
+ znRecord.setSimpleField(TABLE_NAME_KEY, logicalTableName);
+ }
+
+ /**
+ * Constructor for the receiver. Wraps the record of the received message.
+ * @param message the received Helix message
+ * @throws IllegalArgumentException if the message sub-type is not
REFRESH_LOGICAL_TABLE_CONFIG
+ * NOTE(review): a null sub-type would fail with NullPointerException instead,
because equals() is invoked on it - confirm senders always set it.
+ */
+ public LogicalTableConfigRefreshMessage(Message message) {
+ super(message.getRecord());
+ if
(!message.getMsgSubType().equals(REFRESH_LOGICAL_TABLE_CONFIG_MSG_SUB_TYPE)) {
+ throw new IllegalArgumentException("Invalid message subtype:" +
message.getMsgSubType());
+ }
+ }
+
+ public String getLogicalTableName() { // reads back the "logicalTableName" simple field
+ return getRecord().getSimpleField(TABLE_NAME_KEY);
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 829a273f2d..029f5c3491 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -883,4 +883,8 @@ public class ZKMetadataProvider {
public static boolean isLogicalTableExists(ZkHelixPropertyStore<ZNRecord>
propertyStore, String tableName) {
return
propertyStore.exists(constructPropertyStorePathForLogical(tableName),
AccessOption.PERSISTENT);
}
+
+ public static boolean isTableConfigExists(ZkHelixPropertyStore<ZNRecord>
propertyStore, String tableName) {
+ return
propertyStore.exists(constructPropertyStorePathForResourceConfig(tableName),
AccessOption.PERSISTENT);
+ }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
index d9b5e69b1e..42d5e43f9f 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
@@ -33,6 +33,7 @@ import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -64,6 +65,10 @@ public class LogicalTableConfigUtils {
if (record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY)
!= null) {
builder.setRefRealtimeTableName(record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY));
}
+ String timeBoundaryConfigJson =
record.getSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY);
+ if (timeBoundaryConfigJson != null) {
+
builder.setTimeBoundaryConfig(JsonUtils.stringToObject(timeBoundaryConfigJson,
TimeBoundaryConfig.class));
+ }
Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
for (Map.Entry<String, String> entry :
record.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY)
@@ -105,6 +110,10 @@ public class LogicalTableConfigUtils {
record.setSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY,
logicalTableConfig.getRefRealtimeTableName());
}
+ if (logicalTableConfig.getTimeBoundaryConfig() != null) {
+ record.setSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY,
+ logicalTableConfig.getTimeBoundaryConfig().toJsonString());
+ }
return record;
}
@@ -199,5 +208,25 @@ public class LogicalTableConfigUtils {
throw new IllegalArgumentException(
"Invalid logical table. Reason: Schema with same name as logical
table '" + tableName + "' does not exist");
}
+
+ // validate time boundary config is not null for hybrid tables
+ TimeBoundaryConfig timeBoundaryConfig =
logicalTableConfig.getTimeBoundaryConfig();
+ if (logicalTableConfig.isHybridLogicalTable() && timeBoundaryConfig ==
null) {
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason: 'timeBoundaryConfig' should not be
null for hybrid logical tables");
+ }
+
+ // time boundary strategy should not be null or empty
+ if (timeBoundaryConfig != null &&
StringUtils.isEmpty(timeBoundaryConfig.getBoundaryStrategy())) {
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason:
'timeBoundaryConfig.boundaryStrategy' should not be null or empty");
+ }
+
+ // validate time boundary config parameters
+ if (timeBoundaryConfig != null
+ && (timeBoundaryConfig.getParameters() == null ||
timeBoundaryConfig.getParameters().isEmpty())) {
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason: 'timeBoundaryConfig.parameters'
should not be null or empty");
+ }
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 81c14fc2ca..892e3fbc11 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -105,6 +105,7 @@ import
org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage;
import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
+import org.apache.pinot.common.messages.LogicalTableConfigRefreshMessage;
import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
@@ -179,6 +180,7 @@ import org.apache.pinot.spi.config.user.UserConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
@@ -2150,6 +2152,14 @@ public class PinotHelixResourceManager {
updateBrokerResourceForLogicalTable(logicalTableConfig, tableName);
}
+ TimeBoundaryConfig oldTimeBoundaryConfig =
oldLogicalTableConfig.getTimeBoundaryConfig();
+ TimeBoundaryConfig newTimeBoundaryConfig =
logicalTableConfig.getTimeBoundaryConfig();
+ // compare the old and new time boundary config and send message if they
are different
+ if ((oldTimeBoundaryConfig != null &&
!oldTimeBoundaryConfig.equals(newTimeBoundaryConfig))
+ || (oldTimeBoundaryConfig == null && newTimeBoundaryConfig != null)) {
+ sendLogicalTableConfigRefreshMessage(logicalTableConfig.getTableName());
+ }
+
LOGGER.info("Updated logical table {}: Successfully updated table",
tableName);
}
@@ -3172,6 +3182,27 @@ public class PinotHelixResourceManager {
}
}
+ private void sendLogicalTableConfigRefreshMessage(String logicalTableName) {
+ LogicalTableConfigRefreshMessage refreshMessage = new
LogicalTableConfigRefreshMessage(logicalTableName);
+
+ // Send logical table config refresh message to brokers
+ Criteria recipientCriteria = new Criteria();
+ recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ recipientCriteria.setInstanceName("%");
+ recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
+ recipientCriteria.setSessionSpecific(true);
+ recipientCriteria.setPartition(logicalTableName);
+ // Send message with no callback and infinite timeout on the recipient
+ int numMessagesSent =
+ _helixZkManager.getMessagingService().send(recipientCriteria,
refreshMessage, null, -1);
+ if (numMessagesSent > 0) {
+ LOGGER.info("Sent {} logical table config refresh messages to brokers
for table: {}", numMessagesSent,
+ logicalTableName);
+ } else {
+ LOGGER.warn("No logical table config refresh message sent to brokers for
table: {}", logicalTableName);
+ }
+ }
+
private void sendApplicationQpsQuotaRefreshMessage(String appName) {
ApplicationQpsQuotaRefreshMessage message = new
ApplicationQpsQuotaRefreshMessage(appName);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
index ced4a3f6c9..3458777e8d 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -333,6 +334,59 @@ public class PinotLogicalTableResourceTest extends
ControllerTest {
throwable.getMessage());
}
+ @Test
+ public void testLogicalTableTimeBoundaryConfigValidation()
+ throws IOException {
+ // Test logical table time boundary config validation: every case posts a
hybrid logical table with a broken
+ // time boundary config and checks the rejection reason returned by the
controller
+ List<String> physicalTableNamesWithType =
createHybridTables(List.of("test_table_8"));
+ LogicalTableConfig logicalTableConfig =
+ getDummyLogicalTableConfig(LOGICAL_TABLE_NAME,
physicalTableNamesWithType, BROKER_TENANT);
+
+ // Test logical table with no time boundary config
+ logicalTableConfig.setTimeBoundaryConfig(null);
+ Throwable throwable = expectThrows(IOException.class, () -> {
+ ControllerTest.sendPostRequest(_addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ });
+ assertTrue(throwable.getMessage()
+ .contains("Reason: 'timeBoundaryConfig' should not be null for
hybrid logical tables"),
+ throwable.getMessage());
+
+ // Test logical table with time boundary config but null strategy
+ logicalTableConfig.setTimeBoundaryConfig(new TimeBoundaryConfig(null,
null));
+ throwable = expectThrows(IOException.class, () -> {
+ ControllerTest.sendPostRequest(_addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ });
+ assertTrue(throwable.getMessage()
+ .contains("Reason: 'timeBoundaryConfig.boundaryStrategy' should not be null or empty"),
+ throwable.getMessage());
+
+ // Test logical table with time boundary config but empty strategy
+ logicalTableConfig.setTimeBoundaryConfig(new TimeBoundaryConfig("", null));
+ throwable = expectThrows(IOException.class, () -> {
+ ControllerTest.sendPostRequest(_addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ });
+ assertTrue(throwable.getMessage()
+ .contains("Reason: 'timeBoundaryConfig.boundaryStrategy' should not be null or empty"),
+ throwable.getMessage());
+
+ // Test logical table with time boundary config but null parameters
+ logicalTableConfig.setTimeBoundaryConfig(new TimeBoundaryConfig("min",
null));
+ throwable = expectThrows(IOException.class, () -> {
+ ControllerTest.sendPostRequest(_addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ });
+ assertTrue(throwable.getMessage()
+ .contains("Reason: 'timeBoundaryConfig.parameters' should not be null or empty"),
+ throwable.getMessage());
+
+ // Test logical table with time boundary config but empty parameters
+ logicalTableConfig.setTimeBoundaryConfig(new TimeBoundaryConfig("min",
Map.of()));
+ throwable = expectThrows(IOException.class, () -> {
+ ControllerTest.sendPostRequest(_addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ });
+ assertTrue(throwable.getMessage()
+ .contains("Reason: 'timeBoundaryConfig.parameters' should not be null or empty"),
+ throwable.getMessage());
+ }
+
@Test
public void testLogicalTableWithSameNameNotAllowed()
throws IOException {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 9f775d3201..38c29dfeb7 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -81,6 +81,7 @@ import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
@@ -408,6 +409,7 @@ public class ControllerTest {
.setRefRealtimeTableName(realtimeTableName)
.setQuotaConfig(new QuotaConfig(null, "999"))
.setQueryConfig(new QueryConfig(1L, true, false, null, 1L, 1L))
+ .setTimeBoundaryConfig(new TimeBoundaryConfig("min",
Map.of("includedTables", physicalTableNames)))
.setPhysicalTableConfigMap(physicalTableConfigMap);
return builder.build();
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
index 7405a00ce5..3f2bffd888 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -24,9 +24,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.controller.helix.ControllerRequestClient;
import org.apache.pinot.controller.helix.ControllerTest;
@@ -39,7 +41,9 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -63,8 +67,8 @@ public abstract class BaseLogicalTableIntegrationTest extends
BaseClusterIntegra
private static final String DEFAULT_TENANT = "DefaultTenant";
private static final String DEFAULT_LOGICAL_TABLE_NAME = "mytable";
protected static final String DEFAULT_TABLE_NAME = "physicalTable";
- private static final int NUM_OFFLINE_SEGMENTS = 12;
protected static BaseLogicalTableIntegrationTest _sharedClusterTestSuite =
null;
+ protected List<File> _avroFiles;
@BeforeSuite
public void setUpSuite()
@@ -99,11 +103,6 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
LOGGER.info("Finished tearing down integration test suite");
}
- @Override
- protected String getTableName() {
- return DEFAULT_TABLE_NAME;
- }
-
@BeforeClass
public void setUp()
throws Exception {
@@ -113,10 +112,12 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
_helixResourceManager = _sharedClusterTestSuite._helixResourceManager;
}
- List<File> avroFiles = getAllAvroFiles();
- int numSegmentsPerTable = NUM_OFFLINE_SEGMENTS /
getOfflineTableNames().size();
- int index = 0;
- for (String tableName : getOfflineTableNames()) {
+ _avroFiles = getAllAvroFiles();
+ Map<String, List<File>> offlineTableDataFiles = getOfflineTableDataFiles();
+ for (Map.Entry<String, List<File>> entry :
offlineTableDataFiles.entrySet()) {
+ String tableName = entry.getKey();
+ List<File> avroFilesForTable = entry.getValue();
+
File tarDir = new File(_tarDir, tableName);
TestUtils.ensureDirectoriesExistAndEmpty(tarDir);
@@ -128,25 +129,37 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
TableConfig offlineTableConfig = createOfflineTableConfig(tableName);
addTableConfig(offlineTableConfig);
- List<File> offlineAvroFiles = new ArrayList<>(numSegmentsPerTable);
- for (int i = index; i < index + numSegmentsPerTable; i++) {
- offlineAvroFiles.add(avroFiles.get(i));
- }
- index += numSegmentsPerTable;
-
// Create and upload segments
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles,
offlineTableConfig, schema, 0, _segmentDir,
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFilesForTable,
offlineTableConfig, schema, 0, _segmentDir,
tarDir);
uploadSegments(tableName, tarDir);
}
+ // create realtime table
+ Map<String, List<File>> realtimeTableDataFiles =
getRealtimeTableDataFiles();
+ for (Map.Entry<String, List<File>> entry :
realtimeTableDataFiles.entrySet()) {
+ String tableName = entry.getKey();
+ List<File> avroFilesForTable = entry.getValue();
+ // create and upload the schema and table config
+ Schema schema = createSchema(getSchemaFileName());
+ schema.setSchemaName(tableName);
+ addSchema(schema);
+
+ TableConfig realtimeTableConfig =
createRealtimeTableConfig(avroFilesForTable.get(0));
+ realtimeTableConfig.setTableName(tableName);
+ addTableConfig(realtimeTableConfig);
+
+ // push avro files into kafka
+ pushAvroIntoKafka(avroFilesForTable);
+ }
+
createLogicalTable();
// Set up the H2 connection
- setUpH2Connection(avroFiles);
+ setUpH2Connection(_avroFiles);
// Initialize the query generator
- setUpQueryGenerator(avroFiles);
+ setUpQueryGenerator(_avroFiles);
// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
@@ -158,11 +171,70 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
cleanup();
}
- protected abstract List<String> getOfflineTableNames();
+ /**
+ * Returns the raw (type-less) names of the offline physical tables behind the logical table.
+ * The default is an empty list; subclasses override this to add offline tables.
+ */
+ protected List<String> getOfflineTableNames() {
+ return List.of();
+ }
+
+ /**
+ * Returns the raw (type-less) names of the realtime physical tables behind the logical table.
+ * The default is an empty list; subclasses override this to add realtime tables.
+ */
+ protected List<String> getRealtimeTableNames() {
+ return List.of();
+ }
+
+  /**
+   * Maps every offline table to the Avro files its segments are built from. By default all files in
+   * {@code _avroFiles} are spread over the tables returned by {@link #getOfflineTableNames()}.
+   *
+   * @return offline table name to data files, or an empty map when there are no offline tables
+   */
+  protected Map<String, List<File>> getOfflineTableDataFiles() {
+    List<String> tableNames = getOfflineTableNames();
+    if (tableNames.isEmpty()) {
+      return Map.of();
+    }
+    return distributeFilesToTables(tableNames, _avroFiles);
+  }
+
+  /**
+   * Maps every realtime table to the Avro files that are pushed into Kafka for it. By default all files in
+   * {@code _avroFiles} are spread over the tables returned by {@link #getRealtimeTableNames()}.
+   *
+   * @return realtime table name to data files, or an empty map when there are no realtime tables
+   */
+  protected Map<String, List<File>> getRealtimeTableDataFiles() {
+    List<String> tableNames = getRealtimeTableNames();
+    if (tableNames.isEmpty()) {
+      return Map.of();
+    }
+    return distributeFilesToTables(tableNames, _avroFiles);
+  }
+
+  /**
+   * Distributes the given Avro files over the given tables in round-robin order: file {@code i} goes to table
+   * {@code i % tableNames.size()}.
+   *
+   * @param tableNames tables to distribute the files to
+   * @param avroFiles files to distribute
+   * @return a mutable map from table name to its files; every table is present, possibly with an empty list.
+   *         The map is empty when {@code tableNames} is empty.
+   */
+  protected Map<String, List<File>> distributeFilesToTables(List<String> tableNames, List<File> avroFiles) {
+    Map<String, List<File>> tableNameToFilesMap = new HashMap<>();
+    int numTables = tableNames.size();
+    if (numTables == 0) {
+      // Nothing to distribute to; returning early also avoids the modulo-by-zero in the loop below
+      return tableNameToFilesMap;
+    }
+
+    // Register every table up front so that a table left without files still maps to an empty list
+    for (String tableName : tableNames) {
+      tableNameToFilesMap.put(tableName, new ArrayList<>());
+    }
+
+    // Round-robin distribution of files to table names
+    for (int i = 0; i < avroFiles.size(); i++) {
+      tableNameToFilesMap.get(tableNames.get(i % numTables)).add(avroFiles.get(i));
+    }
+    return tableNameToFilesMap;
+  }
+
+  /**
+   * Selects the offline table to use for the time boundary: the one owning the segment with the largest
+   * {@code endTimeMillis}, according to the segment metadata fetched from the controller.
+   *
+   * @return a single-element list holding the selected table name with its OFFLINE type, or an empty list when
+   *         no segment was found
+   */
+  private List<String> getTimeBoundaryTable() {
+    String selectedTable = null;
+    long latestEndTimeMillis = Long.MIN_VALUE;
+    try {
+      for (String offlineTable : getOfflineTableNames()) {
+        String metadataUrl = _controllerRequestURLBuilder.forSegmentMetadata(offlineTable, TableType.OFFLINE);
+        JsonNode segmentsMetadata = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(metadataUrl));
+        // Each top-level field of the response is read as one segment's metadata
+        Iterator<String> segmentNames = segmentsMetadata.fieldNames();
+        while (segmentNames.hasNext()) {
+          long endTimeMillis = segmentsMetadata.get(segmentNames.next()).get("endTimeMillis").asLong();
+          if (endTimeMillis <= latestEndTimeMillis) {
+            continue;
+          }
+          latestEndTimeMillis = endTimeMillis;
+          selectedTable = offlineTable;
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to get the time boundary table", e);
+    }
+    if (selectedTable == null) {
+      return List.of();
+    }
+    return List.of(TableNameBuilder.OFFLINE.tableNameWithType(selectedTable));
+  }
protected List<String> getPhysicalTableNames() {
- return
getOfflineTableNames().stream().map(TableNameBuilder.OFFLINE::tableNameWithType)
+ List<String> offlineTableNames =
getOfflineTableNames().stream().map(TableNameBuilder.OFFLINE::tableNameWithType)
.collect(Collectors.toList());
+ List<String> realtimeTableNames = getRealtimeTableNames().stream()
+
.map(TableNameBuilder.REALTIME::tableNameWithType).collect(Collectors.toList());
+ return Stream.concat(offlineTableNames.stream(),
realtimeTableNames.stream()).collect(Collectors.toList());
}
protected String getLogicalTableName() {
@@ -205,7 +277,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
// @formatter:on
}
- public static LogicalTableConfig getLogicalTableConfig(String tableName,
List<String> physicalTableNames,
+ public LogicalTableConfig getLogicalTableConfig(String tableName,
List<String> physicalTableNames,
String brokerTenant) {
Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
for (String physicalTableName : physicalTableNames) {
@@ -216,9 +288,16 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
String realtimeTableName =
physicalTableNames.stream().filter(TableNameBuilder::isRealtimeTableResource).findFirst().orElse(null);
LogicalTableConfigBuilder builder =
- new
LogicalTableConfigBuilder().setTableName(tableName).setBrokerTenant(brokerTenant)
-
.setRefOfflineTableName(offlineTableName).setRefRealtimeTableName(realtimeTableName)
+ new LogicalTableConfigBuilder().setTableName(tableName)
+ .setBrokerTenant(brokerTenant)
+ .setRefOfflineTableName(offlineTableName)
+ .setRefRealtimeTableName(realtimeTableName)
.setPhysicalTableConfigMap(physicalTableConfigMap);
+ if (!getOfflineTableNames().isEmpty() &&
!getRealtimeTableNames().isEmpty()) {
+ builder.setTimeBoundaryConfig(
+ new TimeBoundaryConfig("min", Map.of("includedTables",
getTimeBoundaryTable()))
+ );
+ }
return builder.build();
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneOfflineOneRealtimeTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneOfflineOneRealtimeTableIntegrationTest.java
new file mode 100644
index 0000000000..d6f599ce1b
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneOfflineOneRealtimeTableIntegrationTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Integration test for a logical table backed by one offline table ({@code o_1}) and one realtime table
+ * ({@code r_1}).
+ */
+public class LogicalTableWithOneOfflineOneRealtimeTableIntegrationTest extends BaseLogicalTableIntegrationTest {
+
+  @Override
+  protected List<String> getOfflineTableNames() {
+    return List.of("o_1");
+  }
+
+  @Override
+  protected List<String> getRealtimeTableNames() {
+    return List.of("r_1");
+  }
+
+  @Override
+  protected Map<String, List<File>> getRealtimeTableDataFiles() {
+    // The realtime table only gets the last 4 Avro files; the same files are also used for the offline side,
+    // so the hybrid table has overlapping data
+    int numFiles = _avroFiles.size();
+    List<File> realtimeFiles = _avroFiles.subList(numFiles - 4, numFiles);
+    return distributeFilesToTables(getRealtimeTableNames(), realtimeFiles);
+  }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneRealtimeTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneRealtimeTableIntegrationTest.java
new file mode 100644
index 0000000000..dee7e1f59c
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneRealtimeTableIntegrationTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import java.util.List;
+
+
+/**
+ * Integration test for a logical table backed by a single realtime table ({@code r_1}). No offline table names
+ * are declared here, so the inherited empty default applies.
+ */
+public class LogicalTableWithOneRealtimeTableIntegrationTest extends
BaseLogicalTableIntegrationTest {
+ @Override
+ protected List<String> getRealtimeTableNames() {
+ return List.of("r_1");
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwelveOfflineOneRealtimeTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwelveOfflineOneRealtimeTableIntegrationTest.java
new file mode 100644
index 0000000000..96eb11e2fe
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwelveOfflineOneRealtimeTableIntegrationTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Integration test for a logical table backed by twelve offline tables ({@code o_1} .. {@code o_12}) and one
+ * realtime table ({@code r_1}).
+ */
+public class LogicalTableWithTwelveOfflineOneRealtimeTableIntegrationTest extends BaseLogicalTableIntegrationTest {
+  @Override
+  protected List<String> getOfflineTableNames() {
+    return List.of("o_1", "o_2", "o_3", "o_4", "o_5", "o_6", "o_7", "o_8", "o_9", "o_10", "o_11", "o_12");
+  }
+
+  @Override
+  protected List<String> getRealtimeTableNames() {
+    return List.of("r_1");
+  }
+
+  @Override
+  protected Map<String, List<File>> getRealtimeTableDataFiles() {
+    // The realtime table only gets the last 2 Avro files; the same files are also used for the offline side,
+    // so the hybrid table has overlapping data
+    int numFiles = _avroFiles.size();
+    List<File> realtimeFiles = _avroFiles.subList(numFiles - 2, numFiles);
+    return distributeFilesToTables(getRealtimeTableNames(), realtimeFiles);
+  }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
new file mode 100644
index 0000000000..d406b2ad56
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for a logical table backed by two offline tables ({@code o_1}, {@code o_2}) and one realtime
+ * table ({@code r_1}). In addition to the inherited tests, it re-runs the query suites after switching the
+ * table listed under the time boundary config's {@code includedTables} parameter.
+ */
+public class LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest extends BaseLogicalTableIntegrationTest {
+  @Override
+  protected List<String> getOfflineTableNames() {
+    return List.of("o_1", "o_2");
+  }
+
+  @Override
+  protected List<String> getRealtimeTableNames() {
+    return List.of("r_1");
+  }
+
+  @Override
+  protected Map<String, List<File>> getRealtimeTableDataFiles() {
+    // The realtime table only gets the last 4 Avro files; the same files are also used for the offline side,
+    // so the hybrid table has overlapping data
+    int numFiles = _avroFiles.size();
+    List<File> realtimeFiles = _avroFiles.subList(numFiles - 4, numFiles);
+    return distributeFilesToTables(getRealtimeTableNames(), realtimeFiles);
+  }
+
+  /**
+   * Switches the time boundary table of the logical table and checks that all query suites still pass.
+   */
+  @Test
+  public void testUpdateLogicalTableTimeBoundary()
+      throws Exception {
+    LogicalTableConfig logicalTableConfig = getLogicalTableConfig(getLogicalTableName());
+    updateTimeBoundaryTableInLogicalTable(logicalTableConfig);
+
+    // Wait to ensure logical table config update helix message is processed in broker
+    waitForAllDocsLoaded(5_000);
+
+    // Run the tests
+    testGeneratedQueries();
+    testHardcodedQueries();
+    testQueriesFromQueryFile();
+  }
+
+  /**
+   * Replaces the first table listed under {@code includedTables} with the other offline table
+   * ({@code o_1} becomes {@code o_2}, anything else becomes {@code o_1}) and pushes the updated config.
+   */
+  private void updateTimeBoundaryTableInLogicalTable(LogicalTableConfig logicalTableConfig)
+      throws IOException {
+    Object includedTables = logicalTableConfig.getTimeBoundaryConfig().getParameters().get("includedTables");
+    String currentRawName = TableNameBuilder.extractRawTableName(((List<String>) includedTables).get(0));
+
+    String replacementRawName;
+    if (currentRawName.equals("o_1")) {
+      replacementRawName = "o_2";
+    } else {
+      replacementRawName = "o_1";
+    }
+    String replacementTableName = TableNameBuilder.OFFLINE.tableNameWithType(replacementRawName);
+
+    Map<String, Object> parameters = Map.of("includedTables", List.of(replacementTableName));
+    logicalTableConfig.getTimeBoundaryConfig().setParameters(parameters);
+
+    updateLogicalTableConfig(logicalTableConfig.getTableName(), logicalTableConfig);
+  }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
index 60ddd345ae..fe3937b8b0 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
@@ -26,6 +26,8 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.TableRouteInfo;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.LogicalTableConfig;
@@ -78,6 +80,20 @@ public class LogicalTableRouteProvider implements
TableRouteProvider {
routeInfo.setRealtimeTableConfig(realtimeTableConfig);
}
routeInfo.setQueryConfig(logicalTable.getQueryConfig());
+
+ TimeBoundaryInfo timeBoundaryInfo;
+ if (!offlineTables.isEmpty() && !realtimeTables.isEmpty()) {
+ String boundaryStrategy =
logicalTable.getTimeBoundaryConfig().getBoundaryStrategy();
+ TimeBoundaryStrategy timeBoundaryStrategy =
+
TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy(boundaryStrategy);
+ timeBoundaryInfo =
timeBoundaryStrategy.computeTimeBoundary(logicalTable, tableCache,
routingManager);
+ if (timeBoundaryInfo == null) {
+ LOGGER.info("No time boundary info found for logical hybrid table: ");
+ routeInfo.setOfflineTables(null);
+ } else {
+ routeInfo.setTimeBoundaryInfo(timeBoundaryInfo);
+ }
+ }
return routeInfo;
}
@@ -111,17 +127,6 @@ public class LogicalTableRouteProvider implements
TableRouteProvider {
}
}
- TimeBoundaryInfo timeBoundaryInfo = null;
- if (routeInfo.hasRealtime() && routeInfo.hasOffline()) {
- timeBoundaryInfo =
routingManager.getTimeBoundaryInfo(routeInfo.getOfflineTables().get(0).getOfflineTableName());
- if (timeBoundaryInfo == null) {
- LOGGER.debug("No time boundary info found for hybrid table: ");
- routeInfo.setOfflineTables(null);
- } else {
- routeInfo.setTimeBoundaryInfo(timeBoundaryInfo);
- }
- }
-
//Set BrokerRequests to NULL if there is no route.
if (routeInfo.getOfflineExecutionServers().isEmpty()) {
routeInfo.setOfflineBrokerRequest(null);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
new file mode 100644
index 0000000000..5d3c98596b
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.timeboundary;
+
+import com.google.auto.service.AutoService;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+/**
+ * {@link TimeBoundaryStrategy} registered under the name {@code "min"}.
+ * <p>It looks at the physical tables listed under the {@code includedTables} parameter of the logical table's
+ * time boundary config and picks the time boundary that is the earliest once converted to milliseconds.
+ */
+@AutoService(TimeBoundaryStrategy.class)
+public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy {
+
+  @Override
+  public String getName() {
+    return "min";
+  }
+
+  /**
+   * Returns the earliest time boundary among the included tables.
+   *
+   * @param logicalTableConfig logical table config whose time boundary config lists the included tables
+   * @param tableCache used to look up the schema and table config of each included table
+   * @param routingManager used to look up the current time boundary of each included table
+   * @return the {@link TimeBoundaryInfo} with the smallest boundary in milliseconds, or {@code null} when none of
+   *         the included tables has a time boundary (or no table is included)
+   * @throws IllegalArgumentException if an included table with a time boundary has no table config, no schema, or
+   *         no time column spec in its schema
+   */
+  @Override
+  public TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, TableCache tableCache,
+      RoutingManager routingManager) {
+    TimeBoundaryInfo minTimeBoundaryInfo = null;
+    long minTimeBoundary = Long.MAX_VALUE;
+    for (String physicalTableName : getIncludedTables(logicalTableConfig)) {
+      TimeBoundaryInfo current = routingManager.getTimeBoundaryInfo(physicalTableName);
+      if (current == null) {
+        // No boundary available for this table, so it cannot contribute to the minimum
+        continue;
+      }
+      String rawTableName = TableNameBuilder.extractRawTableName(physicalTableName);
+      Schema schema = tableCache.getSchema(rawTableName);
+      TableConfig tableConfig = tableCache.getTableConfig(physicalTableName);
+      Preconditions.checkArgument(tableConfig != null, "Table config not found for table: %s", physicalTableName);
+      Preconditions.checkArgument(schema != null, "Schema not found for table: %s", physicalTableName);
+      String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
+      DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
+      Preconditions.checkArgument(dateTimeFieldSpec != null, "Time column not found in schema for table: %s",
+          physicalTableName);
+      // Boundaries are compared in milliseconds because tables may use different time formats
+      DateTimeFormatSpec specFormatSpec = dateTimeFieldSpec.getFormatSpec();
+      long currentTimeBoundaryMillis = specFormatSpec.fromFormatToMillis(current.getTimeValue());
+      if (minTimeBoundaryInfo == null || currentTimeBoundaryMillis < minTimeBoundary) {
+        minTimeBoundaryInfo = current;
+        minTimeBoundary = currentTimeBoundaryMillis;
+      }
+    }
+    return minTimeBoundaryInfo;
+  }
+
+  /**
+   * Reads the {@code includedTables} parameter of the time boundary config; an empty list is returned when the
+   * parameters are missing or do not contain the key.
+   */
+  @SuppressWarnings("unchecked")
+  private static List<String> getIncludedTables(LogicalTableConfig logicalTableConfig) {
+    Map<String, Object> parameters = logicalTableConfig.getTimeBoundaryConfig().getParameters();
+    if (parameters == null) {
+      return List.of();
+    }
+    // The parameter map is untyped; the value is expected to be a list of physical table names
+    return (List<String>) parameters.getOrDefault("includedTables", List.of());
+  }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
new file mode 100644
index 0000000000..c1b97f28c5
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.timeboundary;
+
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+
+
+/**
+ * Strategy for computing the time boundary of a logical table.
+ */
+public interface TimeBoundaryStrategy {
+
+ /**
+ * Returns the time boundary strategy name. {@code TimeBoundaryStrategyService} registers and looks up
+ * strategies by this name.
+ *
+ * @return The time boundary strategy name.
+ */
+ String getName();
+
+ /**
+ * Computes the time boundary for the given logical table.
+ *
+ * @param logicalTableConfig The logical table configuration.
+ * @param tableCache The table cache to use for fetching table metadata.
+ * @param routingManager The routing manager to use for computing the time boundary.
+ * @return The computed time boundary information; may be {@code null} when no time boundary is available.
+ */
+ TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig,
TableCache tableCache,
+ RoutingManager routingManager);
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyService.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyService.java
new file mode 100644
index 0000000000..0e3f6af4aa
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyService.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.timeboundary;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+
+
+/**
+ * Registry of {@link TimeBoundaryStrategy} implementations, keyed by {@link TimeBoundaryStrategy#getName()}.
+ * <p>The default singleton is populated from {@link ServiceLoader} when the class is initialized and can be
+ * replaced through {@link #setInstance(TimeBoundaryStrategyService)}.
+ */
+public class TimeBoundaryStrategyService {
+
+  private static volatile TimeBoundaryStrategyService _instance = fromServiceLoader();
+  private final Map<String, TimeBoundaryStrategy> _strategyMap;
+
+  private TimeBoundaryStrategyService(Map<String, TimeBoundaryStrategy> strategyMap) {
+    _strategyMap = strategyMap;
+  }
+
+  /**
+   * Creates a service holding every {@link TimeBoundaryStrategy} found by {@link ServiceLoader}.
+   *
+   * @return a new service instance (the singleton is not modified)
+   * @throws IllegalStateException if two strategies report the same name
+   */
+  public static TimeBoundaryStrategyService fromServiceLoader() {
+    Map<String, TimeBoundaryStrategy> strategyMap = new java.util.HashMap<>();
+    for (TimeBoundaryStrategy strategy : ServiceLoader.load(TimeBoundaryStrategy.class)) {
+      String strategyName = strategy.getName();
+      if (strategyMap.putIfAbsent(strategyName, strategy) != null) {
+        throw new IllegalStateException("Duplicate TimeBoundaryStrategy found: " + strategyName);
+      }
+    }
+    return new TimeBoundaryStrategyService(strategyMap);
+  }
+
+  /**
+   * Returns the singleton instance of the TimeBoundaryStrategyService.
+   *
+   * @return The singleton instance of the TimeBoundaryStrategyService.
+   */
+  public static TimeBoundaryStrategyService getInstance() {
+    return _instance;
+  }
+
+  /**
+   * Sets the singleton instance of the TimeBoundaryStrategyService.
+   *
+   * @param service The new instance to set.
+   */
+  public static void setInstance(TimeBoundaryStrategyService service) {
+    _instance = service;
+  }
+
+  /**
+   * Returns the strategy registered in this service under the given name.
+   *
+   * @param name The strategy name, as reported by {@link TimeBoundaryStrategy#getName()}.
+   * @return The matching strategy, never {@code null}.
+   * @throws IllegalArgumentException if no strategy is registered under the given name
+   */
+  public TimeBoundaryStrategy getTimeBoundaryStrategy(String name) {
+    // Resolve against this instance's own registry rather than whatever the singleton currently points to
+    TimeBoundaryStrategy strategy = _strategyMap.get(name);
+    if (strategy == null) {
+      throw new IllegalArgumentException("No TimeBoundaryStrategy found for name: " + name);
+    }
+    return strategy;
+  }
+}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
index dd8d206b5a..1c08dca4ae 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
@@ -31,22 +31,31 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.ServerRouteInfo;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.TableRouteInfo;
import org.apache.pinot.query.testutils.MockRoutingManagerFactory;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.mockito.MockedStatic;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -81,6 +90,7 @@ public class BaseTableRouteTest {
public static final Map<String, Schema> TABLE_SCHEMAS = new HashMap<>();
private static final Set<String> DISABLED_TABLES = new HashSet<>();
+
static {
TABLE_SCHEMAS.put("a_REALTIME", getSchemaBuilder("a").build());
TABLE_SCHEMAS.put("b_OFFLINE", getSchemaBuilder("b").build());
@@ -125,7 +135,8 @@ public class BaseTableRouteTest {
TableCache _tableCache;
ImplicitHybridTableRouteProvider _hybridTableRouteProvider;
LogicalTableRouteProvider _logicalTableRouteProvider;
-
+ TimeBoundaryStrategy _timeBoundaryStrategy;
+ MockedStatic<TimeBoundaryStrategyService>
_timeBoundaryStrategyFactoryMockedStatic;
@BeforeClass
public void setUp() {
@@ -154,6 +165,17 @@ public class BaseTableRouteTest {
_tableCache = factory.buildTableCache();
_hybridTableRouteProvider = new ImplicitHybridTableRouteProvider();
_logicalTableRouteProvider = new LogicalTableRouteProvider();
+ _timeBoundaryStrategyFactoryMockedStatic =
mockStatic(TimeBoundaryStrategyService.class);
+ _timeBoundaryStrategy = mock(TimeBoundaryStrategy.class);
+ TimeBoundaryStrategyService mockService =
mock(TimeBoundaryStrategyService.class);
+ when(TimeBoundaryStrategyService.getInstance()).thenReturn(mockService);
+
when(mockService.getTimeBoundaryStrategy(any())).thenReturn(_timeBoundaryStrategy);
+ when(_timeBoundaryStrategy.computeTimeBoundary(any(), any(),
any())).thenReturn(mock(TimeBoundaryInfo.class));
+ }
+
+ @AfterClass
+ public void tearDown() {
+ _timeBoundaryStrategyFactoryMockedStatic.close();
}
@DataProvider(name = "offlineTableProvider")
@@ -413,6 +435,8 @@ public class BaseTableRouteTest {
builder.setPhysicalTableConfigMap(tableConfigMap);
builder.setBrokerTenant("brokerTenant");
+ builder.setTimeBoundaryConfig(
+ new TimeBoundaryConfig("min", Map.of("includedTables",
List.of("randomTable_OFFLINE"))));
LogicalTableConfig logicalTable = builder.build();
when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable);
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java
index f7280b3319..f76a3bfdb9 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java
@@ -33,6 +33,7 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -95,6 +96,12 @@ public class LogicalTableRouteProviderCalculateRouteTest
extends BaseTableRouteT
assertNotNull(requestMap);
assertFalse(requestMap.isEmpty());
}
+
+ if (routeInfo.isHybrid()) {
+ assertNotNull(routeInfo.getTimeBoundaryInfo(), "Time boundary info
should not be null for hybrid table");
+ } else {
+ assertNull(routeInfo.getTimeBoundaryInfo(), "Time boundary info should
be null for non-hybrid table");
+ }
}
@Test(dataProvider = "offlineTableAndRouteProvider")
@@ -107,6 +114,13 @@ public class LogicalTableRouteProviderCalculateRouteTest
extends BaseTableRouteT
assertTableRoute(tableName, "realtimeTableAndRouteProvider", null,
expectedRoutingTable, false, true);
}
+ @Test(dataProvider = "hybridTableAndRouteProvider")
+ void testHybridTableRoute(String tableName, Map<String, Set<String>>
expectedOfflineRoutingTable,
+ Map<String, Set<String>> expectedRealtimeRoutingTable) {
+ assertTableRoute(tableName, "hybridTableAndRouteProvider",
expectedOfflineRoutingTable,
+ expectedRealtimeRoutingTable, expectedOfflineRoutingTable != null,
expectedRealtimeRoutingTable != null);
+ }
+
@Test(dataProvider = "routeNotExistsProvider")
void testRouteNotExists(String tableName) {
assertTableRoute(tableName, "routeNotExistsProvider", null, null, false,
false);
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java
index e4417a9daa..d9d0a9fa51 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.pinot.core.transport.TableRouteInfo;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -68,6 +69,7 @@ public class LogicalTableRouteProviderGetRouteTest extends
BaseTableRouteTest {
TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName,
"testOfflineTable");
assertTrue(routeInfo.isExists(), "The table should exist");
assertTrue(routeInfo.isOffline(), "The table should be offline");
+ assertNull(routeInfo.getTimeBoundaryInfo(), "The table should not have time boundary info");
}
@Test(dataProvider = "realtimeTableProvider")
@@ -75,6 +77,7 @@ public class LogicalTableRouteProviderGetRouteTest extends
BaseTableRouteTest {
TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName,
"testRealtimeTable");
assertTrue(routeInfo.isExists(), "The table should exist");
assertTrue(routeInfo.isRealtime(), "The table should be realtime");
+ assertNull(routeInfo.getTimeBoundaryInfo(), "The table should not have time boundary info");
}
@Test(dataProvider = "hybridTableProvider")
@@ -82,6 +85,7 @@ public class LogicalTableRouteProviderGetRouteTest extends
BaseTableRouteTest {
TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName,
"testHybridTable");
assertTrue(routeInfo.isExists(), "The table should exist");
assertTrue(routeInfo.isHybrid(), "The table should be hybrid");
+ assertNotNull(routeInfo.getTimeBoundaryInfo(), "The table should have time boundary info");
}
@Test(dataProvider = "routeExistsProvider")
@@ -216,6 +220,7 @@ public class LogicalTableRouteProviderGetRouteTest extends
BaseTableRouteTest {
assertTrue(routeInfo.isRouteExists());
assertFalse(routeInfo.isDisabled());
assertNull(routeInfo.getDisabledTableNames());
+ assertNotNull(routeInfo.getTimeBoundaryInfo());
}
@Test(dataProvider = "disabledTableProvider")
@@ -259,6 +264,7 @@ public class LogicalTableRouteProviderGetRouteTest extends
BaseTableRouteTest {
}
logicalTable.setPhysicalTableConfigMap(tableConfigMap);
logicalTable.setBrokerTenant("brokerTenant");
+ logicalTable.setTimeBoundaryConfig(new TimeBoundaryConfig("min",
Map.of("includedTables", physicalTableNames)));
when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable);
TableRouteInfo routeInfo =
@@ -297,6 +303,7 @@ public class LogicalTableRouteProviderGetRouteTest extends
BaseTableRouteTest {
}
logicalTable.setPhysicalTableConfigMap(tableConfigMap);
logicalTable.setBrokerTenant("brokerTenant");
+ logicalTable.setTimeBoundaryConfig(new TimeBoundaryConfig("min",
Map.of("includedTables", physicalTableNames)));
when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable);
TableRouteInfo routeInfo =
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
new file mode 100644
index 0000000000..464ff805d9
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.timeboundary;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertSame;
+
+
+public class MinTimeBoundaryStrategyTest {
+
+ TableCache _mockTableCache;
+ RoutingManager _mockRoutingManager;
+ TimeBoundaryStrategy _minTimeBoundaryStrategy = new
MinTimeBoundaryStrategy();
+
+ public void setupMocks(Map<String, TimeBoundaryInfo> data) {
+ for (String tableName : data.keySet()) {
+ if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+ continue;
+ }
+ TimeBoundaryInfo timeBoundaryInfo = data.get(tableName);
+
when(_mockRoutingManager.getTimeBoundaryInfo(tableName)).thenReturn(timeBoundaryInfo);
+ Schema schema = mock(Schema.class);
+ TableConfig tableConfig = mock(TableConfig.class);
+ SegmentsValidationAndRetentionConfig validationMock =
mock(SegmentsValidationAndRetentionConfig.class);
+ DateTimeFieldSpec dateTimeFieldSpec = mock(DateTimeFieldSpec.class);
+ DateTimeFormatSpec dateTimeFormatSpec = mock(DateTimeFormatSpec.class);
+
+
when(_mockTableCache.getSchema(TableNameBuilder.extractRawTableName(tableName))).thenReturn(schema);
+ when(_mockTableCache.getTableConfig(tableName)).thenReturn(tableConfig);
+ when(tableConfig.getValidationConfig()).thenReturn(validationMock);
+
when(validationMock.getTimeColumnName()).thenReturn(timeBoundaryInfo.getTimeColumn());
+
when(schema.getSpecForTimeColumn(timeBoundaryInfo.getTimeColumn())).thenReturn(dateTimeFieldSpec);
+ when(dateTimeFieldSpec.getFormatSpec()).thenReturn(dateTimeFormatSpec);
+
when(dateTimeFormatSpec.fromFormatToMillis(any())).thenReturn(Long.valueOf(timeBoundaryInfo.getTimeValue()));
+ }
+ }
+
+
+ @DataProvider
+ public Object[][] timeBoundaryData() {
+ Map<String, TimeBoundaryInfo> timeBoundaryInfoMap = Map.of(
+ "table1_OFFLINE", new TimeBoundaryInfo("timeColumn1", "1747134822000"),
+ "table2_OFFLINE", new TimeBoundaryInfo("timeColumn2", "1747134844000"),
+ "table3_OFFLINE", new TimeBoundaryInfo("timeColumn3", "1747134866000"),
+ "table4_OFFLINE", new TimeBoundaryInfo("timeColumn4", "1747134888000"),
+ "table5_REALTIME", new TimeBoundaryInfo("timeColumn5", "1747134900000")
+ );
+
+ return new Object[][]{
+ {timeBoundaryInfoMap, List.of("table3_OFFLINE"), "table3_OFFLINE"},
+ {timeBoundaryInfoMap, List.of("Invalid_OFFLINE"), "Invalid_OFFLINE"},
+ {timeBoundaryInfoMap, List.of("table2_OFFLINE", "table3_OFFLINE"),
"table2_OFFLINE"},
+ {timeBoundaryInfoMap, List.of("table3_OFFLINE", "table2_OFFLINE",
"table4_OFFLINE"), "table2_OFFLINE"},
+ {timeBoundaryInfoMap, List.of(), "empty_includedTables_OFFLINE"}
+ };
+ }
+
+
+ @Test(dataProvider = "timeBoundaryData")
+ public void testComputeTimeBoundary(Map<String, TimeBoundaryInfo>
timeBoundaryInfoMap,
+ List<String> includedTables, String expectedTableName) {
+ Map<String, Object> parameters = Map.of("includedTables", includedTables);
+ testComputeTimeBoundary(timeBoundaryInfoMap, expectedTableName,
parameters);
+ }
+
+ private void testComputeTimeBoundary(Map<String, TimeBoundaryInfo>
timeBoundaryInfoMap, String expectedTableName,
+ Map<String, Object> parameters) {
+ setupMocks(timeBoundaryInfoMap);
+ TimeBoundaryInfo timeBoundaryInfo =
_minTimeBoundaryStrategy.computeTimeBoundary(
+ createLogicalTableConfig(parameters), _mockTableCache,
_mockRoutingManager);
+ assertSame(timeBoundaryInfo, timeBoundaryInfoMap.get(expectedTableName));
+ }
+
+ @BeforeMethod
+ public void setUp() {
+ _mockTableCache = mock(TableCache.class);
+ _mockRoutingManager = mock(RoutingManager.class);
+ }
+
+ private LogicalTableConfig createLogicalTableConfig(Map<String, Object>
parameters) {
+ LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder()
+ .setTableName("logical_table")
+ .setTimeBoundaryConfig(new TimeBoundaryConfig("min", parameters));
+ return builder.build();
+ }
+}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyServiceTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyServiceTest.java
new file mode 100644
index 0000000000..05993a437f
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyServiceTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.timeboundary;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+public class TimeBoundaryStrategyServiceTest {
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "No TimeBoundaryStrategy found for name: invalidStrategy")
+ public void testInvalidTimeBoundaryStrategy() {
+
TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy("invalidStrategy");
+ }
+
+ @Test
+ public void testMinTimeBoundaryStrategy() {
+ TimeBoundaryStrategy timeBoundaryStrategy =
TimeBoundaryStrategyService.getInstance()
+ .getTimeBoundaryStrategy("min");
+ assertTrue(timeBoundaryStrategy instanceof MinTimeBoundaryStrategy,
"Expected MinTimeBoundaryStrategy instance");
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
index 2a427ce9dc..2c52148098 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
@@ -42,6 +42,7 @@ public class LogicalTableConfig extends BaseJsonConfig {
public static final String QUOTA_CONFIG_KEY = "quota";
public static final String REF_OFFLINE_TABLE_NAME_KEY =
"refOfflineTableName";
public static final String REF_REALTIME_TABLE_NAME_KEY =
"refRealtimeTableName";
+ public static final String TIME_BOUNDARY_CONFIG_KEY = "timeBoundaryConfig";
private String _tableName;
private String _brokerTenant;
@@ -52,6 +53,7 @@ public class LogicalTableConfig extends BaseJsonConfig {
private QuotaConfig _quotaConfig;
private String _refOfflineTableName;
private String _refRealtimeTableName;
+ private TimeBoundaryConfig _timeBoundaryConfig;
public static LogicalTableConfig fromString(String logicalTableString)
throws IOException {
@@ -119,6 +121,14 @@ public class LogicalTableConfig extends BaseJsonConfig {
_refRealtimeTableName = refRealtimeTableName;
}
+ public TimeBoundaryConfig getTimeBoundaryConfig() {
+ return _timeBoundaryConfig;
+ }
+
+ public void setTimeBoundaryConfig(TimeBoundaryConfig timeBoundaryConfig) {
+ _timeBoundaryConfig = timeBoundaryConfig;
+ }
+
private JsonNode toJsonObject() {
return DEFAULT_MAPPER.valueToTree(this);
}
@@ -141,6 +151,10 @@ public class LogicalTableConfig extends BaseJsonConfig {
}
}
+ public boolean isHybridLogicalTable() {
+ return _refOfflineTableName != null && _refRealtimeTableName != null;
+ }
+
@Override
public String toString() {
return toSingleLineJsonString();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeBoundaryConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeBoundaryConfig.java
new file mode 100644
index 0000000000..fda81adac1
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeBoundaryConfig.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data;
+
+import java.util.Map;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+public class TimeBoundaryConfig extends BaseJsonConfig {
+ private String _boundaryStrategy;
+ private Map<String, Object> _parameters;
+
+ public TimeBoundaryConfig() {
+ }
+
+ public TimeBoundaryConfig(String boundaryStrategy, Map<String, Object>
parameters) {
+ _boundaryStrategy = boundaryStrategy;
+ _parameters = parameters;
+ }
+
+ public String getBoundaryStrategy() {
+ return _boundaryStrategy;
+ }
+
+ public void setBoundaryStrategy(String boundaryStrategy) {
+ _boundaryStrategy = boundaryStrategy;
+ }
+
+ public Map<String, Object> getParameters() {
+ return _parameters;
+ }
+
+ public void setParameters(Map<String, Object> parameters) {
+ _parameters = parameters;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index eb1b7d3e17..b7c0692792 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -424,6 +424,10 @@ public class ControllerRequestURLBuilder {
return StringUtil.join("/", _baseUrl, "segments", tableName,
encode(segmentName), "metadata");
}
+ public String forSegmentMetadata(String tableName, TableType tableType) {
+ return StringUtil.join("/", _baseUrl, "segments", tableName, "metadata") +
"?type=" + tableType.name();
+ }
+
public String forListAllSegmentLineages(String tableName, String tableType) {
return StringUtil.join("/", _baseUrl, "segments", tableName,
"lineage?type=" + tableType);
}
@@ -653,4 +657,8 @@ public class ControllerRequestURLBuilder {
public String forLogicalTableDelete(String logicalTableName) {
return StringUtil.join("/", _baseUrl, "logicalTables", logicalTableName);
}
+
+ public String forTableTimeBoundary(String tableName) {
+ return StringUtil.join("/", _baseUrl, "tables", tableName, "timeBoundary");
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
index 54ef55d1f0..1b09bc5993 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
@@ -23,6 +23,7 @@ import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
public class LogicalTableConfigBuilder {
@@ -33,6 +34,7 @@ public class LogicalTableConfigBuilder {
private QuotaConfig _quotaConfig;
private String _refOfflineTableName;
private String _refRealtimeTableName;
+ private TimeBoundaryConfig _timeBoundaryConfig;
public LogicalTableConfigBuilder setTableName(String tableName) {
@@ -70,6 +72,11 @@ public class LogicalTableConfigBuilder {
return this;
}
+ public LogicalTableConfigBuilder setTimeBoundaryConfig(TimeBoundaryConfig
timeBoundaryConfig) {
+ _timeBoundaryConfig = timeBoundaryConfig;
+ return this;
+ }
+
public LogicalTableConfig build() {
LogicalTableConfig logicalTableConfig = new LogicalTableConfig();
logicalTableConfig.setTableName(_tableName);
@@ -79,6 +86,7 @@ public class LogicalTableConfigBuilder {
logicalTableConfig.setQuotaConfig(_quotaConfig);
logicalTableConfig.setRefOfflineTableName(_refOfflineTableName);
logicalTableConfig.setRefRealtimeTableName(_refRealtimeTableName);
+ logicalTableConfig.setTimeBoundaryConfig(_timeBoundaryConfig);
return logicalTableConfig;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]