jihoonson commented on a change in pull request #9610: Fix NPE in 
RemoteTaskRunner event handler causes JVM shutdown
URL: https://github.com/apache/druid/pull/9610#discussion_r404391313
 
 

 ##########
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
 ##########
 @@ -969,116 +970,129 @@ private boolean cancelWorkerCleanup(String workerHost)
       );
 
       // Add status listener to the watcher for status changes
-      zkWorker.addListener(
-          (client, event) -> {
-            final String taskId;
-            final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
-            synchronized (statusLock) {
-              try {
-                switch (event.getType()) {
-                  case CHILD_ADDED:
-                  case CHILD_UPDATED:
-                    taskId = 
ZKPaths.getNodeFromPath(event.getData().getPath());
-                    final TaskAnnouncement announcement = jsonMapper.readValue(
-                        event.getData().getData(), TaskAnnouncement.class
-                    );
-
-                    log.info(
-                        "Worker[%s] wrote %s status for task [%s] on [%s]",
-                        zkWorker.getWorker().getHost(),
-                        announcement.getTaskStatus().getStatusCode(),
-                        taskId,
-                        announcement.getTaskLocation()
-                    );
-
-                    // Synchronizing state with ZK
-                    statusLock.notifyAll();
-
-                    final RemoteTaskRunnerWorkItem tmp;
-                    if ((tmp = runningTasks.get(taskId)) != null) {
-                      taskRunnerWorkItem = tmp;
-                    } else {
-                      final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = 
new RemoteTaskRunnerWorkItem(
-                          taskId,
-                          announcement.getTaskType(),
-                          zkWorker.getWorker(),
-                          TaskLocation.unknown(),
-                          announcement.getTaskDataSource()
-                      );
-                      final RemoteTaskRunnerWorkItem existingItem = 
runningTasks.putIfAbsent(
-                          taskId,
-                          newTaskRunnerWorkItem
-                      );
-                      if (existingItem == null) {
-                        log.warn(
-                            "Worker[%s] announced a status for a task I didn't 
know about, adding to runningTasks: %s",
-                            zkWorker.getWorker().getHost(),
-                            taskId
-                        );
-                        taskRunnerWorkItem = newTaskRunnerWorkItem;
-                      } else {
-                        taskRunnerWorkItem = existingItem;
-                      }
-                    }
-
-                    if 
(!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
-                      
taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
-                      TaskRunnerUtils.notifyLocationChanged(listeners, taskId, 
announcement.getTaskLocation());
-                    }
+      zkWorker.addListener(getStatusListener(worker, zkWorker, retVal));
+      zkWorker.start();
+      return retVal;
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 
-                    if (announcement.getTaskStatus().isComplete()) {
-                      taskComplete(taskRunnerWorkItem, zkWorker, 
announcement.getTaskStatus());
-                      runPendingTasks();
-                    }
-                    break;
-                  case CHILD_REMOVED:
-                    taskId = 
ZKPaths.getNodeFromPath(event.getData().getPath());
-                    taskRunnerWorkItem = runningTasks.remove(taskId);
-                    if (taskRunnerWorkItem != null) {
-                      log.info("Task[%s] just disappeared!", taskId);
-                      taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
-                      TaskRunnerUtils.notifyStatusChanged(listeners, taskId, 
TaskStatus.failure(taskId));
-                    } else {
-                      log.info("Task[%s] went bye bye.", taskId);
-                    }
-                    break;
-                  case INITIALIZED:
-                    if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == 
null) {
-                      retVal.set(zkWorker);
-                    } else {
-                      final String message = StringUtils.format(
-                          "WTF?! Tried to add already-existing worker[%s]",
-                          worker.getHost()
-                      );
-                      log.makeAlert(message)
-                         .addData("workerHost", worker.getHost())
-                         .addData("workerIp", worker.getIp())
-                         .emit();
-                      retVal.setException(new IllegalStateException(message));
-                    }
-                    runPendingTasks();
-                    break;
-                  case CONNECTION_SUSPENDED:
-                  case CONNECTION_RECONNECTED:
-                  case CONNECTION_LOST:
-                    // do nothing
+  @VisibleForTesting
+  PathChildrenCacheListener getStatusListener(final Worker worker, final 
ZkWorker zkWorker, final SettableFuture<ZkWorker> retVal)
+  {
+    return (client, event) -> {
+      final String taskId;
+      final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
+      synchronized (statusLock) {
+        try {
+          switch (event.getType()) { // lgtm 
[java/dereferenced-value-may-be-null]
+            case CHILD_ADDED:
+            case CHILD_UPDATED:
+              taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); // 
lgtm [java/dereferenced-value-may-be-null]
+              final TaskAnnouncement announcement = jsonMapper.readValue(
+                  event.getData().getData(), TaskAnnouncement.class // lgtm 
[java/dereferenced-value-may-be-null]
+              );
+
+              log.info(
+                  "Worker[%s] wrote %s status for task [%s] on [%s]",
+                  zkWorker.getWorker().getHost(),
+                  announcement.getTaskStatus().getStatusCode(),
+                  taskId,
+                  announcement.getTaskLocation()
+              );
+
+              // Synchronizing state with ZK
+              statusLock.notifyAll();
+
+              final RemoteTaskRunnerWorkItem tmp;
+              if ((tmp = runningTasks.get(taskId)) != null) {
+                taskRunnerWorkItem = tmp;
+              } else {
+                final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new 
RemoteTaskRunnerWorkItem(
+                    taskId,
+                    announcement.getTaskType(),
+                    zkWorker.getWorker(),
+                    TaskLocation.unknown(),
+                    announcement.getTaskDataSource()
+                );
+                final RemoteTaskRunnerWorkItem existingItem = 
runningTasks.putIfAbsent(
+                    taskId,
+                    newTaskRunnerWorkItem
+                );
+                if (existingItem == null) {
+                  log.warn(
+                      "Worker[%s] announced a status for a task I didn't know 
about, adding to runningTasks: %s",
+                      zkWorker.getWorker().getHost(),
+                      taskId
+                  );
+                  taskRunnerWorkItem = newTaskRunnerWorkItem;
+                } else {
+                  taskRunnerWorkItem = existingItem;
                 }
               }
-              catch (Exception e) {
-                log.makeAlert(e, "Failed to handle new worker status")
-                   .addData("worker", zkWorker.getWorker().getHost())
-                   .addData("znode", event.getData().getPath())
+
+              if 
(!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
+                taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
+                TaskRunnerUtils.notifyLocationChanged(listeners, taskId, 
announcement.getTaskLocation());
+              }
+
+              if (announcement.getTaskStatus().isComplete()) {
+                taskComplete(taskRunnerWorkItem, zkWorker, 
announcement.getTaskStatus());
+                runPendingTasks();
+              }
+              break;
+            case CHILD_REMOVED:
+              taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); // 
lgtm [java/dereferenced-value-may-be-null]
+              taskRunnerWorkItem = runningTasks.remove(taskId);
+              if (taskRunnerWorkItem != null) {
+                log.info("Task[%s] just disappeared!", taskId);
+                taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
+                TaskRunnerUtils.notifyStatusChanged(listeners, taskId, 
TaskStatus.failure(taskId));
+              } else {
+                log.info("Task[%s] went bye bye.", taskId);
+              }
+              break;
+            case INITIALIZED:
+              if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
+                retVal.set(zkWorker);
+              } else {
+                final String message = StringUtils.format(
+                    "This should not happen...tried to add already-existing 
worker[%s]",
+                    worker.getHost()
+                );
+                log.makeAlert(message)
+                   .addData("workerHost", worker.getHost())
+                   .addData("workerIp", worker.getIp())
                    .emit();
+                retVal.setException(new IllegalStateException(message));
               }
+              runPendingTasks();
+              break;
+            case CONNECTION_SUSPENDED:
+            case CONNECTION_RECONNECTED:
+            case CONNECTION_LOST:
+              // do nothing
+          }
+        }
+        catch (Exception e) {
+          String znode = null;
+          String eventType = null;
+          if (event != null) {
 
 Review comment:
   I don't think `event` can ever be null. Check out callers of 
`PathChildrenCache.callListeners()`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to