This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to
refs/heads/sharded_consumer_type_support_with_kinesis by this push:
new f39dbb7 Remove new partition groups creation in commit
f39dbb7 is described below
commit f39dbb71d1981b3265614fce17133b11675c7cfe
Author: Neha Pawar <[email protected]>
AuthorDate: Thu Jan 7 17:42:23 2021 -0800
Remove new partition groups creation in commit
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 57 +++++++++-------------
.../realtime/LLRealtimeSegmentDataManager.java | 3 +-
2 files changed, 23 insertions(+), 37 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 8bf9cd0..5fd5c3f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -469,6 +469,8 @@ public class PinotLLCRealtimeSegmentManager {
private void commitSegmentMetadataInternal(String realtimeTableName,
CommittingSegmentDescriptor committingSegmentDescriptor) {
String committingSegmentName =
committingSegmentDescriptor.getSegmentName();
+ LLCSegmentName committingLLCSegment = new
LLCSegmentName(committingSegmentName);
+ int committingSegmentPartitionGroupId =
committingLLCSegment.getPartitionGroupId();
LOGGER.info("Committing segment metadata for segment: {}",
committingSegmentName);
TableConfig tableConfig = getTableConfig(realtimeTableName);
@@ -495,51 +497,40 @@ public class PinotLLCRealtimeSegmentManager {
// Step-2
- // Say we currently were consuming from 2 shards A, B. Of those, A is the
one committing.
+ // Example: Say we currently were consuming from 2 shards A, B. Of those,
A is the one committing.
- // get current partition groups - this gives current state of latest
segments for each partition [A - DONE], [B - IN_PROGRESS]
+ // Get current partition groups - this gives current state of latest
segments for each partition [A - DONE], [B - IN_PROGRESS]
List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
getCurrentPartitionGroupMetadataList(idealState);
PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
- // find new partition groups [A],[B],[C],[D] (assume A split into C D)
+ // Find new partition groups [A],[B],[C],[D] (assume A split into C D)
// If segment has consumed all of A, we will receive B,C,D
// If segment is still not reached last msg of A, we will receive A,B,C,D
+ // If there were no splits/merges we would receive A,B
List<PartitionGroupInfo> newPartitionGroupInfoList =
getPartitionGroupInfoList(streamConfig,
currentPartitionGroupMetadataList);
int numPartitions = newPartitionGroupInfoList.size();
- // create new segment metadata, only if PartitionGroupInfo was returned
for it in the newPartitionGroupInfoList
- Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata =
currentPartitionGroupMetadataList.stream().collect(
- Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
-
- List<String> newConsumingSegmentNames = new ArrayList<>();
+ // Only if committingSegment's partitionGroup is present in the
newPartitionGroupInfoList, we create new segment metadata
+ String newConsumingSegmentName = null;
String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
long newSegmentCreationTimeMs = getCurrentTimeMs();
for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
- int newPartitionGroupId = partitionGroupInfo.getPartitionGroupId();
- PartitionGroupMetadata currentPartitionGroupMetadata =
currentGroupIdToMetadata.get(newPartitionGroupId);
- if (currentPartitionGroupMetadata == null) { // not present in current
state. New partition found.
- // make new segment
- // fixme: letting validation manager do this would be best, otherwise
we risk creating multiple CONSUMING segments
- String newLLCSegmentName =
- setupNewPartitionGroup(tableConfig, streamConfig,
partitionGroupInfo, newSegmentCreationTimeMs,
- instancePartitions, numPartitions, numReplicas);
- newConsumingSegmentNames.add(newLLCSegmentName);
- } else {
- LLCSegmentName committingLLCSegment = new
LLCSegmentName(committingSegmentName);
- // Update this only for committing segment. All other partitions
should get updated by their own commit call
- if (newPartitionGroupId == committingLLCSegment.getPartitionGroupId())
{
-
Preconditions.checkState(currentPartitionGroupMetadata.getStatus().equals(Status.DONE.toString()));
- LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName,
newPartitionGroupId,
- currentPartitionGroupMetadata.getSequenceNumber() + 1,
newSegmentCreationTimeMs);
- createNewSegmentZKMetadata(tableConfig, streamConfig,
newLLCSegmentName, newSegmentCreationTimeMs,
- committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas);
- newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
- }
+ if (partitionGroupInfo.getPartitionGroupId() ==
committingSegmentPartitionGroupId) {
+ LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName,
committingSegmentPartitionGroupId,
+ committingLLCSegment.getSequenceNumber() + 1,
newSegmentCreationTimeMs);
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment,
newSegmentCreationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas);
+ newConsumingSegmentName = newLLCSegment.getSegmentName();
+ break;
}
}
+ // TODO: create new partition groups also here
+ // Cannot do it at the moment, because of the timestamp suffix on the
segment name.
+ // Different committing segments could create a CONSUMING segment for
same new partitionGroup, with different name
+
// Step-3
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -554,7 +545,7 @@ public class PinotLLCRealtimeSegmentManager {
Lock lock = _idealStateUpdateLocks[lockIndex];
try {
lock.lock();
- updateIdealStateOnSegmentCompletion(realtimeTableName,
committingSegmentName, newConsumingSegmentNames,
+ updateIdealStateOnSegmentCompletion(realtimeTableName,
committingSegmentName, newConsumingSegmentName,
segmentAssignment, instancePartitionsMap);
} finally {
lock.unlock();
@@ -846,7 +837,7 @@ public class PinotLLCRealtimeSegmentManager {
*/
@VisibleForTesting
void updateIdealStateOnSegmentCompletion(String realtimeTableName, String
committingSegmentName,
- List<String> newSegmentNames, SegmentAssignment segmentAssignment,
+ String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState
-> {
assert idealState != null;
@@ -863,11 +854,7 @@ public class PinotLLCRealtimeSegmentManager {
"Exceeded max segment completion time for segment " +
committingSegmentName);
}
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
committingSegmentName,
- null, segmentAssignment, instancePartitionsMap);
- for (String newSegmentName : newSegmentNames) {
-
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
null,
- newSegmentName, segmentAssignment, instancePartitionsMap);
- }
+ newSegmentName, segmentAssignment, instancePartitionsMap);
return idealState;
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 40b49b8..1569d8e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -307,11 +307,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
return true;
} else if (_endOfPartitionGroup) {
+ // FIXME: handle numDocsIndexed == 0 case
segmentLogger.info("Stopping consumption due to end of
partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}",
_numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
_stopReason =
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
- // fixme: what happens if reached endOfPartitionGroup but
numDocsIndexed == 0
- // If we decide to only setupNewPartitions via ValidationManager,
we don't need commit on endOfShard
return true;
}
return false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]