This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.2 by this push:
     new a6eaaa5  Prevent KafkaSupervisor NPE in generateSequenceName (#5900) (#5902) (#5972)
a6eaaa5 is described below

commit a6eaaa5be95dc95bc6a1a391021e65afc94d97e1
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Jul 9 11:22:03 2018 -0700

    Prevent KafkaSupervisor NPE in generateSequenceName (#5900) (#5902) (#5972)
    
    * Prevent KafkaSupervisor NPE in checkPendingCompletionTasks (#5900)
    
    * throw IAE in generateSequenceName if groupId not found in taskGroups
    * add null check in checkPendingCompletionTasks
    
    * Add warn log in checkPendingCompletionTasks
    
    * Address PR comments
    
    Replace warn with error log
    
    * Address PR comments
    
    * change signature of generateSequenceName to take a TaskGroup object instead of int
    
    * Address comments
    
    * Remove unnecessary method from KafkaSupervisorTest
---
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 63 +++++++++++-----------
 .../kafka/supervisor/KafkaSupervisorTest.java      |  9 +---
 2 files changed, 35 insertions(+), 37 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 7cc79f6..5eb783c 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -143,7 +143,8 @@ public class KafkaSupervisor implements Supervisor
    * time, there should only be up to a maximum of [taskCount] 
actively-reading task groups (tracked in the [taskGroups]
    * map) + zero or more pending-completion task groups (tracked in 
[pendingCompletionTaskGroups]).
    */
-  private static class TaskGroup
+  @VisibleForTesting
+  static class TaskGroup
   {
     // This specifies the partitions and starting offsets for this task group. 
It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task 
group, which will live until a task in
@@ -758,8 +759,8 @@ public class KafkaSupervisor implements Supervisor
           
resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition
 -> {
             final int groupId = getTaskGroupIdForPartition(partition);
             killTaskGroupForPartitions(ImmutableSet.of(partition));
-            sequenceTaskGroup.remove(generateSequenceName(groupId));
-            taskGroups.remove(groupId);
+            final TaskGroup removedGroup = taskGroups.remove(groupId);
+            sequenceTaskGroup.remove(generateSequenceName(removedGroup));
             partitionGroups.get(groupId).replaceAll((partitionId, offset) -> 
NOT_SET);
           });
         } else {
@@ -867,12 +868,13 @@ public class KafkaSupervisor implements Supervisor
   }
 
   @VisibleForTesting
-  String generateSequenceName(int groupId)
+  String generateSequenceName(TaskGroup taskGroup)
   {
+    Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
     return generateSequenceName(
-        taskGroups.get(groupId).partitionOffsets,
-        taskGroups.get(groupId).minimumMessageTime,
-        taskGroups.get(groupId).maximumMessageTime
+        taskGroup.partitionOffsets,
+        taskGroup.minimumMessageTime,
+        taskGroup.maximumMessageTime
     );
   }
 
@@ -1066,18 +1068,19 @@ public class KafkaSupervisor implements Supervisor
                             }
                             return false;
                           } else {
+                            final TaskGroup taskGroup = new TaskGroup(
+                                ImmutableMap.copyOf(
+                                    kafkaTask.getIOConfig()
+                                             .getStartPartitions()
+                                             .getPartitionOffsetMap()
+                                ), 
kafkaTask.getIOConfig().getMinimumMessageTime(),
+                                kafkaTask.getIOConfig().getMaximumMessageTime()
+                            );
                             if (taskGroups.putIfAbsent(
                                 taskGroupId,
-                                new TaskGroup(
-                                    ImmutableMap.copyOf(
-                                        kafkaTask.getIOConfig()
-                                                 .getStartPartitions()
-                                                 .getPartitionOffsetMap()
-                                    ), 
kafkaTask.getIOConfig().getMinimumMessageTime(),
-                                    
kafkaTask.getIOConfig().getMaximumMessageTime()
-                                )
+                                taskGroup
                             ) == null) {
-                              
sequenceTaskGroup.put(generateSequenceName(taskGroupId), 
taskGroups.get(taskGroupId));
+                              
sequenceTaskGroup.put(generateSequenceName(taskGroup), 
taskGroups.get(taskGroupId));
                               log.info("Created new task group [%d]", 
taskGroupId);
                             }
                             taskGroupsToVerify.add(taskGroupId);
@@ -1234,7 +1237,7 @@ public class KafkaSupervisor implements Supervisor
       // killing all tasks or no task left in the group ?
       // clear state about the taskgroup so that get latest offset information 
is fetched from metadata store
       log.warn("Clearing task group [%d] information as no valid tasks left 
the group", groupId);
-      sequenceTaskGroup.remove(generateSequenceName(groupId));
+      sequenceTaskGroup.remove(generateSequenceName(taskGroup));
       taskGroups.remove(groupId);
       partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
     }
@@ -1410,7 +1413,7 @@ public class KafkaSupervisor implements Supervisor
         partitionGroups.get(groupId).replaceAll((partition, offset) -> 
NOT_SET);
       }
 
