This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b8af790c2f Enable uploading segments to realtime tables (#8584)
b8af790c2f is described below
commit b8af790c2f26b5bafcc8f154e1dae2edc60b43c0
Author: Sajjad Moradi <[email protected]>
AuthorDate: Mon May 16 18:48:40 2022 -0700
Enable uploading segments to realtime tables (#8584)
---
.../apache/pinot/common/utils/SegmentUtils.java | 32 +++--
.../PinotSegmentUploadDownloadRestletResource.java | 13 +-
.../helix/core/PinotHelixResourceManager.java | 48 +++++---
.../segment/RealtimeSegmentAssignment.java | 50 ++++----
...altimeNonReplicaGroupSegmentAssignmentTest.java | 114 ++++++++++++++----
...NonReplicaGroupTieredSegmentAssignmentTest.java | 5 +-
.../RealtimeReplicaGroupSegmentAssignmentTest.java | 132 ++++++++++++++++-----
.../core/data/manager/BaseTableDataManager.java | 3 +-
.../manager/offline/OfflineTableDataManager.java | 12 --
.../manager/realtime/RealtimeTableDataManager.java | 5 +-
.../tests/LLCRealtimeClusterIntegrationTest.java | 96 ++++++++++++++-
.../tests/RealtimeClusterIntegrationTest.java | 16 ++-
12 files changed, 392 insertions(+), 134 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
index 21b3fb4f3e..04be4a1c58 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
@@ -19,7 +19,7 @@
package org.apache.pinot.common.utils;
import com.google.common.base.Preconditions;
-import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
@@ -35,32 +35,28 @@ public class SegmentUtils {
// Returns the partition id of a realtime segment based segment name and
segment metadata info retrieved via Helix.
// Important: The method is costly because it may read data from zookeeper.
Do not use it in any query execution
// path.
- public static int getRealtimeSegmentPartitionId(String segmentName, String
realtimeTableName,
- HelixManager helixManager, String partitionColumn) {
+ @Nullable
+ public static Integer getRealtimeSegmentPartitionId(String segmentName,
String realtimeTableName,
+ HelixManager helixManager,
+ String partitionColumn) {
// A fast path if the segmentName is a LLC segment name and we can get the
partition id from the name directly.
if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
return new LLCSegmentName(segmentName).getPartitionGroupId();
}
- // Otherwise, retrieve the partition id from the segment zk metadata.
Currently only realtime segments from upsert
- // enabled tables have partition ids in their segment metadata.
+ // Otherwise, retrieve the partition id from the segment zk metadata.
SegmentZKMetadata segmentZKMetadata =
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(),
realtimeTableName, segmentName);
Preconditions
.checkState(segmentZKMetadata != null, "Failed to find segment ZK
metadata for segment: %s of table: %s",
segmentName, realtimeTableName);
SegmentPartitionMetadata segmentPartitionMetadata =
segmentZKMetadata.getPartitionMetadata();
- Preconditions.checkState(segmentPartitionMetadata != null,
- "Segment ZK metadata for segment: %s of table: %s does not contain
partition metadata", segmentName,
- realtimeTableName);
- ColumnPartitionMetadata columnPartitionMetadata =
- segmentPartitionMetadata.getColumnPartitionMap().get(partitionColumn);
- Preconditions.checkState(columnPartitionMetadata != null,
- "Segment ZK metadata for segment: %s of table: %s does not contain
partition metadata for column: %s. Check "
- + "if the table is an upsert table.", segmentName,
realtimeTableName, partitionColumn);
- Set<Integer> partitions = columnPartitionMetadata.getPartitions();
- Preconditions.checkState(partitions.size() == 1,
- "Segment ZK metadata for segment: %s of table: %s contains multiple
partitions for column: %s with %s",
- segmentName, realtimeTableName, partitionColumn, partitions);
- return partitions.iterator().next();
+ if (segmentPartitionMetadata != null) {
+ ColumnPartitionMetadata columnPartitionMetadata =
+
segmentPartitionMetadata.getColumnPartitionMap().get(partitionColumn);
+ if (columnPartitionMetadata != null &&
columnPartitionMetadata.getPartitions().size() == 1) {
+ return columnPartitionMetadata.getPartitions().iterator().next();
+ }
+ }
+ return null;
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 01aa8c832b..0590fb2b1a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -292,16 +292,9 @@ public class PinotSegmentUploadDownloadRestletResource {
LOGGER.warn("Table name is not provided as request query parameter
when uploading segment: {} for table: {}",
segmentName, rawTableName);
}
- String tableNameWithType;
- if (tableType == TableType.OFFLINE) {
- tableNameWithType =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
- } else {
- tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
- if (!_pinotHelixResourceManager.isUpsertTable(tableNameWithType)) {
- throw new ControllerApplicationException(LOGGER,
- "Cannot upload segment to non-upsert real-time table: " +
tableNameWithType, Response.Status.FORBIDDEN);
- }
- }
+ String tableNameWithType = tableType == TableType.OFFLINE
+ ? TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)
+ : TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
String clientAddress =
InetAddress.getByName(request.getRemoteAddr()).getHostName();
LOGGER.info("Processing upload request for segment: {} of table: {} with
upload type: {} from client: {}, "
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index f9d0e86447..49fe4cdd9e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -133,6 +133,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableCustomConfig;
import org.apache.pinot.spi.config.table.TableStats;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -167,7 +168,7 @@ public class PinotHelixResourceManager {
// TODO: make this configurable
public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 *
60_000L; // 10 minutes
- public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1
secondL
+ public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1
second
private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new
SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
@@ -1997,26 +1998,14 @@ public class PinotHelixResourceManager {
public void assignTableSegment(String tableNameWithType, String segmentName)
{
String segmentZKMetadataPath =
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType,
segmentName);
- InstancePartitionsType instancePartitionsType;
- if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
- // In an upsert enabled LLC realtime table, all segments of the same
partition are collocated on the same server
- // -- consuming or completed. So it is fine to use CONSUMING as the
InstancePartitionsType.
- // TODO When upload segments is open to all realtime tables, we should
change the type to COMPLETED instead.
- // In addition, RealtimeSegmentAssignment.assignSegment(..) method
should be updated so that the method does not
- // assign segments to CONSUMING instance partition only.
- instancePartitionsType = InstancePartitionsType.CONSUMING;
- } else {
- instancePartitionsType = InstancePartitionsType.OFFLINE;
- }
// Assign instances for the segment and add it into IdealState
try {
TableConfig tableConfig = getTableConfig(tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table
config for table: " + tableNameWithType);
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+ fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
Collections
- .singletonMap(instancePartitionsType, InstancePartitionsUtils
- .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig,
instancePartitionsType));
synchronized (getTableUpdaterLock(tableNameWithType)) {
HelixHelper.updateIdealState(_helixZkManager, tableNameWithType,
idealState -> {
assert idealState != null;
@@ -2050,6 +2039,35 @@ public class PinotHelixResourceManager {
}
}
+ private Map<InstancePartitionsType, InstancePartitions>
fetchOrComputeInstancePartitions(String tableNameWithType,
+ TableConfig tableConfig) {
+ if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+ return Collections.singletonMap(InstancePartitionsType.OFFLINE,
InstancePartitionsUtils
+ .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig,
InstancePartitionsType.OFFLINE));
+ }
+ if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
+ // In an upsert enabled LLC realtime table, all segments of the same
partition are collocated on the same server
+ // -- consuming or completed. So it is fine to use CONSUMING as the
InstancePartitionsType.
+ return Collections.singletonMap(InstancePartitionsType.CONSUMING,
InstancePartitionsUtils
+ .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig,
InstancePartitionsType.CONSUMING));
+ }
+ // For non-upsert realtime tables, if COMPLETED instance partitions are
available or a tag override for
+ // completed segments is provided in the tenant config, the COMPLETED instance
partitions type is used;
+ // otherwise, the CONSUMING instance partitions type is used.
+ InstancePartitionsType instancePartitionsType =
InstancePartitionsType.COMPLETED;
+ InstancePartitions instancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+ InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
instancePartitionsType.toString()));
+ if (instancePartitions != null) {
+ return Collections.singletonMap(instancePartitionsType,
instancePartitions);
+ }
+ TagOverrideConfig tagOverrideConfig =
tableConfig.getTenantConfig().getTagOverrideConfig();
+ if (tagOverrideConfig == null || tagOverrideConfig.getRealtimeCompleted()
== null) {
+ instancePartitionsType = InstancePartitionsType.CONSUMING;
+ }
+ return Collections.singletonMap(instancePartitionsType,
+
InstancePartitionsUtils.computeDefaultInstancePartitions(_helixZkManager,
tableConfig, instancePartitionsType));
+ }
+
public boolean isUpsertTable(String tableName) {
TableConfig realtimeTableConfig =
getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName));
if (realtimeTableConfig == null) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index c97b53e75c..3ede57c6d7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -104,18 +104,20 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
@Override
public List<String> assignSegment(String segmentName, Map<String,
Map<String, String>> currentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
- InstancePartitions instancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
- Preconditions.checkState(instancePartitions != null, "Failed to find
CONSUMING instance partitions for table: %s",
- _realtimeTableName);
+ Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance
partition type should be provided");
+ Map.Entry<InstancePartitionsType, InstancePartitions>
typeToInstancePartitions =
+ instancePartitionsMap.entrySet().iterator().next();
+ InstancePartitionsType instancePartitionsType =
typeToInstancePartitions.getKey();
+ InstancePartitions instancePartitions =
typeToInstancePartitions.getValue();
Preconditions
.checkState(instancePartitions.getNumPartitions() == 1, "Instance
partitions: %s should contain 1 partition",
instancePartitions.getInstancePartitionsName());
LOGGER.info("Assigning segment: {} with instance partitions: {} for table:
{}", segmentName, instancePartitions,
_realtimeTableName);
checkReplication(instancePartitions);
-
- List<String> instancesAssigned = assignConsumingSegment(segmentName,
instancePartitions);
-
+ List<String> instancesAssigned = instancePartitionsType ==
InstancePartitionsType.CONSUMING
+ ? assignConsumingSegment(segmentName, instancePartitions)
+ : assignCompletedSegment(segmentName, currentAssignment,
instancePartitions);
LOGGER.info("Assigned segment: {} to instances: {} for table: {}",
segmentName, instancesAssigned,
_realtimeTableName);
return instancesAssigned;
@@ -141,9 +143,7 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
* Helper method to assign instances for CONSUMING segment based on the
segment partition id and instance partitions.
*/
private List<String> assignConsumingSegment(String segmentName,
InstancePartitions instancePartitions) {
- int partitionGroupId =
- SegmentUtils.getRealtimeSegmentPartitionId(segmentName,
_realtimeTableName, _helixManager, _partitionColumn);
-
+ int partitionGroupId = getPartitionGroupId(segmentName);
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
if (numReplicaGroups == 1) {
// Non-replica-group based assignment:
@@ -158,7 +158,8 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
int numInstances = instances.size();
List<String> instancesAssigned = new ArrayList<>(_replication);
for (int replicaId = 0; replicaId < _replication; replicaId++) {
- instancesAssigned.add(instances.get((partitionGroupId * _replication +
replicaId) % numInstances));
+ int instanceIndex = (partitionGroupId * _replication + replicaId) %
numInstances;
+ instancesAssigned.add(instances.get(instanceIndex));
}
return instancesAssigned;
} else {
@@ -185,8 +186,9 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
InstancePartitions completedInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
InstancePartitions consumingInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
- Preconditions.checkState(consumingInstancePartitions != null,
- "Failed to find COMPLETED or CONSUMING instance partitions for table:
%s", _realtimeTableName);
+ Preconditions
+ .checkState(consumingInstancePartitions != null, "Failed to find
CONSUMING instance partitions for table: %s",
+ _realtimeTableName);
Preconditions.checkState(consumingInstancePartitions.getNumPartitions() ==
1,
"Instance partitions: %s should contain 1 partition",
consumingInstancePartitions.getInstancePartitionsName());
boolean includeConsuming = config
@@ -331,8 +333,7 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
int numPartitions = instancePartitions.getNumPartitions();
Map<Integer, List<String>> instancePartitionIdToSegmentsMap = new
HashMap<>();
for (String segmentName : currentAssignment.keySet()) {
- int partitionGroupId = SegmentUtils
- .getRealtimeSegmentPartitionId(segmentName, _realtimeTableName,
_helixManager, _partitionColumn);
+ int partitionGroupId = getPartitionGroupId(segmentName);
int instancePartitionId = partitionGroupId % numPartitions;
instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new
ArrayList<>())
.add(segmentName);
@@ -368,12 +369,21 @@ public class RealtimeSegmentAssignment implements
SegmentAssignment {
// Replica-group based assignment
// Uniformly spray the segment partitions over the instance partitions
- int segmentPartitionId =
- SegmentUtils.getRealtimeSegmentPartitionId(segmentName,
_realtimeTableName, _helixManager, _partitionColumn);
- int numPartitions = instancePartitions.getNumPartitions();
- int partitionGroupId = segmentPartitionId % numPartitions;
- return SegmentAssignmentUtils
- .assignSegmentWithReplicaGroup(currentAssignment,
instancePartitions, partitionGroupId);
+ int partitionId = getPartitionGroupId(segmentName) %
instancePartitions.getNumPartitions();
+ return
SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment,
instancePartitions, partitionId);
+ }
+ }
+
+ private int getPartitionGroupId(String segmentName) {
+ Integer segmentPartitionId =
+ SegmentUtils.getRealtimeSegmentPartitionId(segmentName,
_realtimeTableName, _helixManager, _partitionColumn);
+ if (segmentPartitionId == null) {
+ // This case is for the uploaded segments for which there's no partition
information.
+ // A random, but consistent, partition id is calculated based on the
hash code of the segment name.
+ // Note that '% 10K' keeps the partition id small; a large value could
overflow the int arithmetic later in
+ // the instance assignment formula (partitionGroupId * _replication + replicaId).
+ segmentPartitionId = Math.abs(segmentName.hashCode() % 10_000);
}
+ return segmentPartitionId;
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
index 1fe98d8eba..19d5235d77 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
@@ -18,11 +18,16 @@
*/
package org.apache.pinot.controller.helix.core.assignment.segment;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -34,6 +39,11 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -71,7 +81,7 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
TableConfig tableConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
.setLLC(true).build();
- _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null,
tableConfig);
+ _segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(),
tableConfig);
_instancePartitionsMap = new TreeMap<>();
// CONSUMING instances:
@@ -102,11 +112,13 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest
{
@Test
public void testAssignSegment() {
+ Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
+ ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = _segments.get(segmentId);
List<String> instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Segment 0 (partition 0) should be assigned to instance 0, 1, 2
@@ -128,11 +140,13 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest
{
@Test
public void testRelocateCompletedSegments() {
+ Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
+ ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = _segments.get(segmentId);
List<String> instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
addToAssignment(currentAssignment, segmentId, instancesAssigned);
}
@@ -150,10 +164,31 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest
{
SegmentStateModel.OFFLINE);
currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+ // Add uploaded ONLINE segments to the consuming instances (i.e. no
separation between consuming & completed)
+ List<String> uploadedSegmentNames = ImmutableList.of("UploadedSegment1",
"UploadedSegment2");
+ onlyConsumingInstancePartitionMap =
+ ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
+ for (String uploadedSegName : uploadedSegmentNames) {
+ List<String> instancesAssigned =
+ _segmentAssignment.assignSegment(uploadedSegName, currentAssignment,
onlyConsumingInstancePartitionMap);
+ currentAssignment.put(uploadedSegName,
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
+ }
+ // Now there should be 103 segments assigned
+ assertEquals(currentAssignment.size(), NUM_SEGMENTS +
uploadedSegmentNames.size() + 1);
+ // Each segment should have 3 replicas and all assigned instances should
be prefixed with consuming
+ currentAssignment.forEach((type, instanceStateMap) -> {
+ assertEquals(instanceStateMap.size(), NUM_REPLICAS);
+ instanceStateMap.forEach((instance, state) -> {
+ if (!instance.startsWith("badInstance_")) {
+ assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
+ }
+ });
+ });
+
// Rebalance without COMPLETED instance partitions should not change the
segment assignment
- Map<InstancePartitionsType, InstancePartitions>
noRelocationInstancePartitionsMap = new TreeMap<>();
- noRelocationInstancePartitionsMap
- .put(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
+ Map<InstancePartitionsType, InstancePartitions>
noRelocationInstancePartitionsMap =
+ ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
assertEquals(_segmentAssignment
.rebalanceTable(currentAssignment,
noRelocationInstancePartitionsMap, null, null, new BaseConfiguration()),
currentAssignment);
@@ -162,29 +197,36 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest
{
// instances
Map<String, Map<String, String>> newAssignment = _segmentAssignment
.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null,
new BaseConfiguration());
- assertEquals(newAssignment.size(), NUM_SEGMENTS + 1);
+ assertEquals(newAssignment.size(), NUM_SEGMENTS +
uploadedSegmentNames.size() + 1);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
- // COMPLETED (ONLINE) segments
- Map<String, String> instanceStateMap =
newAssignment.get(_segments.get(segmentId));
- for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
-
assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
- assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
- }
+ // check COMPLETED (ONLINE) segments
+ newAssignment.get(_segments.get(segmentId)).forEach((instance, state)
-> {
+ assertTrue(instance.startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
+ assertEquals(state, SegmentStateModel.ONLINE);
+ });
} else {
- // CONSUMING segments
- Map<String, String> instanceStateMap =
newAssignment.get(_segments.get(segmentId));
- for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
-
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
- assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
- }
+ // check CONSUMING segments
+ newAssignment.get(_segments.get(segmentId)).forEach((instance, state)
-> {
+ assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
+ assertEquals(state, SegmentStateModel.CONSUMING);
+ });
}
}
- // Relocated segments should be balanced (each instance should have at
least 28 segments assigned)
+ // check the uploaded segments
+ for (String uploadedSegName : uploadedSegmentNames) {
+ newAssignment.get(uploadedSegName).forEach((instance, state) -> {
+ assertTrue(instance.startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
+ assertEquals(state, SegmentStateModel.ONLINE);
+ });
+ }
+
+ // Relocated segments should be balanced (each instance should have at
least 29 segments assigned)
int[] numSegmentsAssignedPerInstance =
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment,
COMPLETED_INSTANCES);
assertEquals(numSegmentsAssignedPerInstance.length,
NUM_COMPLETED_INSTANCES);
- int expectedMinNumSegmentsPerInstance = (NUM_SEGMENTS - NUM_PARTITIONS) *
NUM_REPLICAS / NUM_COMPLETED_INSTANCES;
+ int expectedMinNumSegmentsPerInstance =
+ (NUM_SEGMENTS - NUM_PARTITIONS) * NUM_REPLICAS /
NUM_COMPLETED_INSTANCES + 1;
for (int i = 0; i < NUM_COMPLETED_INSTANCES; i++) {
assertTrue(numSegmentsAssignedPerInstance[i] >=
expectedMinNumSegmentsPerInstance);
}
@@ -232,6 +274,28 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
}
}
+ @Test
+ public void testAssignSegmentForUploadedSegments() {
+ // CONSUMING instance partition has been tested in previous method, only
test COMPLETED here
+ Map<InstancePartitionsType, InstancePartitions>
onlyCompletedInstancePartitionMap =
+ ImmutableMap.of(InstancePartitionsType.COMPLETED,
_instancePartitionsMap.get(InstancePartitionsType.COMPLETED));
+ Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+ Map<String, List<String>> expectedUploadedSegmentToInstances =
ImmutableMap.of(
+ "uploadedSegment_0", ImmutableList.of("completedInstance_0",
"completedInstance_1", "completedInstance_2"),
+ "uploadedSegment_1", ImmutableList.of("completedInstance_3",
"completedInstance_4", "completedInstance_5"),
+ "uploadedSegment_2", ImmutableList.of("completedInstance_6",
"completedInstance_7", "completedInstance_8"),
+ "uploadedSegment_3", ImmutableList.of("completedInstance_9",
"completedInstance_0", "completedInstance_1"),
+ "uploadedSegment_4", ImmutableList.of("completedInstance_2",
"completedInstance_3", "completedInstance_4")
+ );
+ expectedUploadedSegmentToInstances.forEach((segmentName,
expectedInstances) -> {
+ List<String> actualInstances =
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyCompletedInstancePartitionMap);
+ assertEquals(actualInstances, expectedInstances);
+ currentAssignment
+ .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(actualInstances,
SegmentStateModel.ONLINE));
+ });
+ }
+
private void addToAssignment(Map<String, Map<String, String>>
currentAssignment, int segmentId,
List<String> instancesAssigned) {
// Change the state of the last segment in the same partition from
CONSUMING to ONLINE if exists
@@ -246,4 +310,12 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
currentAssignment.put(_segments.get(segmentId),
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING));
}
+
+ private HelixManager createHelixManager() {
+ HelixManager helixManager = mock(HelixManager.class);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+ when(propertyStore.get(anyString(), isNull(), anyInt())).thenReturn(new
ZNRecord("0"));
+ return helixManager;
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
index 1724d0aeb8..97aa23aab2 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix.core.assignment.segment;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashMap;
@@ -162,11 +163,13 @@ public class
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
@Test
public void testRelocateCompletedSegments() {
+ Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
+ ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = _segments.get(segmentId);
List<String> instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
addToAssignment(currentAssignment, segmentId, instancesAssigned);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index 09cd07bc9e..ba8929d267 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -18,12 +18,16 @@
*/
package org.apache.pinot.controller.helix.core.assignment.segment;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -36,6 +40,11 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -74,7 +83,7 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
.setLLC(true).setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)
.build();
- _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(null,
tableConfig);
+ _segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(),
tableConfig);
_instancePartitionsMap = new TreeMap<>();
// CONSUMING instances:
@@ -116,19 +125,16 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
_instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
completedInstancePartitions);
}
- @Test
- public void testFactory() {
- assertTrue(_segmentAssignment instanceof RealtimeSegmentAssignment);
- }
-
@Test
public void testAssignSegment() {
+ Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
+ ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = _segments.get(segmentId);
List<String> instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
assertEquals(instancesAssigned.size(), NUM_REPLICAS);
// Segment 0 (partition 0) should be assigned to instance 0, 3, 6
@@ -151,11 +157,13 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
@Test
public void testRelocateCompletedSegments() {
+ Map<InstancePartitionsType, InstancePartitions>
onlyConsumingInstancePartitionMap =
+ ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
String segmentName = _segments.get(segmentId);
List<String> instancesAssigned =
- _segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyConsumingInstancePartitionMap);
addToAssignment(currentAssignment, segmentId, instancesAssigned);
}
@@ -173,10 +181,31 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
SegmentStateModel.OFFLINE);
currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+ // Add 3 uploaded ONLINE segments to the consuming instances (i.e. no
separation between consuming & completed)
+ List<String> uploadedSegmentNames = ImmutableList.of("UploadedSegment0",
"UploadedSegment1", "UploadedSegment2");
+ onlyConsumingInstancePartitionMap =
+ ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
+ for (String uploadedSegName : uploadedSegmentNames) {
+ List<String> instancesAssigned =
+ _segmentAssignment.assignSegment(uploadedSegName, currentAssignment,
onlyConsumingInstancePartitionMap);
+ currentAssignment.put(uploadedSegName,
+ SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.ONLINE));
+ }
+
+ assertEquals(currentAssignment.size(), NUM_SEGMENTS +
uploadedSegmentNames.size() + 1);
+ // Each segment should have 3 replicas and all assigned instances should
be prefixed with consuming
+ currentAssignment.forEach((type, instanceStateMap) -> {
+ assertEquals(instanceStateMap.size(), NUM_REPLICAS);
+ instanceStateMap.forEach((instance, state) -> {
+ if (!instance.startsWith("badInstance_")) {
+ assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
+ }
+ });
+ });
+
// Rebalance without COMPLETED instance partitions should not change the
segment assignment
- Map<InstancePartitionsType, InstancePartitions>
noRelocationInstancePartitionsMap = new TreeMap<>();
- noRelocationInstancePartitionsMap
- .put(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
+ Map<InstancePartitionsType, InstancePartitions>
noRelocationInstancePartitionsMap =
+ ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
assertEquals(_segmentAssignment
.rebalanceTable(currentAssignment,
noRelocationInstancePartitionsMap, null, null, new BaseConfiguration()),
currentAssignment);
@@ -185,31 +214,38 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
// instances
Map<String, Map<String, String>> newAssignment = _segmentAssignment
.rebalanceTable(currentAssignment, _instancePartitionsMap, null, null,
new BaseConfiguration());
- assertEquals(newAssignment.size(), NUM_SEGMENTS + 1);
+ assertEquals(newAssignment.size(), NUM_SEGMENTS +
uploadedSegmentNames.size() + 1);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
- // COMPLETED (ONLINE) segments
- Map<String, String> instanceStateMap =
newAssignment.get(_segments.get(segmentId));
- for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
-
assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
- assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
- }
+ // check COMPLETED (ONLINE) segments
+ newAssignment.get(_segments.get(segmentId)).forEach((instance, state)
-> {
+ assertTrue(instance.startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
+ assertEquals(state, SegmentStateModel.ONLINE);
+ });
} else {
- // CONSUMING segments
- Map<String, String> instanceStateMap =
newAssignment.get(_segments.get(segmentId));
- for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
-
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
- assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
- }
+ // check CONSUMING segments
+ newAssignment.get(_segments.get(segmentId)).forEach((instance, state)
-> {
+ assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
+ assertEquals(state, SegmentStateModel.CONSUMING);
+ });
}
}
- // Relocated segments should be balanced (each instance should have 24
segments assigned)
+ // check the uploaded segments
+ for (String uploadedSegName : uploadedSegmentNames) {
+ newAssignment.get(uploadedSegName).forEach((instance, state) -> {
+ assertTrue(instance.startsWith(COMPLETED_INSTANCE_NAME_PREFIX));
+ assertEquals(state, SegmentStateModel.ONLINE);
+ });
+ }
+
+ // Relocated segments should be balanced
int[] numSegmentsAssignedPerInstance =
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment,
COMPLETED_INSTANCES);
- int[] expectedNumSegmentsAssignedPerInstance = new
int[NUM_COMPLETED_INSTANCES];
- int numSegmentsPerInstance = (NUM_SEGMENTS - NUM_PARTITIONS) *
NUM_REPLICAS / NUM_COMPLETED_INSTANCES;
- Arrays.fill(expectedNumSegmentsAssignedPerInstance,
numSegmentsPerInstance);
- assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
+ int expectedNumSegmentsPerInstance = (NUM_SEGMENTS - NUM_PARTITIONS) *
NUM_REPLICAS / NUM_COMPLETED_INSTANCES;
+ for (int actualNumSegments : numSegmentsAssignedPerInstance) {
+ assertTrue(actualNumSegments == expectedNumSegmentsPerInstance
+ || actualNumSegments == expectedNumSegmentsPerInstance + 1);
+ }
// Rebalance with COMPLETED instance partitions including CONSUMING
segments should give the same assignment
BaseConfiguration rebalanceConfig = new BaseConfiguration();
@@ -256,6 +292,34 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
}
}
+ @Test
+ public void testAssignSegmentForUploadedSegments() {
+ // CONSUMING instance partition has been tested in previous method, only
test COMPLETED here
+ Map<InstancePartitionsType, InstancePartitions>
onlyCompletedInstancePartitionMap =
+ ImmutableMap.of(InstancePartitionsType.COMPLETED,
_instancePartitionsMap.get(InstancePartitionsType.COMPLETED));
+ Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+ // COMPLETED instances:
+ // {
+ // 0_0=[instance_0, instance_1, instance_2, instance_3],
+ // 0_1=[instance_4, instance_5, instance_6, instance_7],
+ // 0_2=[instance_8, instance_9, instance_10, instance_11]
+ // }
+ Map<String, List<String>> expectedUploadedSegmentToInstances =
ImmutableMap.of(
+ "uploadedSegment_0", ImmutableList.of("completedInstance_0",
"completedInstance_4", "completedInstance_8"),
+ "uploadedSegment_1", ImmutableList.of("completedInstance_1",
"completedInstance_5", "completedInstance_9"),
+ "uploadedSegment_2", ImmutableList.of("completedInstance_2",
"completedInstance_6", "completedInstance_10"),
+ "uploadedSegment_3", ImmutableList.of("completedInstance_3",
"completedInstance_7", "completedInstance_11"),
+ "uploadedSegment_4", ImmutableList.of("completedInstance_0",
"completedInstance_4", "completedInstance_8")
+ );
+ expectedUploadedSegmentToInstances.forEach((segmentName,
expectedInstances) -> {
+ List<String> actualInstances =
+ _segmentAssignment.assignSegment(segmentName, currentAssignment,
onlyCompletedInstancePartitionMap);
+ assertEquals(actualInstances, expectedInstances);
+ currentAssignment
+ .put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(actualInstances,
SegmentStateModel.ONLINE));
+ });
+ }
+
private void addToAssignment(Map<String, Map<String, String>>
currentAssignment, int segmentId,
List<String> instancesAssigned) {
// Change the state of the last segment in the same partition from
CONSUMING to ONLINE if exists
@@ -270,4 +334,12 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
currentAssignment.put(_segments.get(segmentId),
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING));
}
+
+ private HelixManager createHelixManager() {
+ HelixManager helixManager = mock(HelixManager.class);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+ when(propertyStore.get(anyString(), isNull(), anyInt())).thenReturn(new
ZNRecord("0"));
+ return helixManager;
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 6276db12c0..5b28f04fd3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -189,7 +189,8 @@ public abstract class BaseTableDataManager implements
TableDataManager {
@Override
public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
throws Exception {
- throw new UnsupportedOperationException();
+ Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
+ addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
schema));
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
index e3cf839b61..7e17da42cb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
@@ -18,13 +18,8 @@
*/
package org.apache.pinot.core.data.manager.offline;
-import java.io.File;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
-import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.spi.data.Schema;
/**
@@ -44,11 +39,4 @@ public class OfflineTableDataManager extends
BaseTableDataManager {
@Override
protected void doShutdown() {
}
-
- @Override
- public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
- throws Exception {
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
- addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
schema));
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 730bb81752..198023632c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -369,8 +369,11 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
private void handleUpsert(ImmutableSegmentImpl immutableSegment) {
String segmentName = immutableSegment.getSegmentName();
- int partitionGroupId = SegmentUtils
+ Integer partitionGroupId = SegmentUtils
.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType,
_helixManager, _primaryKeyColumns.get(0));
+ Preconditions.checkNotNull(partitionGroupId, String
+ .format("PartitionGroupId is not available for segment: '%s'
(upsert-enabled table: %s)", segmentName,
+ _tableNameWithType));
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId);
ThreadSafeMutableRoaringBitmap validDocIds = new
ThreadSafeMutableRoaringBitmap();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 864f6b7287..7fb120cfb7 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -21,16 +21,24 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.commons.io.FileUtils;
+import org.apache.http.HttpStatus;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ReadMode;
@@ -42,6 +50,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -103,6 +112,85 @@ public class LLCRealtimeClusterIntegrationTest extends
RealtimeClusterIntegratio
}
}
+ @Override
+ protected void createSegmentsAndUpload(List<File> avroFiles, Schema schema,
TableConfig tableConfig)
+ throws Exception {
+ if (!_tarDir.exists()) {
+ _tarDir.mkdir();
+ }
+ if (!_segmentDir.exists()) {
+ _segmentDir.mkdir();
+ }
+
+ // create segments out of the avro files (segments will be placed in
_tarDir)
+ List<File> copyOfAvroFiles = new ArrayList<>(avroFiles);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(copyOfAvroFiles,
tableConfig, schema, 0, _segmentDir, _tarDir);
+
+ // upload segments to controller
+ uploadSegmentsToController(getTableName(), _tarDir, false, false);
+
+ // upload the first segment again to verify refresh
+ uploadSegmentsToController(getTableName(), _tarDir, true, false);
+
+ // upload the first segment again to verify refresh with different segment
crc
+ uploadSegmentsToController(getTableName(), _tarDir, true, true);
+
+ // add avro files to the original list so H2 will have the uploaded data
as well
+ avroFiles.addAll(copyOfAvroFiles);
+ }
+
+ private void uploadSegmentsToController(String tableName, File tarDir,
boolean onlyFirstSegment, boolean changeCrc)
+ throws Exception {
+ File[] segmentTarFiles = tarDir.listFiles();
+ assertNotNull(segmentTarFiles);
+ int numSegments = segmentTarFiles.length;
+ assertTrue(numSegments > 0);
+ if (onlyFirstSegment) {
+ numSegments = 1;
+ }
+ URI uploadSegmentHttpURI =
FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
+ try (FileUploadDownloadClient fileUploadDownloadClient = new
FileUploadDownloadClient()) {
+ if (numSegments == 1) {
+ File segmentTarFile = segmentTarFiles[0];
+ if (changeCrc) {
+ changeCrcInSegmentZKMetadata(tableName, segmentTarFile.toString());
+ }
+ assertEquals(fileUploadDownloadClient
+ .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
segmentTarFile, tableName,
+ TableType.REALTIME).getStatusCode(), HttpStatus.SC_OK);
+ } else {
+ // Upload segments in parallel
+ ExecutorService executorService =
Executors.newFixedThreadPool(numSegments);
+ List<Future<Integer>> futures = new ArrayList<>(numSegments);
+ for (File segmentTarFile : segmentTarFiles) {
+ futures.add(executorService.submit(() -> fileUploadDownloadClient
+ .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
segmentTarFile, tableName,
+ TableType.REALTIME).getStatusCode()));
+ }
+ executorService.shutdown();
+ for (Future<Integer> future : futures) {
+ assertEquals((int) future.get(), HttpStatus.SC_OK);
+ }
+ }
+ }
+ }
+
+ private void changeCrcInSegmentZKMetadata(String tableName, String
segmentFilePath) {
+ int startIdx = segmentFilePath.indexOf("mytable_");
+ int endIdx = segmentFilePath.indexOf(".tar.gz");
+ String segmentName = segmentFilePath.substring(startIdx, endIdx);
+ String tableNameWithType =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+ SegmentZKMetadata segmentZKMetadata =
_helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
+ segmentZKMetadata.setCrc(111L);
+ _helixResourceManager.updateZkMetadata(tableNameWithType,
segmentZKMetadata);
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ // all the data that was ingested from Kafka also got uploaded via the
controller's upload endpoint
+ return super.getCountStarResult() * 2;
+ }
+
@BeforeClass
@Override
public void setUp()
@@ -138,9 +226,11 @@ public class LLCRealtimeClusterIntegrationTest extends
RealtimeClusterIntegratio
String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
List<SegmentZKMetadata> segmentsZKMetadata =
ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore,
realtimeTableName);
- for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
- assertEquals(segmentZKMetadata.getSizeThresholdToFlushSegment(),
- getRealtimeSegmentFlushSize() / getNumKafkaPartitions());
+ for (SegmentZKMetadata segMetadata : segmentsZKMetadata) {
+ if (segMetadata.getStatus() !=
CommonConstants.Segment.Realtime.Status.UPLOADED) {
+ assertEquals(segMetadata.getSizeThresholdToFlushSegment(),
+ getRealtimeSegmentFlushSize() / getNumKafkaPartitions());
+ }
}
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 6208c36c5b..dfa7a34d82 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -23,6 +23,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -55,12 +57,17 @@ public class RealtimeClusterIntegrationTest extends
BaseClusterIntegrationTestSe
List<File> avroFiles = unpackAvroData(_tempDir);
// Create and upload the schema and table config
- addSchema(createSchema());
- addTableConfig(createRealtimeTableConfig(avroFiles.get(0)));
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
+ addTableConfig(tableConfig);
// Push data into Kafka
pushAvroIntoKafka(avroFiles);
+ // create segments and upload them to controller
+ createSegmentsAndUpload(avroFiles, schema, tableConfig);
+
// Set up the H2 connection
setUpH2Connection(avroFiles);
@@ -71,6 +78,11 @@ public class RealtimeClusterIntegrationTest extends
BaseClusterIntegrationTestSe
waitForAllDocsLoaded(600_000L);
}
+ protected void createSegmentsAndUpload(List<File> avroFile, Schema schema,
TableConfig tableConfig)
+ throws Exception {
+ // Do nothing. This is specific to LLC use cases for now.
+ }
+
@Override
protected void overrideServerConf(PinotConfiguration configuration) {
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION,
false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]