abhishekagarwal87 commented on code in PR #15682:
URL: https://github.com/apache/druid/pull/15682#discussion_r1469609669
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -549,60 +514,14 @@ private void acquireTaskLock(SegmentAllocationHolder
holder, boolean isTimeChunk
}
}
- /**
- * Adds the task to the found lock posse if not already added and updates
- * in the metadata store. Marks the segment allocation as failed if the
update
- * did not succeed.
- */
- private void addTaskAndPersistLocks(SegmentAllocationHolder holder, boolean
isTimeChunkLock)
- {
- final Task task = holder.task;
- final TaskLock acquiredLock = holder.acquiredLock;
-
- if (holder.taskLockPosse.addTask(task)) {
- log.info("Added task [%s] to TaskLock [%s]", task.getId(), acquiredLock);
-
- // This can also be batched later
- boolean success = updateLockInStorage(task, acquiredLock);
- if (success) {
- holder.markSucceeded();
- } else {
- final Integer partitionId = isTimeChunkLock
- ? null : ((SegmentLock)
acquiredLock).getPartitionId();
- unlock(task, holder.lockRequestInterval, partitionId);
- holder.markFailed("Could not update task lock in metadata store.");
- }
- } else {
- log.info("Task [%s] already present in TaskLock [%s]", task.getId(),
acquiredLock.getGroupId());
- holder.markSucceeded();
- }
- }
-
- private boolean updateLockInStorage(Task task, TaskLock taskLock)
- {
- try {
- taskStorage.addLock(task.getId(), taskLock);
- return true;
- }
- catch (Exception e) {
- log.makeAlert("Failed to persist lock in storage")
- .addData("task", task.getId())
- .addData("dataSource", taskLock.getDataSource())
- .addData("interval", taskLock.getInterval())
- .addData("version", taskLock.getVersion())
- .emit();
-
- return false;
- }
- }
-
- private TaskLockPosse createOrFindLockPosse(LockRequest request)
+ private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task,
boolean persist)
Review Comment:
Please add some comments to describe the logic.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -499,11 +468,7 @@ public List<SegmentAllocateResult> allocateSegments(
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck,
holderList.getPending());
holderList.getPending().forEach(holder -> acquireTaskLock(holder,
false));
}
- holderList.getPending().forEach(holder -> addTaskAndPersistLocks(holder,
isTimeChunkLock));
- }
- catch (Exception e) {
- holderList.clearStaleLocks(this);
- throw e;
+ holderList.getPending().forEach(SegmentAllocationHolder::markSucceeded);
Review Comment:
why is this changed?
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -499,11 +468,7 @@ public List<SegmentAllocateResult> allocateSegments(
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck,
holderList.getPending());
holderList.getPending().forEach(holder -> acquireTaskLock(holder,
false));
}
- holderList.getPending().forEach(holder -> addTaskAndPersistLocks(holder,
isTimeChunkLock));
- }
- catch (Exception e) {
- holderList.clearStaleLocks(this);
- throw e;
+ holderList.getPending().forEach(SegmentAllocationHolder::markSucceeded);
Review Comment:
why is this changed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]