This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 830083827a Make TableDataManagerProvider pluggable (#14470)
830083827a is described below
commit 830083827aee3f0d9c4699698c57a46b775c9892
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Nov 19 09:35:49 2024 -0800
Make TableDataManagerProvider pluggable (#14470)
---
.../core/data/manager/InstanceDataManager.java | 5 +-
.../manager/offline/DimensionTableDataManager.java | 1 +
.../DefaultTableDataManagerProvider.java} | 24 +++---
.../manager/provider/TableDataManagerProvider.java | 50 +++++++++++++
.../realtime/RealtimeSegmentDataManagerTest.java | 9 ++-
.../executor/QueryExecutorExceptionsTest.java | 9 ++-
.../core/query/executor/QueryExecutorTest.java | 9 ++-
.../pinot/queries/ExplainPlanQueriesTest.java | 9 ++-
.../queries/SegmentWithNullValueVectorTest.java | 9 ++-
.../segment/index/loader/IndexLoadingConfig.java | 4 +-
.../org/apache/pinot/server/conf/ServerConf.java | 2 +-
.../starter/helix/HelixInstanceDataManager.java | 11 ++-
.../helix/HelixInstanceDataManagerConfig.java | 40 ++--------
.../config/instance/InstanceDataManagerConfig.java | 2 +-
.../apache/pinot/spi/utils/CommonConstants.java | 85 ++++++++++++++--------
15 files changed, 163 insertions(+), 106 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 95a135f1e4..ffacffc897 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -24,7 +24,6 @@ import java.util.Set;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -33,6 +32,7 @@ import
org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.core.util.SegmentRefreshSemaphore;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -40,6 +40,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
* The <code>InstanceDataManager</code> class is the instance level data
manager, which manages all tables and segments
* served by the instance.
*/
+@InterfaceAudience.Private
@ThreadSafe
public interface InstanceDataManager {
@@ -49,7 +50,7 @@ public interface InstanceDataManager {
* <p>NOTE: The config is the subset of server config with prefix
'pinot.server.instance'
*/
void init(PinotConfiguration config, HelixManager helixManager,
ServerMetrics serverMetrics)
- throws ConfigurationException;
+ throws Exception;
/**
* Returns the instance id.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index 20759934ca..6ed4edfd48 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.ImmutableSegment;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
similarity index 84%
rename from
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
rename to
pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index 302df13509..fff6232943 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.data.manager.offline;
+package org.apache.pinot.core.data.manager.provider;
import com.google.common.cache.Cache;
import java.util.Map;
@@ -28,6 +28,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
+import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.utils.SegmentLocks;
@@ -39,15 +41,16 @@ import org.apache.pinot.spi.utils.IngestionConfigUtils;
/**
- * Factory for {@link TableDataManager}.
+ * Default implementation of {@link TableDataManagerProvider}.
*/
-public class TableDataManagerProvider {
- private final InstanceDataManagerConfig _instanceDataManagerConfig;
- private final HelixManager _helixManager;
- private final SegmentLocks _segmentLocks;
- private final Semaphore _segmentBuildSemaphore;
+public class DefaultTableDataManagerProvider implements
TableDataManagerProvider {
+ private InstanceDataManagerConfig _instanceDataManagerConfig;
+ private HelixManager _helixManager;
+ private SegmentLocks _segmentLocks;
+ private Semaphore _segmentBuildSemaphore;
- public TableDataManagerProvider(InstanceDataManagerConfig
instanceDataManagerConfig, HelixManager helixManager,
+ @Override
+ public void init(InstanceDataManagerConfig instanceDataManagerConfig,
HelixManager helixManager,
SegmentLocks segmentLocks) {
_instanceDataManagerConfig = instanceDataManagerConfig;
_helixManager = helixManager;
@@ -56,10 +59,7 @@ public class TableDataManagerProvider {
_segmentBuildSemaphore = maxParallelSegmentBuilds > 0 ? new
Semaphore(maxParallelSegmentBuilds, true) : null;
}
- public TableDataManager getTableDataManager(TableConfig tableConfig) {
- return getTableDataManager(tableConfig, null, null, () -> true);
- }
-
+ @Override
public TableDataManager getTableDataManager(TableConfig tableConfig,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
Supplier<Boolean> isServerReadyToServeQueries) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
new file mode 100644
index 0000000000..6fc27bc4ac
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.provider;
+
+import com.google.common.cache.Cache;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
+import org.apache.pinot.spi.annotations.InterfaceAudience;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+
+/**
+ * Factory for {@link TableDataManager}.
+ */
+@InterfaceAudience.Private
+public interface TableDataManagerProvider {
+
+ void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager
helixManager, SegmentLocks segmentLocks);
+
+ default TableDataManager getTableDataManager(TableConfig tableConfig) {
+ return getTableDataManager(tableConfig, null, null, () -> true);
+ }
+
+ TableDataManager getTableDataManager(TableConfig tableConfig, @Nullable
ExecutorService segmentPreloadExecutor,
+ @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
+ Supplier<Boolean> isServerReadyToServeQueries);
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 910edf7d9b..f7ca4530bd 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -43,7 +43,8 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.config.TableConfigUtils;
-import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import
org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
@@ -779,9 +780,9 @@ public class RealtimeSegmentDataManagerTest {
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
- TableDataManager tableDataManager =
- new TableDataManagerProvider(instanceDataManagerConfig, helixManager,
new SegmentLocks()).getTableDataManager(
- tableConfig);
+ TableDataManagerProvider tableDataManagerProvider = new
DefaultTableDataManagerProvider();
+ tableDataManagerProvider.init(instanceDataManagerConfig, helixManager, new
SegmentLocks());
+ TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(tableConfig);
tableDataManager.start();
tableDataManager.shutDown();
Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index c664c2f013..12fdda9442 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -34,7 +34,8 @@ import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import
org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -133,9 +134,9 @@ public class QueryExecutorExceptionsTest {
// Mock the instance data manager
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
- TableDataManager tableDataManager =
- new TableDataManagerProvider(instanceDataManagerConfig,
mock(HelixManager.class),
- new SegmentLocks()).getTableDataManager(tableConfig);
+ TableDataManagerProvider tableDataManagerProvider = new
DefaultTableDataManagerProvider();
+ tableDataManagerProvider.init(instanceDataManagerConfig,
mock(HelixManager.class), new SegmentLocks());
+ TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(tableConfig);
tableDataManager.start();
//we don't add index segments to the data manager to simulate
numSegmentsAcquired < numSegmentsQueried
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 404bd3efc3..4a171128c8 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -40,7 +40,8 @@ import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.TimeSeriesContext;
import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import
org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
@@ -154,9 +155,9 @@ public class QueryExecutorTest {
// Mock the instance data manager
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
- TableDataManager tableDataManager =
- new TableDataManagerProvider(instanceDataManagerConfig,
mock(HelixManager.class),
- new SegmentLocks()).getTableDataManager(tableConfig);
+ TableDataManagerProvider tableDataManagerProvider = new
DefaultTableDataManagerProvider();
+ tableDataManagerProvider.init(instanceDataManagerConfig,
mock(HelixManager.class), new SegmentLocks());
+ TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(tableConfig);
tableDataManager.start();
for (ImmutableSegment indexSegment : _indexSegments) {
tableDataManager.addSegment(indexSegment);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index dce00515d0..64d5d8e150 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -41,7 +41,8 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.ExplainPlanRows;
import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import
org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
@@ -276,9 +277,9 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest
{
// Mock the instance data manager
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
- TableDataManager tableDataManager =
- new TableDataManagerProvider(instanceDataManagerConfig,
mock(HelixManager.class),
- new SegmentLocks()).getTableDataManager(TABLE_CONFIG);
+ TableDataManagerProvider tableDataManagerProvider = new
DefaultTableDataManagerProvider();
+ tableDataManagerProvider.init(instanceDataManagerConfig,
mock(HelixManager.class), new SegmentLocks());
+ TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(TABLE_CONFIG);
tableDataManager.start();
for (IndexSegment indexSegment : _indexSegments) {
tableDataManager.addSegment((ImmutableSegment) indexSegment);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
index 3ef16168a2..04db06a140 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
@@ -36,7 +36,8 @@ import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import
org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
@@ -138,9 +139,9 @@ public class SegmentWithNullValueVectorTest {
// Mock the instance data manager
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
- TableDataManager tableDataManager =
- new TableDataManagerProvider(instanceDataManagerConfig,
mock(HelixManager.class),
- new SegmentLocks()).getTableDataManager(tableConfig);
+ TableDataManagerProvider tableDataManagerProvider = new
DefaultTableDataManagerProvider();
+ tableDataManagerProvider.init(instanceDataManagerConfig,
mock(HelixManager.class), new SegmentLocks());
+ TableDataManager tableDataManager =
tableDataManagerProvider.getTableDataManager(tableConfig);
tableDataManager.start();
tableDataManager.addSegment(_segment);
_instanceDataManager = mock(InstanceDataManager.class);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index 56558f8c3b..77512a2a38 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -42,7 +42,6 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.TimestampIndexUtils;
@@ -160,8 +159,7 @@ public class IndexLoadingConfig {
if (avgMultiValueCount != null) {
_realtimeAvgMultiValueCount = Integer.parseInt(avgMultiValueCount);
}
- _segmentStoreURI =
-
_instanceDataManagerConfig.getConfig().getProperty(CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI);
+ _segmentStoreURI = _instanceDataManagerConfig.getSegmentStoreUri();
_segmentDirectoryLoader =
_instanceDataManagerConfig.getSegmentDirectoryLoader();
Map<String, Map<String, String>> tierConfigs =
_instanceDataManagerConfig.getTierConfigs();
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
index a02df2aba6..fb6120be4b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
@@ -105,7 +105,7 @@ public class ServerConf {
}
public String getInstanceDataManagerClassName() {
- return _serverConf.getProperty(CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS,
DEFAULT_DATA_MANAGER_CLASS);
+ return _serverConf.getProperty(CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS,
DEFAULT_INSTANCE_DATA_MANAGER_CLASS);
}
public double getQueryLogMaxRate() {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 9476f27858..88fdfa1590 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -38,7 +38,6 @@ import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
@@ -49,7 +48,7 @@ import
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
@@ -68,6 +67,7 @@ import
org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -110,14 +110,17 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
@Override
public synchronized void init(PinotConfiguration config, HelixManager
helixManager, ServerMetrics serverMetrics)
- throws ConfigurationException {
+ throws Exception {
LOGGER.info("Initializing Helix instance data manager");
_instanceDataManagerConfig = new HelixInstanceDataManagerConfig(config);
LOGGER.info("HelixInstanceDataManagerConfig: {}",
_instanceDataManagerConfig.getConfig());
_instanceId = _instanceDataManagerConfig.getInstanceId();
_helixManager = helixManager;
- _tableDataManagerProvider = new
TableDataManagerProvider(_instanceDataManagerConfig, helixManager,
_segmentLocks);
+ String tableDataManagerProviderClass =
_instanceDataManagerConfig.getTableDataManagerProviderClass();
+ LOGGER.info("Initializing table data manager provider of class: {}",
tableDataManagerProviderClass);
+ _tableDataManagerProvider =
PluginManager.get().createInstance(tableDataManagerProviderClass);
+ _tableDataManagerProvider.init(_instanceDataManagerConfig, helixManager,
_segmentLocks);
_segmentUploader = new
PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(),
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(),
serverMetrics);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index a959e0b509..aade26f339 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -29,7 +29,6 @@ import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.ReadMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,22 +45,6 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
// Average number of values in multi-valued columns in any table in this
instance.
// This value is used to allocate initial memory for multi-valued columns in
realtime segments in consuming state.
private static final String AVERAGE_MV_COUNT =
"realtime.averageMultiValueEntriesPerRow";
- // Key of instance id
- public static final String INSTANCE_ID = "id";
- // Key of instance data directory
- public static final String INSTANCE_DATA_DIR = "dataDir";
- // Key of consumer directory
- public static final String CONSUMER_DIR = "consumerDir";
- // Key of instance segment tar directory
- public static final String INSTANCE_SEGMENT_TAR_DIR = "segmentTarDir";
- // Key of segment directory
- public static final String INSTANCE_BOOTSTRAP_SEGMENT_DIR =
"bootstrap.segment.dir";
- // Key of instance level segment read mode
- public static final String READ_MODE = "readMode";
- // Key of the segment format this server can read
- public static final String SEGMENT_FORMAT_VERSION = "segment.format.version";
- // Key of whether to enable reloading consuming segments
- public static final String INSTANCE_RELOAD_CONSUMING_SEGMENT =
"reload.consumingSegment";
// Key of segment directory loader
public static final String SEGMENT_DIRECTORY_LOADER =
"segment.directory.loader";
// Prefix for upsert config
@@ -99,13 +82,6 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
private static final String ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR =
"segment.stream.download.untar";
private static final boolean DEFAULT_ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR =
false;
- // Whether memory for realtime consuming segments should be allocated
off-heap.
- private static final String REALTIME_OFFHEAP_ALLOCATION =
"realtime.alloc.offheap";
- // And whether the allocation should be direct (default is to allocate via
mmap)
- // Direct memory allocation may mean setting heap size appropriately when
starting JVM.
- // The metric ServerGauge.REALTIME_OFFHEAP_MEMORY_USED should indicate how
much memory is needed.
- private static final String DIRECT_REALTIME_OFFHEAP_ALLOCATION =
"realtime.alloc.offheap.direct";
-
// Number of simultaneous segments that can be refreshed on one server.
// Segment refresh works by loading the old as well as new versions of
segments in memory, assigning
// new incoming queries to use the new version. The old version is dropped
when all the queries that
@@ -206,18 +182,18 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
}
@Override
- public String getConsumerClientIdSuffix() {
- return
_serverConfig.getProperty(CONFIG_OF_REALTIME_SEGMENT_CONSUMER_CLIENT_ID_SUFFIX);
+ public String getTableDataManagerProviderClass() {
+ return _serverConfig.getProperty(TABLE_DATA_MANAGER_PROVIDER_CLASS,
DEFAULT_TABLE_DATA_MANAGER_PROVIDER_CLASS);
}
@Override
- public String getInstanceBootstrapSegmentDir() {
- return _serverConfig.getProperty(INSTANCE_BOOTSTRAP_SEGMENT_DIR);
+ public String getConsumerClientIdSuffix() {
+ return _serverConfig.getProperty(CONSUMER_CLIENT_ID_SUFFIX);
}
@Override
public String getSegmentStoreUri() {
- return _serverConfig.getProperty(CONFIG_OF_SEGMENT_STORE_URI);
+ return _serverConfig.getProperty(SEGMENT_STORE_URI);
}
@Override
@@ -232,16 +208,16 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
@Override
public boolean isRealtimeOffHeapAllocation() {
- return _serverConfig.getProperty(REALTIME_OFFHEAP_ALLOCATION, true);
+ return _serverConfig.getProperty(REALTIME_OFFHEAP_ALLOCATION,
DEFAULT_REALTIME_OFFHEAP_ALLOCATION);
}
@Override
public boolean isDirectRealtimeOffHeapAllocation() {
- return _serverConfig.getProperty(DIRECT_REALTIME_OFFHEAP_ALLOCATION,
false);
+ return _serverConfig.getProperty(REALTIME_OFFHEAP_DIRECT_ALLOCATION,
DEFAULT_REALTIME_OFFHEAP_DIRECT_ALLOCATION);
}
public boolean shouldReloadConsumingSegment() {
- return _serverConfig.getProperty(INSTANCE_RELOAD_CONSUMING_SEGMENT,
Server.DEFAULT_RELOAD_CONSUMING_SEGMENT);
+ return _serverConfig.getProperty(RELOAD_CONSUMING_SEGMENT,
DEFAULT_RELOAD_CONSUMING_SEGMENT);
}
@Override
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index c9d406d19e..52e9b6f9f2 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -35,7 +35,7 @@ public interface InstanceDataManagerConfig {
String getInstanceSegmentTarDir();
- String getInstanceBootstrapSegmentDir();
+ String getTableDataManagerProviderClass();
String getSegmentStoreUri();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 21800684b9..89046dec81 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -24,6 +24,7 @@ import java.math.BigDecimal;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.config.instance.InstanceType;
@@ -379,8 +380,8 @@ public class CommonConstants {
// to determine whether the query could have successfully been run on the v2 / multi-stage query engine. If not,
// a counter metric will be incremented - if this counter remains 0 during regular query workload execution, it
// signals that users can potentially migrate their query workload to the multistage query engine.
- public static final String
CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC
- = "pinot.broker.enable.multistage.migration.metric";
+ public static final String
CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC =
+ "pinot.broker.enable.multistage.migration.metric";
public static final boolean DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC =
false;
public static class Request {
@@ -614,14 +615,55 @@ public class CommonConstants {
public static final String QUERY_EXECUTOR_CONFIG_PREFIX =
"pinot.server.query.executor";
public static final String METRICS_CONFIG_PREFIX = "pinot.server.metrics";
- public static final String CONFIG_OF_INSTANCE_ID =
"pinot.server.instance.id";
- public static final String CONFIG_OF_INSTANCE_DATA_DIR =
"pinot.server.instance.dataDir";
- public static final String CONFIG_OF_CONSUMER_DIR =
"pinot.server.instance.consumerDir";
- public static final String CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR =
"pinot.server.instance.segmentTarDir";
- public static final String CONFIG_OF_INSTANCE_READ_MODE =
"pinot.server.instance.readMode";
- public static final String CONFIG_OF_INSTANCE_RELOAD_CONSUMING_SEGMENT =
- "pinot.server.instance.reload.consumingSegment";
public static final String CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS =
"pinot.server.instance.data.manager.class";
+ public static final String DEFAULT_INSTANCE_DATA_MANAGER_CLASS =
+ "org.apache.pinot.server.starter.helix.HelixInstanceDataManager";
+ // Following configs are used in HelixInstanceDataManagerConfig, where the config prefix is trimmed. We keep the
+ // full config for reference purpose.
+ public static final String INSTANCE_ID = "id";
+ public static final String CONFIG_OF_INSTANCE_ID =
INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + INSTANCE_ID;
+ public static final String INSTANCE_DATA_DIR = "dataDir";
+ public static final String CONFIG_OF_INSTANCE_DATA_DIR =
+ INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + INSTANCE_DATA_DIR;
+ public static final String DEFAULT_INSTANCE_BASE_DIR =
+ FileUtils.getTempDirectoryPath() + File.separator + "PinotServer";
+ public static final String DEFAULT_INSTANCE_DATA_DIR =
DEFAULT_INSTANCE_BASE_DIR + File.separator + "index";
+ public static final String CONSUMER_DIR = "consumerDir";
+ public static final String CONFIG_OF_CONSUMER_DIR =
INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + CONSUMER_DIR;
+ public static final String INSTANCE_SEGMENT_TAR_DIR = "segmentTarDir";
+ public static final String CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR =
+ INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + INSTANCE_SEGMENT_TAR_DIR;
+ public static final String DEFAULT_INSTANCE_SEGMENT_TAR_DIR =
+ DEFAULT_INSTANCE_BASE_DIR + File.separator + "segmentTar";
+ public static final String CONSUMER_CLIENT_ID_SUFFIX =
"consumer.client.id.suffix";
+ public static final String CONFIG_OF_CONSUMER_CLIENT_ID_SUFFIX =
+ INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + CONSUMER_CLIENT_ID_SUFFIX;
+ public static final String SEGMENT_STORE_URI = "segment.store.uri";
+ public static final String CONFIG_OF_SEGMENT_STORE_URI =
+ INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + SEGMENT_STORE_URI;
+ public static final String TABLE_DATA_MANAGER_PROVIDER_CLASS =
"table.data.manager.provider.class";
+ public static final String CONFIG_OF_TABLE_DATA_MANAGER_PROVIDER_CLASS =
+ INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." +
TABLE_DATA_MANAGER_PROVIDER_CLASS;
+ public static final String DEFAULT_TABLE_DATA_MANAGER_PROVIDER_CLASS =
+
"org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider";
+ public static final String READ_MODE = "readMode";
+ public static final String CONFIG_OF_READ_MODE =
INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + READ_MODE;
+ public static final String DEFAULT_READ_MODE = "mmap";
+ public static final String SEGMENT_FORMAT_VERSION =
"segment.format.version";
+ public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION =
+ INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + SEGMENT_FORMAT_VERSION;
+ public static final String REALTIME_OFFHEAP_ALLOCATION =
"realtime.alloc.offheap";
+ public static final String CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION =
+ INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." +
REALTIME_OFFHEAP_ALLOCATION;
+ public static final boolean DEFAULT_REALTIME_OFFHEAP_ALLOCATION = true;
+ public static final String REALTIME_OFFHEAP_DIRECT_ALLOCATION =
"realtime.alloc.offheap.direct";
+ public static final String CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION =
+ INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." +
REALTIME_OFFHEAP_DIRECT_ALLOCATION;
+ public static final boolean DEFAULT_REALTIME_OFFHEAP_DIRECT_ALLOCATION =
false;
+ public static final String RELOAD_CONSUMING_SEGMENT =
"reload.consumingSegment";
+ public static final String CONFIG_OF_RELOAD_CONSUMING_SEGMENT =
+ INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + RELOAD_CONSUMING_SEGMENT;
+ public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true;
// Query logger related configs
public static final String CONFIG_OF_QUERY_LOG_MAX_RATE =
"pinot.server.query.log.maxRatePerSecond";
@@ -671,10 +713,6 @@ public class CommonConstants {
public static final String CONFIG_OF_SERVER_RESOURCE_PACKAGES =
"server.restlet.api.resource.packages";
public static final String DEFAULT_SERVER_RESOURCE_PACKAGES =
"org.apache.pinot.server.api.resources";
- public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION =
"pinot.server.instance.segment.format.version";
- public static final String CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION =
"pinot.server.instance.realtime.alloc.offheap";
- public static final String CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION =
- "pinot.server.instance.realtime.alloc.offheap.direct";
public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY =
"pinot.server.storage.factory";
public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER =
"pinot.server.crypter";
public static final String CONFIG_OF_VALUE_PRUNER_IN_PREDICATE_THRESHOLD =
@@ -726,17 +764,7 @@ public class CommonConstants {
// Default to 0.0 (no limit)
public static final double DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT = 0.0;
- public static final String DEFAULT_READ_MODE = "mmap";
public static final String CONFIG_OF_MMAP_DEFAULT_ADVICE =
"pinot.server.mmap.advice.default";
- // Whether to reload consuming segment on scheme update
- public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true;
- public static final String DEFAULT_INSTANCE_BASE_DIR =
- System.getProperty("java.io.tmpdir") + File.separator + "PinotServer";
- public static final String DEFAULT_INSTANCE_DATA_DIR =
DEFAULT_INSTANCE_BASE_DIR + File.separator + "index";
- public static final String DEFAULT_INSTANCE_SEGMENT_TAR_DIR =
- DEFAULT_INSTANCE_BASE_DIR + File.separator + "segmentTar";
- public static final String DEFAULT_DATA_MANAGER_CLASS =
- "org.apache.pinot.server.starter.helix.HelixInstanceDataManager";
public static final String DEFAULT_QUERY_EXECUTOR_CLASS =
"org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl";
// The order of the pruners matters. Pruning with segment metadata ahead of those using segment data like bloom
@@ -795,12 +823,8 @@ public class CommonConstants {
public static final String SERVER_GRPCTLS_PREFIX = "pinot.server.grpctls";
public static final String SERVER_NETTY_PREFIX = "pinot.server.netty";
- // The complete config key is pinot.server.instance.segment.store.uri
- public static final String CONFIG_OF_SEGMENT_STORE_URI =
"segment.store.uri";
public static final String CONFIG_OF_LOGGER_ROOT_DIR =
"pinot.server.logger.root.dir";
- public static final String
CONFIG_OF_REALTIME_SEGMENT_CONSUMER_CLIENT_ID_SUFFIX =
"consumer.client.id.suffix";
-
public static final String LUCENE_MAX_REFRESH_THREADS =
"pinot.server.lucene.max.refresh.threads";
public static final int DEFAULT_LUCENE_MAX_REFRESH_THREADS = 1;
public static final String LUCENE_MIN_REFRESH_INTERVAL_MS =
"pinot.server.lucene.min.refresh.interval.ms";
@@ -1022,8 +1046,7 @@ public class CommonConstants {
public static final String CONFIG_OF_QUERY_KILLED_METRIC_ENABLED =
"accounting.query.killed.metric.enabled";
public static final boolean DEFAULT_QUERY_KILLED_METRIC_ENABLED = false;
- public static final String CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE =
- "accounting.enable.thread.sampling.mse.debug";
+ public static final String CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE =
"accounting.enable.thread.sampling.mse.debug";
public static final Boolean DEFAULT_ENABLE_THREAD_SAMPLING_MSE = true;
}
@@ -1256,8 +1279,8 @@ public class CommonConstants {
public static final int V1 = 1;
}
- public static final String KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN
- = "pinot.query.multistage.explain.include.segment.plan";
+ public static final String KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN =
+ "pinot.query.multistage.explain.include.segment.plan";
public static final boolean
DEFAULT_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN = false;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]