jihoonson closed pull request #6086: Fix IllegalArgumentException in TaskLockBox.syncFromStorage() when updating from 0.12.x to 0.12.2 URL: https://github.com/apache/incubator-druid/pull/6086
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index d3a58240d0f..2f8f224289f 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -165,9 +165,9 @@ private static String makeTaskId(String dataSource, int randomBits) } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java index e2d20281e04..2ad8fae335f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java @@ -37,9 +37,22 @@ private final String dataSource; private final Interval interval; private final String version; - private final int priority; + private final Integer priority; private final boolean revoked; + public static TaskLock withPriority(TaskLock lock, int priority) + { + return new TaskLock( + lock.type, + lock.getGroupId(), + lock.getDataSource(), + lock.getInterval(), + lock.getVersion(), + priority, + lock.isRevoked() + ); + } + @JsonCreator public TaskLock( @JsonProperty("type") @Nullable TaskLockType type, // nullable for backward compatibility @@ -47,7 +60,7 @@ public TaskLock( @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, @JsonProperty("version") String version, - @JsonProperty("priority") int priority, + @JsonProperty("priority") @Nullable Integer priority, @JsonProperty("revoked") boolean revoked ) { @@ -116,11 +129,17 @@ public String getVersion() } @JsonProperty - public int getPriority() + @Nullable + public Integer getPriority() { return priority; } + public int getNonNullPriority() + { + return Preconditions.checkNotNull(priority, "priority"); + } + @JsonProperty public boolean isRevoked() { @@ -139,7 +158,7 @@ public boolean equals(Object o) this.dataSource.equals(that.dataSource) && this.interval.equals(that.interval) && this.version.equals(that.version) && - this.priority == that.priority && + Objects.equal(this.priority, that.priority) && this.revoked == that.revoked; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 5dcc35bde4a..fa930143f6a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -202,9 +202,9 @@ public AppenderatorDriverRealtimeIndexTask( } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index 05c9c578a0a..f81999f15f2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -183,9 +183,9 @@ public String getType() } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_MERGE_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); } @VisibleForTesting diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index a49239abcc7..4a72114be82 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -171,9 +171,9 @@ public HadoopIndexTask( } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index ac1c94dae1c..f91eaf9a27b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -241,9 +241,9 @@ public IndexTask( } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index 675f27a64f6..b1f386c0bc9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -133,9 +133,9 @@ public boolean apply(@Nullable DataSegment segment) } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_MERGE_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index fab5a9d6b06..e704d6aa066 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -152,9 +152,9 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } public static NoopTask create() diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 5ea6c081020..37a7ccf1139 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -161,9 +161,9 @@ public RealtimeIndexTask( } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 8848f7fa8a5..727c35e0854 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -98,14 +98,6 @@ default int getPriority() return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY); } - /** - * Returns the default task priority. It can vary depending on the task type. - */ - default int getDefaultPriority() - { - return Tasks.DEFAULT_TASK_PRIORITY; - } - /** * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may * require. diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 93f62b70eaf..c3cb42823c4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -130,17 +130,23 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right) final TaskLock savedTaskLock = taskAndLock.rhs; if (savedTaskLock.getInterval().toDurationMillis() <= 0) { // "Impossible", but you never know what crazy stuff can be restored from storage. - log.warn("WTF?! Got lock with empty interval for task: %s", task.getId()); + log.warn("WTF?! Got lock[%s] with empty interval for task: %s", savedTaskLock, task.getId()); continue; } - final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLock); + // Create a new taskLock if it doesn't have a proper priority, + // so that every taskLock in memory has the priority. + final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority() == null + ? TaskLock.withPriority(savedTaskLock, task.getPriority()) + : savedTaskLock; + + final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLockWithPriority); if (taskLockPosse != null) { taskLockPosse.addTask(task); final TaskLock taskLock = taskLockPosse.getTaskLock(); - if (savedTaskLock.getVersion().equals(taskLock.getVersion())) { + if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) { taskLockCount++; log.info( "Reacquired lock[%s] for task: %s", @@ -151,8 +157,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right) taskLockCount++; log.info( "Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s", - savedTaskLock.getInterval(), - savedTaskLock.getVersion(), + savedTaskLockWithPriority.getInterval(), + savedTaskLockWithPriority.getVersion(), taskLock.getVersion(), task.getId() ); @@ -160,8 +166,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right) } else { throw new ISE( "Could not reacquire lock on interval[%s] version[%s] for task: %s", - savedTaskLock.getInterval(), - savedTaskLock.getVersion(), + savedTaskLockWithPriority.getInterval(), + savedTaskLockWithPriority.getVersion(), task.getId() ); } @@ -382,11 +388,14 @@ private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock) taskLock.getDataSource(), task.getDataSource() ); + final int taskPriority = task.getPriority(); + final int lockPriority = taskLock.getNonNullPriority(); + Preconditions.checkArgument( - task.getPriority() == taskLock.getPriority(), + lockPriority == taskPriority, "lock priority[%s] is different from task priority[%s]", - taskLock.getPriority(), - task.getPriority() + lockPriority, + taskPriority ); return createOrFindLockPosse( @@ -396,7 +405,7 @@ private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock) taskLock.getDataSource(), taskLock.getInterval(), taskLock.getVersion(), - taskLock.getPriority(), + taskPriority, taskLock.isRevoked() ); } @@ -925,7 +934,7 @@ private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLoc private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority) { final TaskLock existingLock = lockPosse.getTaskLock(); - return existingLock.isRevoked() || existingLock.getPriority() < tryLockPriority; + return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority; } private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval) @@ -986,7 +995,7 @@ TaskLock getTaskLock() boolean addTask(Task task) { Preconditions.checkArgument(taskLock.getGroupId().equals(task.getGroupId())); - Preconditions.checkArgument(taskLock.getPriority() == task.getPriority()); + Preconditions.checkArgument(taskLock.getNonNullPriority() == task.getPriority()); return taskIds.add(task.getId()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index f3b7876523e..8b8d5e74db6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -44,7 +44,6 @@ import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.task.Task; -import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskQueue; @@ -176,11 +175,6 @@ public Response taskPost( public Response apply(TaskQueue taskQueue) { try { - // Set default priority if needed - final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY); - if (priority == null) { - task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority()); - } taskQueue.add(task); return Response.ok(ImmutableMap.of("task", task.getId())).build(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index f2dceb0ee30..442f304dc27 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -19,6 +19,9 @@ package io.druid.indexing.overlord; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterables; import io.druid.indexer.TaskStatus; @@ -261,6 +264,84 @@ public void testSyncFromStorage() throws EntryExistsException Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); } + @Test + public void testSyncFromStorageWithMissingTaskLockPriority() throws EntryExistsException + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + taskStorage.addLock( + task.getId(), + new TaskLockWithoutPriority(task.getGroupId(), task.getDataSource(), Intervals.of("2017/2018"), "v1") + ); + + final List<TaskLock> beforeLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + final TaskLockbox lockbox = new TaskLockbox(taskStorage); + lockbox.syncFromStorage(); + + final List<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); + } + + @Test + public void testSyncFromStorageWithMissingTaskPriority() throws EntryExistsException + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + taskStorage.addLock( + task.getId(), + new TaskLock( + TaskLockType.EXCLUSIVE, + task.getGroupId(), + task.getDataSource(), + Intervals.of("2017/2018"), + "v1", + task.getPriority() + ) + ); + + final List<TaskLock> beforeLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + final TaskLockbox lockbox = new TaskLockbox(taskStorage); + lockbox.syncFromStorage(); + + final List<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); + } + + @Test + public void testSyncFromStorageWithInvalidPriority() throws EntryExistsException + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + taskStorage.addLock( + task.getId(), + new TaskLock( + TaskLockType.EXCLUSIVE, + task.getGroupId(), + task.getDataSource(), + Intervals.of("2017/2018"), + "v1", + 10 + ) + ); + + final TaskLockbox lockbox = new TaskLockbox(taskStorage); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("lock priority[10] is different from task priority[50]"); + lockbox.syncFromStorage(); + } + @Test public void testRevokedLockSyncFromStorage() throws EntryExistsException { @@ -504,4 +585,67 @@ public void testUnlock() throws EntryExistsException .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) .collect(Collectors.toSet()); } + + private static class TaskLockWithoutPriority extends TaskLock + { + @JsonCreator + TaskLockWithoutPriority( + String groupId, + String dataSource, + Interval interval, + String version + ) + { + super(null, groupId, dataSource, interval, version, 0, false); + } + + @Override + @JsonProperty + public TaskLockType getType() + { + return super.getType(); + } + + @Override + @JsonProperty + public String getGroupId() + { + return super.getGroupId(); + } + + @Override + @JsonProperty + public String getDataSource() + { + return super.getDataSource(); + } + + @Override + @JsonProperty + public Interval getInterval() + { + return super.getInterval(); + } + + @Override + @JsonProperty + public String getVersion() + { + return super.getVersion(); + } + + @JsonIgnore + @Override + public Integer getPriority() + { + return super.getPriority(); + } + + @JsonIgnore + @Override + public boolean isRevoked() + { + return super.isRevoked(); + } + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index f947bb21dec..4c08cce4bf2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -31,13 +31,12 @@ import io.druid.discovery.DruidLeaderSelector; import io.druid.indexer.TaskLocation; import io.druid.indexer.TaskState; -import io.druid.indexer.TaskStatusPlus; import io.druid.indexer.TaskStatus; +import io.druid.indexer.TaskStatusPlus; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; -import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.TaskLockbox; @@ -81,7 +80,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -233,12 +231,6 @@ public void testOverlordRun() throws Exception Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity()); - final Map<String, Object> context = task_0.getContext(); - Assert.assertEquals(1, context.size()); - final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY); - Assert.assertNotNull(priority); - Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue()); - // Duplicate task - should fail response = overlordResource.taskPost(task_0, req); Assert.assertEquals(400, response.getStatus()); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org