This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push:
new a6eaaa5 Prevent KafkaSupervisor NPE in generateSequenceName (#5900) (#5902) (#5972)
a6eaaa5 is described below
commit a6eaaa5be95dc95bc6a1a391021e65afc94d97e1
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Jul 9 11:22:03 2018 -0700
Prevent KafkaSupervisor NPE in generateSequenceName (#5900) (#5902) (#5972)
* Prevent KafkaSupervisor NPE in checkPendingCompletionTasks (#5900)
* throw IAE in generateSequenceName if groupId not found in taskGroups
* add null check in checkPendingCompletionTasks
* Add warn log in checkPendingCompletionTasks
* Address PR comments
Replace warn with error log
* Address PR comments
* change signature of generateSequenceName to take a TaskGroup object
instead of int
* Address comments
* Remove unnecessary method from KafkaSupervisorTest
---
.../indexing/kafka/supervisor/KafkaSupervisor.java | 63 +++++++++++-----------
.../kafka/supervisor/KafkaSupervisorTest.java | 9 +---
2 files changed, 35 insertions(+), 37 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 7cc79f6..5eb783c 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -143,7 +143,8 @@ public class KafkaSupervisor implements Supervisor
* time, there should only be up to a maximum of [taskCount]
actively-reading task groups (tracked in the [taskGroups]
* map) + zero or more pending-completion task groups (tracked in
[pendingCompletionTaskGroups]).
*/
- private static class TaskGroup
+ @VisibleForTesting
+ static class TaskGroup
{
// This specifies the partitions and starting offsets for this task group.
It is set on group creation from the data
// in [partitionGroups] and never changes during the lifetime of this task
group, which will live until a task in
@@ -758,8 +759,8 @@ public class KafkaSupervisor implements Supervisor
resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition
-> {
final int groupId = getTaskGroupIdForPartition(partition);
killTaskGroupForPartitions(ImmutableSet.of(partition));
- sequenceTaskGroup.remove(generateSequenceName(groupId));
- taskGroups.remove(groupId);
+ final TaskGroup removedGroup = taskGroups.remove(groupId);
+ sequenceTaskGroup.remove(generateSequenceName(removedGroup));
partitionGroups.get(groupId).replaceAll((partitionId, offset) ->
NOT_SET);
});
} else {
@@ -867,12 +868,13 @@ public class KafkaSupervisor implements Supervisor
}
@VisibleForTesting
- String generateSequenceName(int groupId)
+ String generateSequenceName(TaskGroup taskGroup)
{
+ Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
return generateSequenceName(
- taskGroups.get(groupId).partitionOffsets,
- taskGroups.get(groupId).minimumMessageTime,
- taskGroups.get(groupId).maximumMessageTime
+ taskGroup.partitionOffsets,
+ taskGroup.minimumMessageTime,
+ taskGroup.maximumMessageTime
);
}
@@ -1066,18 +1068,19 @@ public class KafkaSupervisor implements Supervisor
}
return false;
} else {
+ final TaskGroup taskGroup = new TaskGroup(
+ ImmutableMap.copyOf(
+ kafkaTask.getIOConfig()
+ .getStartPartitions()
+ .getPartitionOffsetMap()
+ ), kafkaTask.getIOConfig().getMinimumMessageTime(),
+ kafkaTask.getIOConfig().getMaximumMessageTime()
+ );
if (taskGroups.putIfAbsent(
taskGroupId,
- new TaskGroup(
- ImmutableMap.copyOf(
- kafkaTask.getIOConfig()
- .getStartPartitions()
- .getPartitionOffsetMap()
- ), kafkaTask.getIOConfig().getMinimumMessageTime(),
- kafkaTask.getIOConfig().getMaximumMessageTime()
- )
+ taskGroup
) == null) {
- sequenceTaskGroup.put(generateSequenceName(taskGroupId), taskGroups.get(taskGroupId));
+ sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId));
log.info("Created new task group [%d]",
taskGroupId);
}
taskGroupsToVerify.add(taskGroupId);
@@ -1234,7 +1237,7 @@ public class KafkaSupervisor implements Supervisor
// killing all tasks or no task left in the group ?
// clear state about the taskgroup so that get latest offset information
is fetched from metadata store
log.warn("Clearing task group [%d] information as no valid tasks left
the group", groupId);
- sequenceTaskGroup.remove(generateSequenceName(groupId));
+ sequenceTaskGroup.remove(generateSequenceName(taskGroup));
taskGroups.remove(groupId);
partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
}
@@ -1410,7 +1413,7 @@ public class KafkaSupervisor implements Supervisor
partitionGroups.get(groupId).replaceAll((partition, offset) ->
NOT_SET);
}
- sequenceTaskGroup.remove(generateSequenceName(groupId));
+ sequenceTaskGroup.remove(generateSequenceName(group));
// remove this task group from the list of current task groups now that
it has been handled
taskGroups.remove(groupId);
}
@@ -1612,8 +1615,7 @@ public class KafkaSupervisor implements Supervisor
// reset partitions offsets for this task group so that they will be
re-read from metadata storage
partitionGroups.get(groupId).replaceAll((partition, offset) ->
NOT_SET);
- sequenceTaskGroup.remove(generateSequenceName(groupId));
-
+ sequenceTaskGroup.remove(generateSequenceName(group));
// kill all the tasks in this pending completion group
killTasksInGroup(group);
// set a flag so the other pending completion groups for this set of
partitions will also stop
@@ -1673,7 +1675,7 @@ public class KafkaSupervisor implements Supervisor
// be recreated with the next set of offsets
if (taskData.status.isSuccess()) {
futures.add(stopTasksInGroup(taskGroup));
- sequenceTaskGroup.remove(generateSequenceName(groupId));
+ sequenceTaskGroup.remove(generateSequenceName(taskGroup));
iTaskGroups.remove();
break;
}
@@ -1706,15 +1708,16 @@ public class KafkaSupervisor implements Supervisor
DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get())
) : Optional.<DateTime>absent());
+ final TaskGroup taskGroup = new TaskGroup(
+ generateStartingOffsetsForPartitionGroup(groupId),
+ minimumMessageTime,
+ maximumMessageTime
+ );
taskGroups.put(
groupId,
- new TaskGroup(
- generateStartingOffsetsForPartitionGroup(groupId),
- minimumMessageTime,
- maximumMessageTime
- )
+ taskGroup
);
- sequenceTaskGroup.put(generateSequenceName(groupId), taskGroups.get(groupId));
+ sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId));
}
}
@@ -1749,8 +1752,8 @@ public class KafkaSupervisor implements Supervisor
for (Integer partition : startPartitions.keySet()) {
endPartitions.put(partition, Long.MAX_VALUE);
}
-
- String sequenceName = generateSequenceName(groupId);
+ TaskGroup group = taskGroups.get(groupId);
+ String sequenceName = generateSequenceName(group);
Map<String, String> consumerProperties =
Maps.newHashMap(ioConfig.getConsumerProperties());
DateTime minimumMessageTime =
taskGroups.get(groupId).minimumMessageTime.orNull();
@@ -1911,7 +1914,7 @@ public class KafkaSupervisor implements Supervisor
String taskSequenceName = ((KafkaIndexTask)
taskOptional.get()).getIOConfig().getBaseSequenceName();
if (taskGroups.get(taskGroupId) != null) {
- return generateSequenceName(taskGroupId).equals(taskSequenceName);
+ return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName);
} else {
return generateSequenceName(
((KafkaIndexTask) taskOptional.get()).getIOConfig()
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 29041e2..3c2b198 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2088,19 +2088,14 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
@Override
- protected String generateSequenceName(int groupId)
- {
- return StringUtils.format("sequenceName-%d", groupId);
- }
-
- @Override
protected String generateSequenceName(
Map<Integer, Long> startPartitions,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime
)
{
- return generateSequenceName(getTaskGroupIdForPartition(startPartitions.keySet().iterator().next()));
+ final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next());
+ return StringUtils.format("sequenceName-%d", groupId);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]