This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 767f32fc11 add preloading support for dedup tables (#14187)
767f32fc11 is described below
commit 767f32fc113dbf5c027bc10b1cb695611194614e
Author: Xiaobing <[email protected]>
AuthorDate: Tue Oct 22 08:25:03 2024 -0700
add preloading support for dedup tables (#14187)
* add preloading support for dedup tables
---
.../apache/pinot/common/metrics/ServerMeter.java | 1 +
.../apache/pinot/common/metrics/ServerTimer.java | 1 +
.../common/utils/config/TableConfigSerDeTest.java | 7 +-
.../manager/realtime/RealtimeTableDataManager.java | 59 +++++--
.../tests/DedupPreloadIntegrationTest.java | 162 +++++++++++++++++++
.../dedup/BasePartitionDedupMetadataManager.java | 171 +++++++++++++++++----
.../local/dedup/BaseTableDedupMetadataManager.java | 33 ++--
...ConcurrentMapPartitionDedupMetadataManager.java | 11 ++
.../pinot/segment/local/dedup/DedupContext.java | 33 ++--
.../local/dedup/PartitionDedupMetadataManager.java | 18 +++
.../local/dedup/TableDedupMetadataManager.java | 2 +
.../BasePartitionDedupMetadataManagerTest.java | 86 +++++++++++
...apPartitionDedupMetadataManagerWithTTLTest.java | 7 +-
...artitionDedupMetadataManagerWithoutTTLTest.java | 4 +-
.../TableDedupMetadataManagerFactoryTest.java | 68 ++++++++
.../mutable/MutableSegmentDedupeTest.java | 2 +-
.../apache/pinot/spi/config/table/DedupConfig.java | 14 +-
17 files changed, 592 insertions(+), 87 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 77595d6181..a68e77f144 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -52,6 +52,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
// number of times partition of a record did not match the partition of the stream
REALTIME_PARTITION_MISMATCH("mismatch", false),
REALTIME_DEDUP_DROPPED("rows", false),
+ DEDUP_PRELOAD_FAILURE("count", false),
UPSERT_KEYS_IN_WRONG_SEGMENT("rows", false),
PARTIAL_UPSERT_OUT_OF_ORDER("rows", false),
PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 63b42440a6..6738dbe194 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -58,6 +58,7 @@ public enum ServerTimer implements AbstractMetrics.Timer {
DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
"Total time taken to delete expired dedup primary keys based on metadataTTL or deletedKeysTTL"),
+ DEDUP_PRELOAD_TIME_MS("milliseconds", false, "Total time taken to preload a table partition of a dedup table"),
SECONDARY_Q_WAIT_TIME_MS("milliseconds", false,
"Time spent waiting in the secondary queue when BinaryWorkloadScheduler is used."),
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 47de3d6225..5972994e2d 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -65,6 +65,7 @@ import static org.testng.Assert.*;
public class TableConfigSerDeTest {
private static final double NO_DICTIONARY_THRESHOLD_RATIO = 0.72;
+
@Test
public void testSerDe()
throws IOException {
@@ -192,8 +193,8 @@ public class TableConfigSerDeTest {
}
{
// With query config
- QueryConfig queryConfig = new QueryConfig(1000L, true, true, Collections.singletonMap("func(a)", "b"), null,
- null);
+ QueryConfig queryConfig =
+ new QueryConfig(1000L, true, true, Collections.singletonMap("func(a)", "b"), null, null);
TableConfig tableConfig = tableConfigBuilder.setQueryConfig(queryConfig).build();
checkQueryConfig(tableConfig);
@@ -270,7 +271,7 @@ public class TableConfigSerDeTest {
}
{
// with dedup config - with metadata ttl and metadata time column
- DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5, null, null, 10, "dedupTimeColumn");
+ DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5, null, null, 10, "dedupTimeColumn", false);
TableConfig tableConfig = tableConfigBuilder.setDedupConfig(dedupConfig).build();
// Serialize then de-serialize
checkTableConfigWithDedupConfigWithTTL(JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 297254529d..b4b33baa02 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -402,6 +402,15 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
&& _tableUpsertMetadataManager.getUpsertMode() == UpsertConfig.Mode.PARTIAL;
}
+ private void handleSegmentPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
+ // Today a table can use either upsert or dedup but not both at the same time, so preloading is done by either the
+ // upsert manager or the dedup manager.
+ // TODO: if a table can enable both dedup and upsert in the future, we need to revisit the preloading logic here,
+ // as we can only preload segments once but have to restore metadata for both dedup and upsert managers.
+ handleUpsertPreload(zkMetadata, indexLoadingConfig);
+ handleDedupPreload(zkMetadata, indexLoadingConfig);
+ }
+
/**
* Handles upsert preload if the upsert preload is enabled.
*/
@@ -417,6 +426,21 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId).preloadSegments(indexLoadingConfig);
}
+ /**
+ * Handles dedup preload if the dedup preload is enabled.
+ */
+ private void handleDedupPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
+ if (_tableDedupMetadataManager == null || !_tableDedupMetadataManager.isEnablePreload()) {
+ return;
+ }
+ String segmentName = zkMetadata.getSegmentName();
+ Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, zkMetadata, null);
+ Preconditions.checkState(partitionId != null,
+ String.format("Failed to get partition id for segment: %s in dedup-enabled table: %s", segmentName,
+ _tableNameWithType));
+ _tableDedupMetadataManager.getOrCreatePartitionManager(partitionId).preloadSegments(indexLoadingConfig);
+ }
+
protected void doAddOnlineSegment(String segmentName)
throws Exception {
SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
@@ -424,7 +448,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
"Segment: %s of table: %s is not committed, cannot make it ONLINE", segmentName, _tableNameWithType);
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
- handleUpsertPreload(zkMetadata, indexLoadingConfig);
+ handleSegmentPreload(zkMetadata, indexLoadingConfig);
SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
if (segmentDataManager == null) {
addNewOnlineSegment(zkMetadata, indexLoadingConfig);
@@ -470,7 +494,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
return;
}
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
- handleUpsertPreload(zkMetadata, indexLoadingConfig);
+ handleSegmentPreload(zkMetadata, indexLoadingConfig);
SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
if (segmentDataManager != null) {
_logger.warn("Segment: {} ({}) already exists, skipping adding it as CONSUMING segment", segmentName,
@@ -567,22 +591,29 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
private void handleDedup(ImmutableSegmentImpl immutableSegment) {
// TODO(saurabh) refactor commons code with handleUpsert
String segmentName = immutableSegment.getSegmentName();
- Integer partitionGroupId =
+ _logger.info("Adding immutable segment: {} with dedup enabled", segmentName);
+ Integer partitionId =
SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, null);
- Preconditions.checkNotNull(partitionGroupId,
- String.format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
+ Preconditions.checkNotNull(partitionId,
+ String.format("PartitionId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
_tableNameWithType));
PartitionDedupMetadataManager partitionDedupMetadataManager =
- _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId);
+ _tableDedupMetadataManager.getOrCreatePartitionManager(partitionId);
immutableSegment.enableDedup(partitionDedupMetadataManager);
SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName);
- if (oldSegmentManager != null) {
- LOGGER.info("Replacing mutable segment: {} with immutable segment: {} in partition dedup metadata manager",
- oldSegmentManager.getSegment().getSegmentName(), segmentName);
- partitionDedupMetadataManager.replaceSegment(oldSegmentManager.getSegment(), immutableSegment);
- } else {
- LOGGER.info("Adding immutable segment: {} to partition dedup metadata manager", segmentName);
+ if (partitionDedupMetadataManager.isPreloading()) {
+ partitionDedupMetadataManager.preloadSegment(immutableSegment);
+ LOGGER.info("Preloaded immutable segment: {} with dedup enabled", segmentName);
+ return;
+ }
+ if (oldSegmentManager == null) {
partitionDedupMetadataManager.addSegment(immutableSegment);
+ LOGGER.info("Added new immutable segment: {} with dedup enabled", segmentName);
+ } else {
+ IndexSegment oldSegment = oldSegmentManager.getSegment();
+ partitionDedupMetadataManager.replaceSegment(oldSegment, immutableSegment);
+ LOGGER.info("Replaced {} segment: {} with dedup enabled",
+ oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName);
}
}
@@ -603,8 +634,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
_serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
if (partitionUpsertMetadataManager.isPreloading()) {
- // Preloading segment is ensured to be handled by a single thread, so no need to take the segment upsert lock.
- // Besides, preloading happens before the table partition is made ready for any queries.
+ // Register segment after it is preloaded and has initialized its validDocIds. The order of preloading and
+ // registering segment doesn't matter much as preloading happens before table partition is ready for queries.
partitionUpsertMetadataManager.preloadSegment(immutableSegment);
registerSegment(segmentName, newSegmentManager, partitionUpsertMetadataManager);
_logger.info("Preloaded immutable segment: {} with upsert enabled", segmentName);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupPreloadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupPreloadIntegrationTest.java
new file mode 100644
index 0000000000..c2589bb520
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupPreloadIntegrationTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.DedupConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
+import org.apache.pinot.spi.config.table.RoutingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class DedupPreloadIntegrationTest extends BaseClusterIntegrationTestSet {
+
+ private List<File> _avroFiles;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ // Start a customized controller with more frequent realtime segment validation
+ startController();
+ startBroker();
+ startServer();
+
+ _avroFiles = unpackAvroData(_tempDir);
+ startKafka();
+ pushAvroIntoKafka(_avroFiles);
+
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig tableConfig = createDedupTableConfig(_avroFiles.get(0), "id", getNumKafkaPartitions());
+ addTableConfig(tableConfig);
+
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ @Override
+ protected void overrideServerConf(PinotConfiguration serverConf) {
+ serverConf.setProperty(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + ".max.segment.preload.threads",
+ "1");
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ dropRealtimeTable(getTableName());
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ FileUtils.deleteDirectory(_tempDir);
+ }
+
+ @Override
+ protected int getRealtimeSegmentFlushSize() {
+ // Create > 1 segments
+ return 2;
+ }
+
+ @Override
+ protected String getSchemaFileName() {
+ return "dedupIngestionTestSchema.schema";
+ }
+
+ @Override
+ protected String getAvroTarFileName() {
+ return "dedupIngestionTestData.tar.gz";
+ }
+
+ @Override
+ protected String getPartitionColumn() {
+ return "id";
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ // Three distinct records are expected with pk values of 100000, 100001, 100002
+ return 5;
+ }
+
+ @Test
+ public void testValues()
+ throws Exception {
+ assertEquals(getCurrentCountStarResult(), getCountStarResult());
+
+ // Validate the older value persist
+ for (int i = 0; i < getCountStarResult(); i++) {
+ assertEquals(
+ getPinotConnection().execute("SELECT name FROM " + getTableName() + " WHERE id = " + i).getResultSet(0)
+ .getString(0), "" + i);
+ }
+
+ // Restart the servers and check again
+ restartServers();
+ waitForAllDocsLoaded(600_000L);
+
+ // Validate the older value persist
+ for (int i = 0; i < getCountStarResult(); i++) {
+ assertEquals(
+ getPinotConnection().execute("SELECT name FROM " + getTableName() + " WHERE id = " + i).getResultSet(0)
+ .getString(0), "" + i);
+ }
+ }
+
+ @Override
+ protected TableConfig createDedupTableConfig(File sampleAvroFile, String primaryKeyColumn, int numPartitions) {
+ AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
+ Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
+ columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions));
+
+ DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null, null, 0, null, true);
+
+ return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName())
+ .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
+ .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
+ .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
+ .setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).setRoutingConfig(
+ new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
+ .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1)).setDedupConfig(dedupConfig)
+ .build();
+ }
+}
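Note on the server config used by this test: preloading only runs when the server has a segment preload executor, which is created when the preload thread count is positive. A minimal sketch of the equivalent standalone server configuration, assuming the INSTANCE_DATA_MANAGER_CONFIG_PREFIX constant resolves to its usual value of pinot.server.instance:

    # Dedup (and upsert) preloading is skipped when this is unset or 0,
    # because no preload executor is created in that case.
    pinot.server.instance.max.segment.preload.threads=1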
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
index 3892d36d92..08ca8633a7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
@@ -24,14 +24,23 @@ import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.SegmentPreloadUtils;
import org.apache.pinot.segment.local.utils.WatermarkUtils;
+import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.spi.config.table.HashFunction;
@@ -46,6 +55,7 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
protected final String _tableNameWithType;
protected final List<String> _primaryKeyColumns;
protected final int _partitionId;
+ protected final DedupContext _context;
protected final ServerMetrics _serverMetrics;
protected final HashFunction _hashFunction;
protected final double _metadataTTL;
@@ -58,16 +68,21 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
// Initialize with 1 pending operation to indicate the metadata manager can take more operations
private int _numPendingOperations = 1;
private boolean _closed;
+ // The lock and boolean flag ensure only one thread can start preloading and preloading happens only once.
+ private final Lock _preloadLock = new ReentrantLock();
+ private volatile boolean _isPreloading;
protected BasePartitionDedupMetadataManager(String tableNameWithType, int partitionId, DedupContext dedupContext) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
+ _context = dedupContext;
_primaryKeyColumns = dedupContext.getPrimaryKeyColumns();
_hashFunction = dedupContext.getHashFunction();
- _serverMetrics = dedupContext.getServerMetrics();
+ _isPreloading = dedupContext.isPreloadEnabled();
_metadataTTL = dedupContext.getMetadataTTL() >= 0 ? dedupContext.getMetadataTTL() : 0;
_dedupTimeColumn = dedupContext.getDedupTimeColumn();
_tableIndexDir = dedupContext.getTableIndexDir();
+ _serverMetrics = ServerMetrics.get();
_logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
if (_metadataTTL > 0) {
Preconditions.checkArgument(_dedupTimeColumn != null,
@@ -87,7 +102,58 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
}
@Override
- public void addSegment(IndexSegment segment) {
+ public boolean isPreloading() {
+ return _isPreloading;
+ }
+
+ @Override
+ public void preloadSegments(IndexLoadingConfig indexLoadingConfig) {
+ if (!_isPreloading) {
+ return;
+ }
+ TableDataManager tableDataManager = _context.getTableDataManager();
+ Preconditions.checkNotNull(tableDataManager, "Preloading segments requires tableDataManager");
+ HelixManager helixManager = tableDataManager.getHelixManager();
+ ExecutorService segmentPreloadExecutor = tableDataManager.getSegmentPreloadExecutor();
+ // Preloading the segments for dedup table for fast metadata recovery, as done for upsert table.
+ _preloadLock.lock();
+ try {
+ // Check the flag again to ensure preloading happens only once.
+ if (!_isPreloading) {
+ return;
+ }
+ // From now on, the _isPreloading flag is true until the segments are preloaded.
+ long startTime = System.currentTimeMillis();
+ doPreloadSegments(tableDataManager, indexLoadingConfig, helixManager, segmentPreloadExecutor);
+ long duration = System.currentTimeMillis() - startTime;
+ _serverMetrics.addTimedTableValue(_tableNameWithType, ServerTimer.DEDUP_PRELOAD_TIME_MS, duration,
+ TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ // We should continue even if preloading fails, so that segments not being preloaded successfully can get
+ // loaded via the normal segment loading logic as done on the Helix task threads.
+ _logger.warn("Failed to preload segments from partition: {} of table: {}, skipping", _partitionId,
+ _tableNameWithType, e);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DEDUP_PRELOAD_FAILURE, 1);
+ if (e instanceof InterruptedException) {
+ // Restore the interrupted status in case the upper callers want to check.
+ Thread.currentThread().interrupt();
+ }
+ } finally {
+ _isPreloading = false;
+ _preloadLock.unlock();
+ }
+ }
+
+ // Keep this hook method for subclasses to modify the preloading logic.
+ protected void doPreloadSegments(TableDataManager tableDataManager, IndexLoadingConfig indexLoadingConfig,
+ HelixManager helixManager, ExecutorService segmentPreloadExecutor)
+ throws Exception {
+ SegmentPreloadUtils.preloadSegments(tableDataManager, _partitionId, indexLoadingConfig, helixManager,
+ segmentPreloadExecutor, null);
+ }
+
+ @Override
+ public void preloadSegment(ImmutableSegment segment) {
String segmentName = segment.getSegmentName();
if (segment instanceof EmptyIndexSegment) {
_logger.info("Skip adding empty segment: {}", segmentName);
@@ -97,21 +163,58 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
"Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
_tableNameWithType);
if (!startOperation()) {
- _logger.info("Skip adding segment: {} because dedup metadata manager is already stopped",
- segment.getSegmentName());
+ _logger.info("Skip preloading segment: {} because dedup metadata manager is already stopped", segmentName);
return;
}
try {
- addOrReplaceSegment(null, segment);
+ if (skipSegmentOutOfTTL(segment, true)) {
+ return;
+ }
+ try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(segment,
+ _primaryKeyColumns, _dedupTimeColumn)) {
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+ DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, segment.getSegmentMetadata().getTotalDocs());
+ doPreloadSegment(segment, dedupRecordInfoIterator);
+ updatePrimaryKeyGauge();
+ }
} catch (Exception e) {
throw new RuntimeException(
- String.format("Caught exception while adding segment: %s of table: %s to %s", segment.getSegmentName(),
+ String.format("Caught exception while preloading segment: %s of table: %s in %s", segmentName,
_tableNameWithType, this.getClass().getSimpleName()), e);
} finally {
finishOperation();
}
}
+ protected abstract void doPreloadSegment(ImmutableSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator);
+
+ @Override
+ public void addSegment(IndexSegment segment) {
+ String segmentName = segment.getSegmentName();
+ if (segment instanceof EmptyIndexSegment) {
+ _logger.info("Skip adding empty segment: {}", segmentName);
+ return;
+ }
+ Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+ "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
+ _tableNameWithType);
+ if (!startOperation()) {
+ _logger.info("Skip adding segment: {} because dedup metadata manager is already stopped", segmentName);
+ return;
+ }
+ try {
+ if (!skipSegmentOutOfTTL(segment, true)) {
+ addOrReplaceSegment(null, segment);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Caught exception while adding segment: %s of table: %s to %s", segmentName, _tableNameWithType,
+ this.getClass().getSimpleName()), e);
+ } finally {
+ finishOperation();
+ }
+ }
+
@Override
public void replaceSegment(IndexSegment oldSegment, IndexSegment newSegment) {
if (!startOperation()) {
@@ -120,7 +223,9 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
return;
}
try {
- addOrReplaceSegment(oldSegment, newSegment);
+ if (!skipSegmentOutOfTTL(newSegment, true)) {
+ addOrReplaceSegment(oldSegment, newSegment);
+ }
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while replacing segment: %s with segment: %s of table: %s in %s",
@@ -131,19 +236,27 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
}
}
- private void addOrReplaceSegment(@Nullable IndexSegment oldSegment, IndexSegment newSegment)
- throws IOException {
- // If metadataTTL is enabled, we can skip adding dedup metadata for segment that's already out of the TTL.
- if (_metadataTTL > 0) {
- double maxDedupTime = getMaxDedupTime(newSegment);
+ protected boolean skipSegmentOutOfTTL(IndexSegment segment, boolean updateWatermark) {
+ if (_metadataTTL <= 0) {
+ return false;
+ }
+ // If metadataTTL is enabled, we can skip adding dedup metadata for segment already out of the TTL. Different
+ // from upsert table, there is no need to initialize things like validDocIds bitmap for those skipped segments.
+ double maxDedupTime = getMaxDedupTime(segment);
+ if (updateWatermark) {
_largestSeenTime.getAndUpdate(time -> Math.max(time, maxDedupTime));
- if (isOutOfMetadataTTL(maxDedupTime)) {
- String action = oldSegment == null ? "adding" : "replacing";
- _logger.info("Skip {} segment: {} as max dedupTime: {} is out of TTL: {}", action, newSegment.getSegmentName(),
- maxDedupTime, _metadataTTL);
- return;
- }
}
+ if (!isOutOfMetadataTTL(maxDedupTime)) {
+ return false;
+ }
+ _logger.info("Skip segment: {} as max dedupTime: {} is out of TTL: {}", segment.getSegmentName(), maxDedupTime,
+ _metadataTTL);
+ // Return true if skipped. Boolean value allows subclasses to disable skipping.
+ return true;
+ }
+
+ private void addOrReplaceSegment(@Nullable IndexSegment oldSegment, IndexSegment newSegment)
+ throws IOException {
try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(newSegment,
_primaryKeyColumns, _dedupTimeColumn)) {
Iterator<DedupRecordInfo> dedupRecordInfoIterator =
@@ -169,21 +282,17 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
_logger.info("Skip removing segment: {} because metadata manager is already stopped", segment.getSegmentName());
return;
}
- // Skip removing the dedup metadata of segment out of TTL. The expired metadata is removed in batches.
- if (_metadataTTL > 0) {
- double maxDedupTime = getMaxDedupTime(segment);
- if (isOutOfMetadataTTL(maxDedupTime)) {
- _logger.info("Skip removing segment: {} as max dedupTime: {} is out of TTL: {}", segment.getSegmentName(),
- maxDedupTime, _metadataTTL);
+ try {
+ if (skipSegmentOutOfTTL(segment, false)) {
return;
}
- }
- try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(segment,
- _primaryKeyColumns, _dedupTimeColumn)) {
- Iterator<DedupRecordInfo> dedupRecordInfoIterator =
- DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, segment.getSegmentMetadata().getTotalDocs());
- doRemoveSegment(segment, dedupRecordInfoIterator);
- updatePrimaryKeyGauge();
+ try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(segment,
+ _primaryKeyColumns, _dedupTimeColumn)) {
+ Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+ DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, segment.getSegmentMetadata().getTotalDocs());
+ doRemoveSegment(segment, dedupRecordInfoIterator);
+ updatePrimaryKeyGauge();
+ }
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while removing segment: %s of table: %s from %s", segment.getSegmentName(),
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
index 80639ebd5e..8172bb86f3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.dedup;
import com.google.common.base.Preconditions;
+import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -27,14 +28,20 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.DedupConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class BaseTableDedupMetadataManager implements TableDedupMetadataManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableDedupMetadataManager.class);
+
protected final Map<Integer, PartitionDedupMetadataManager> _partitionMetadataManagerMap = new ConcurrentHashMap<>();
protected String _tableNameWithType;
protected DedupContext _dedupContext;
+ private boolean _enablePreload;
@Override
public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager,
@@ -57,19 +64,18 @@ public abstract class BaseTableDedupMetadataManager implements TableDedupMetadat
"When metadataTTL is configured, metadata time column or time column must be configured for "
+ "dedup enabled table: %s", _tableNameWithType);
}
-
+ _enablePreload = dedupConfig.isEnablePreload() && tableDataManager.getSegmentPreloadExecutor() != null;
+ HashFunction hashFunction = dedupConfig.getHashFunction();
+ File tableIndexDir = tableDataManager.getTableDataDir();
DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
- dedupContextBuider
- .setTableConfig(tableConfig)
- .setSchema(schema)
- .setPrimaryKeyColumns(primaryKeyColumns)
- .setHashFunction(dedupConfig.getHashFunction())
- .setMetadataTTL(metadataTTL)
- .setDedupTimeColumn(dedupTimeColumn)
- .setTableIndexDir(tableDataManager.getTableDataDir())
- .setTableDataManager(tableDataManager)
- .setServerMetrics(serverMetrics);
+ dedupContextBuider.setTableConfig(tableConfig).setSchema(schema).setPrimaryKeyColumns(primaryKeyColumns)
+ .setHashFunction(hashFunction).setEnablePreload(_enablePreload).setMetadataTTL(metadataTTL)
+ .setDedupTimeColumn(dedupTimeColumn).setTableIndexDir(tableIndexDir).setTableDataManager(tableDataManager);
_dedupContext = dedupContextBuider.build();
+ LOGGER.info(
+ "Initialized {} for table: {} with primary key columns: {}, hash function: {}, enable preload: {}, metadata "
+ + "TTL: {}, dedup time column: {}, table index dir: {}", getClass().getSimpleName(), _tableNameWithType,
+ primaryKeyColumns, hashFunction, _enablePreload, metadataTTL, dedupTimeColumn, tableIndexDir);
initCustomVariables();
}
@@ -89,6 +95,11 @@ public abstract class BaseTableDedupMetadataManager implements TableDedupMetadat
protected void initCustomVariables() {
}
+ @Override
+ public boolean isEnablePreload() {
+ return _enablePreload;
+ }
+
@Override
public void stop() {
for (PartitionDedupMetadataManager metadataManager : _partitionMetadataManagerMap.values()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
index 4461266e35..b4ef9ca63a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -38,6 +39,16 @@ class ConcurrentMapPartitionDedupMetadataManager extends BasePartitionDedupMetad
super(tableNameWithType, partitionId, dedupContext);
}
+ @Override
+ protected void doPreloadSegment(ImmutableSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator) {
+ while (dedupRecordInfoIterator.hasNext()) {
+ DedupRecordInfo dedupRecordInfo = dedupRecordInfoIterator.next();
+ double dedupTime = dedupRecordInfo.getDedupTime();
+ _primaryKeyToSegmentAndTimeMap.put(HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(), _hashFunction),
+ Pair.of(segment, dedupTime));
+ }
+ }
+
@Override
protected void doAddOrReplaceSegment(IndexSegment oldSegment, IndexSegment newSegment,
Iterator<DedupRecordInfo> dedupRecordInfoIteratorOfNewSegment) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
index a523f26957..4407676ad7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.util.List;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -34,24 +33,24 @@ public class DedupContext {
private final Schema _schema;
private final List<String> _primaryKeyColumns;
private final HashFunction _hashFunction;
+ private final boolean _enablePreload;
private final double _metadataTTL;
private final String _dedupTimeColumn;
private final File _tableIndexDir;
private final TableDataManager _tableDataManager;
- private final ServerMetrics _serverMetrics;
private DedupContext(TableConfig tableConfig, Schema schema, List<String> primaryKeyColumns,
- HashFunction hashFunction, double metadataTTL, String dedupTimeColumn, File tableIndexDir,
- TableDataManager tableDataManager, ServerMetrics serverMetrics) {
+ HashFunction hashFunction, boolean enablePreload, double metadataTTL, String dedupTimeColumn, File tableIndexDir,
+ TableDataManager tableDataManager) {
_tableConfig = tableConfig;
_schema = schema;
_primaryKeyColumns = primaryKeyColumns;
_hashFunction = hashFunction;
+ _enablePreload = enablePreload;
_metadataTTL = metadataTTL;
_dedupTimeColumn = dedupTimeColumn;
_tableIndexDir = tableIndexDir;
_tableDataManager = tableDataManager;
- _serverMetrics = serverMetrics;
}
public TableConfig getTableConfig() {
@@ -70,6 +69,10 @@ public class DedupContext {
return _hashFunction;
}
+ public boolean isPreloadEnabled() {
+ return _enablePreload;
+ }
+
public double getMetadataTTL() {
return _metadataTTL;
}
@@ -86,20 +89,16 @@ public class DedupContext {
return _tableDataManager;
}
- public ServerMetrics getServerMetrics() {
- return _serverMetrics;
- }
-
public static class Builder {
private TableConfig _tableConfig;
private Schema _schema;
private List<String> _primaryKeyColumns;
private HashFunction _hashFunction;
+ private boolean _enablePreload;
private double _metadataTTL;
private String _dedupTimeColumn;
private File _tableIndexDir;
private TableDataManager _tableDataManager;
- private ServerMetrics _serverMetrics;
public Builder setTableConfig(TableConfig tableConfig) {
_tableConfig = tableConfig;
@@ -121,6 +120,11 @@ public class DedupContext {
return this;
}
+ public Builder setEnablePreload(boolean enablePreload) {
+ _enablePreload = enablePreload;
+ return this;
+ }
+
public Builder setMetadataTTL(double metadataTTL) {
_metadataTTL = metadataTTL;
return this;
@@ -141,19 +145,14 @@ public class DedupContext {
return this;
}
- public Builder setServerMetrics(ServerMetrics serverMetrics) {
- _serverMetrics = serverMetrics;
- return this;
- }
-
public DedupContext build() {
Preconditions.checkState(_tableConfig != null, "Table config must be set");
Preconditions.checkState(_schema != null, "Schema must be set");
Preconditions.checkState(CollectionUtils.isNotEmpty(_primaryKeyColumns), "Primary key columns must be set");
Preconditions.checkState(_hashFunction != null, "Hash function must be set");
Preconditions.checkState(_tableIndexDir != null, "Table index directory must be set");
- return new DedupContext(_tableConfig, _schema, _primaryKeyColumns, _hashFunction, _metadataTTL, _dedupTimeColumn,
- _tableIndexDir, _tableDataManager, _serverMetrics);
+ return new DedupContext(_tableConfig, _schema, _primaryKeyColumns, _hashFunction, _enablePreload, _metadataTTL,
+ _dedupTimeColumn, _tableIndexDir, _tableDataManager);
}
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
index 835ce6dfa7..ff2667ec14 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
@@ -19,6 +19,8 @@
package org.apache.pinot.segment.local.dedup;
import java.io.Closeable;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.data.readers.PrimaryKey;
@@ -37,6 +39,22 @@ public interface PartitionDedupMetadataManager extends Closeable {
addSegment(newSegment);
}
+ /**
+ * Preload segments for the table partition. Segments can be added differently during preloading.
+ * TODO: As commented in PartitionUpsertMetadataManager, revisit this method and see if we can use the same
+ * IndexLoadingConfig for all segments. Tier info might be different for different segments.
+ */
+ void preloadSegments(IndexLoadingConfig indexLoadingConfig);
+
+ boolean isPreloading();
+
+ /**
+ * Different from adding a segment, when preloading a segment, the dedup metadata may be updated more efficiently.
+ * Basically the dedup metadata can be directly updated for each primary key, without doing the more costly
+ * read-compare-update.
+ */
+ void preloadSegment(ImmutableSegment immutableSegment);
+
/**
* Removes the dedup metadata for the given segment.
*/
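Note on the "read-compare-update" wording above: during normal ingestion every primary key has to be checked against the metadata map before the record is accepted, whereas preloading replays segments that were already deduplicated when first ingested, so each key can be written unconditionally. A minimal sketch of the contrast with a plain ConcurrentHashMap (illustrative types, not the Pinot implementation):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class DedupMapSketch {
      private final ConcurrentMap<Object, String> _primaryKeyToSegment = new ConcurrentHashMap<>();

      // Normal add path: atomic check-then-insert; returns false (drop the
      // record as a duplicate) when another segment already owns the key.
      boolean checkAndAdd(Object primaryKeyHash, String segmentName) {
        return _primaryKeyToSegment.putIfAbsent(primaryKeyHash, segmentName) == null;
      }

      // Preload path: a blind put per key is enough to restore the metadata,
      // mirroring how doPreloadSegment() writes directly into the map.
      void preloadAdd(Object primaryKeyHash, String segmentName) {
        _primaryKeyToSegment.put(primaryKeyHash, segmentName);
      }
    }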
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
index 5c0bd1d830..949f7ab669 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
@@ -36,6 +36,8 @@ public interface TableDedupMetadataManager extends Closeable {
*/
PartitionDedupMetadataManager getOrCreatePartitionManager(int partitionId);
+ boolean isEnablePreload();
+
/**
* Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
*/
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java
new file mode 100644
index 0000000000..475d4a3929
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import java.io.IOException;
+import java.util.Iterator;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class BasePartitionDedupMetadataManagerTest {
+ @Test
+ public void testPreloadSegments()
+ throws IOException {
+ String realtimeTableName = "testTable_REALTIME";
+ DedupContext dedupContext = mock(DedupContext.class);
+ when(dedupContext.isPreloadEnabled()).thenReturn(true);
+ TableDataManager tableDataManager = mock(TableDataManager.class);
+ when(dedupContext.getTableDataManager()).thenReturn(tableDataManager);
+ IndexLoadingConfig indexLoadingConfig = mock(IndexLoadingConfig.class);
+ when(indexLoadingConfig.getTableConfig()).thenReturn(mock(TableConfig.class));
+
+ try (DummyPartitionDedupMetadataManager dedupMetadataManager = new DummyPartitionDedupMetadataManager(
+ realtimeTableName, 0, dedupContext)) {
+ assertTrue(dedupMetadataManager.isPreloading());
+ dedupMetadataManager.preloadSegments(indexLoadingConfig);
+ assertFalse(dedupMetadataManager.isPreloading());
+ dedupMetadataManager.stop();
+ }
+ }
+
+ private static class DummyPartitionDedupMetadataManager extends BasePartitionDedupMetadataManager {
+
+ protected DummyPartitionDedupMetadataManager(String tableNameWithType, int partitionId, DedupContext context) {
+ super(tableNameWithType, partitionId, context);
+ }
+
+ @Override
+ protected void doPreloadSegment(ImmutableSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator) {
+ }
+
+ @Override
+ protected void doAddOrReplaceSegment(@Nullable IndexSegment oldSegment, IndexSegment newSegment,
+ Iterator<DedupRecordInfo> dedupRecordInfoIteratorOfNewSegment) {
+ }
+
+ @Override
+ protected void doRemoveSegment(IndexSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator) {
+ }
+
+ @Override
+ protected void doRemoveExpiredPrimaryKeys() {
+ }
+
+ @Override
+ protected long getNumPrimaryKeys() {
+ return 0;
+ }
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
index c0697eb4c3..2823d68d34 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.TreeMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -67,8 +66,7 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
_dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
.setPrimaryKeyColumns(List.of("primaryKeyColumn")).setMetadataTTL(METADATA_TTL)
.setDedupTimeColumn(DEDUP_TIME_COLUMN_NAME).setTableIndexDir(mock(File.class))
- .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class))
- .setTableIndexDir(TEMP_DIR);
+ .setTableDataManager(mock(TableDataManager.class)).setTableIndexDir(TEMP_DIR);
}
@AfterMethod
@@ -81,8 +79,7 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
dedupContextBuider.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
.setPrimaryKeyColumns(List.of("primaryKeyColumn")).setHashFunction(HashFunction.NONE).setMetadataTTL(1)
- .setDedupTimeColumn(null).setTableIndexDir(mock(File.class)).setTableDataManager(mock(TableDataManager.class))
- .setServerMetrics(mock(ServerMetrics.class));
+ .setDedupTimeColumn(null).setTableIndexDir(mock(File.class)).setTableDataManager(mock(TableDataManager.class));
DedupContext dedupContext = dedupContextBuider.build();
assertThrows(IllegalArgumentException.class,
() -> new ConcurrentMapPartitionDedupMetadataManager(DedupTestUtils.REALTIME_TABLE_NAME, 0, dedupContext));
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
index f7eadeaf09..90c68f1f42 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -58,8 +57,7 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest {
_dedupContextBuilder = new DedupContext.Builder();
_dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
.setPrimaryKeyColumns(List.of("primaryKeyColumn")).setTableIndexDir(mock(File.class))
- .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class))
- .setTableIndexDir(TEMP_DIR);
+ .setTableDataManager(mock(TableDataManager.class)).setTableIndexDir(TEMP_DIR);
}
@AfterMethod
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
new file mode 100644
index 0000000000..f3247c8227
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.DedupConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+
+
+public class TableDedupMetadataManagerFactoryTest {
+ @Test
+ public void testEnablePreload() {
+ DedupConfig dedupConfig =
+ new DedupConfig(true, HashFunction.MD5, null, Collections.emptyMap(), 10, "timeCol", true);
+ Schema schema =
+ new Schema.SchemaBuilder().setSchemaName("mytable").addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.REALTIME).setTableName("mytable").setDedupConfig(dedupConfig).build();
+
+ // Preloading is not enabled as there is no preloading thread.
+ TableDataManager tableDataManager = mock(TableDataManager.class);
+ when(tableDataManager.getTableDataDir()).thenReturn(new File("mytable"));
+ when(tableDataManager.getSegmentPreloadExecutor()).thenReturn(null);
+ TableDedupMetadataManager tableDedupMetadataManager =
+ TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager, null);
+ assertNotNull(tableDedupMetadataManager);
+ assertFalse(tableDedupMetadataManager.isEnablePreload());
+
+ // Enabled as enablePreload is true and there is preloading thread.
+ tableDataManager = mock(TableDataManager.class);
+ when(tableDataManager.getTableDataDir()).thenReturn(new File("mytable"));
+ when(tableDataManager.getSegmentPreloadExecutor()).thenReturn(mock(ExecutorService.class));
+ tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager, null);
+ assertNotNull(tableDedupMetadataManager);
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
index 8c4594dc91..b4544979e3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
@@ -65,7 +65,7 @@ public class MutableSegmentDedupeTest {
.setDedupConfig(new DedupConfig(dedupEnabled, HashFunction.NONE)).build();
CompositeTransformer recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
File jsonFile = new File(dataResourceUrl.getFile());
- DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null, null, metadataTTL, dedupTimeColumn);
+ DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null, null, metadataTTL, dedupTimeColumn, false);
PartitionDedupMetadataManager partitionDedupMetadataManager =
(dedupEnabled) ? getTableDedupMetadataManager(schema, dedupConfig).getOrCreatePartitionManager(0) : null;
_mutableSegmentImpl =
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
index 78d7b3b9f0..dfc8151e35 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import java.util.Map;
import org.apache.pinot.spi.config.BaseJsonConfig;
+
public class DedupConfig extends BaseJsonConfig {
@JsonPropertyDescription("Whether dedup is enabled or not.")
private final boolean _dedupEnabled;
@@ -43,9 +44,12 @@ public class DedupConfig extends BaseJsonConfig {
+ " from the table config will be used.")
private final String _dedupTimeColumn;
+ @JsonPropertyDescription("Whether to preload segments for fast dedup
metadata recovery")
+ private final boolean _enablePreload;
+
public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true) boolean dedupEnabled,
@JsonProperty(value = "hashFunction") HashFunction hashFunction) {
- this(dedupEnabled, hashFunction, null, null, 0, null);
+ this(dedupEnabled, hashFunction, null, null, 0, null, false);
}
@JsonCreator
@@ -54,13 +58,15 @@ public class DedupConfig extends BaseJsonConfig {
@JsonProperty(value = "metadataManagerClass") String
metadataManagerClass,
@JsonProperty(value = "metadataManagerConfigs") Map<String, String>
metadataManagerConfigs,
@JsonProperty(value = "metadataTTL") double metadataTTL,
- @JsonProperty(value = "dedupTimeColumn") String dedupTimeColumn) {
+ @JsonProperty(value = "dedupTimeColumn") String dedupTimeColumn,
+ @JsonProperty(value = "enablePreload") boolean enablePreload) {
_dedupEnabled = dedupEnabled;
_hashFunction = hashFunction == null ? HashFunction.NONE : hashFunction;
_metadataManagerClass = metadataManagerClass;
_metadataManagerConfigs = metadataManagerConfigs;
_metadataTTL = metadataTTL;
_dedupTimeColumn = dedupTimeColumn;
+ _enablePreload = enablePreload;
}
public HashFunction getHashFunction() {
@@ -86,4 +92,8 @@ public class DedupConfig extends BaseJsonConfig {
public String getDedupTimeColumn() {
return _dedupTimeColumn;
}
+
+ public boolean isEnablePreload() {
+ return _enablePreload;
+ }
}
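Note on usage: with this change the new flag sits next to the existing dedup options in the table config. A sketch of the dedupConfig section of a realtime table config, with illustrative values (the field names match the @JsonProperty annotations above):

    "dedupConfig": {
      "dedupEnabled": true,
      "hashFunction": "MD5",
      "metadataTTL": 10,
      "dedupTimeColumn": "dedupTimeColumn",
      "enablePreload": true
    }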
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]