-      sequenceTaskGroup.remove(generateSequenceName(groupId));
+      sequenceTaskGroup.remove(generateSequenceName(group));
       // remove this task group from the list of current task groups now that 
it has been handled
       taskGroups.remove(groupId);
     }
@@ -1612,8 +1615,7 @@ public class KafkaSupervisor implements Supervisor
 
           // reset partitions offsets for this task group so that they will be 
re-read from metadata storage
           partitionGroups.get(groupId).replaceAll((partition, offset) -> 
NOT_SET);
-          sequenceTaskGroup.remove(generateSequenceName(groupId));
-
+          sequenceTaskGroup.remove(generateSequenceName(group));
           // kill all the tasks in this pending completion group
           killTasksInGroup(group);
           // set a flag so the other pending completion groups for this set of 
partitions will also stop
@@ -1673,7 +1675,7 @@ public class KafkaSupervisor implements Supervisor
         // be recreated with the next set of offsets
         if (taskData.status.isSuccess()) {
           futures.add(stopTasksInGroup(taskGroup));
-          sequenceTaskGroup.remove(generateSequenceName(groupId));
+          sequenceTaskGroup.remove(generateSequenceName(taskGroup));
           iTaskGroups.remove();
           break;
         }
@@ -1706,15 +1708,16 @@ public class KafkaSupervisor implements Supervisor
             
DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get())
         ) : Optional.<DateTime>absent());
 
+        final TaskGroup taskGroup = new TaskGroup(
+            generateStartingOffsetsForPartitionGroup(groupId),
+            minimumMessageTime,
+            maximumMessageTime
+        );
         taskGroups.put(
             groupId,
-            new TaskGroup(
-                generateStartingOffsetsForPartitionGroup(groupId),
-                minimumMessageTime,
-                maximumMessageTime
-            )
+            taskGroup
         );
-        sequenceTaskGroup.put(generateSequenceName(groupId), 
taskGroups.get(groupId));
+        sequenceTaskGroup.put(generateSequenceName(taskGroup), 
taskGroups.get(groupId));
       }
     }
 
@@ -1749,8 +1752,8 @@ public class KafkaSupervisor implements Supervisor
     for (Integer partition : startPartitions.keySet()) {
       endPartitions.put(partition, Long.MAX_VALUE);
     }
-
-    String sequenceName = generateSequenceName(groupId);
+    TaskGroup group = taskGroups.get(groupId);
+    String sequenceName = generateSequenceName(group);
 
     Map<String, String> consumerProperties = 
Maps.newHashMap(ioConfig.getConsumerProperties());
     DateTime minimumMessageTime = 
taskGroups.get(groupId).minimumMessageTime.orNull();
@@ -1911,7 +1914,7 @@ public class KafkaSupervisor implements Supervisor
 
     String taskSequenceName = ((KafkaIndexTask) 
taskOptional.get()).getIOConfig().getBaseSequenceName();
     if (taskGroups.get(taskGroupId) != null) {
-      return generateSequenceName(taskGroupId).equals(taskSequenceName);
+      return 
generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName);
     } else {
       return generateSequenceName(
           ((KafkaIndexTask) taskOptional.get()).getIOConfig()
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 29041e2..3c2b198 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2088,19 +2088,14 @@ public class KafkaSupervisorTest extends EasyMockSupport
     }
 
     @Override
-    protected String generateSequenceName(int groupId)
-    {
-      return StringUtils.format("sequenceName-%d", groupId);
-    }
-
-    @Override
     protected String generateSequenceName(
         Map<Integer, Long> startPartitions,
         Optional<DateTime> minimumMessageTime,
         Optional<DateTime> maximumMessageTime
     )
     {
-      return 
generateSequenceName(getTaskGroupIdForPartition(startPartitions.keySet().iterator().next()));
+      final int groupId = 
getTaskGroupIdForPartition(startPartitions.keySet().iterator().next());
+      return StringUtils.format("sequenceName-%d", groupId);
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to