jihoonson commented on a change in pull request #9610, "Fix NPE in RemoteTaskRunner event handler causes JVM shutdown". URL: https://github.com/apache/druid/pull/9610#discussion_r404391313
########## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java ########## @@ -969,116 +970,129 @@ private boolean cancelWorkerCleanup(String workerHost) ); // Add status listener to the watcher for status changes - zkWorker.addListener( - (client, event) -> { - final String taskId; - final RemoteTaskRunnerWorkItem taskRunnerWorkItem; - synchronized (statusLock) { - try { - switch (event.getType()) { - case CHILD_ADDED: - case CHILD_UPDATED: - taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); - final TaskAnnouncement announcement = jsonMapper.readValue( - event.getData().getData(), TaskAnnouncement.class - ); - - log.info( - "Worker[%s] wrote %s status for task [%s] on [%s]", - zkWorker.getWorker().getHost(), - announcement.getTaskStatus().getStatusCode(), - taskId, - announcement.getTaskLocation() - ); - - // Synchronizing state with ZK - statusLock.notifyAll(); - - final RemoteTaskRunnerWorkItem tmp; - if ((tmp = runningTasks.get(taskId)) != null) { - taskRunnerWorkItem = tmp; - } else { - final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem( - taskId, - announcement.getTaskType(), - zkWorker.getWorker(), - TaskLocation.unknown(), - announcement.getTaskDataSource() - ); - final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent( - taskId, - newTaskRunnerWorkItem - ); - if (existingItem == null) { - log.warn( - "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s", - zkWorker.getWorker().getHost(), - taskId - ); - taskRunnerWorkItem = newTaskRunnerWorkItem; - } else { - taskRunnerWorkItem = existingItem; - } - } - - if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) { - taskRunnerWorkItem.setLocation(announcement.getTaskLocation()); - TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); - } + zkWorker.addListener(getStatusListener(worker, zkWorker, retVal)); + 
zkWorker.start(); + return retVal; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } - if (announcement.getTaskStatus().isComplete()) { - taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus()); - runPendingTasks(); - } - break; - case CHILD_REMOVED: - taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); - taskRunnerWorkItem = runningTasks.remove(taskId); - if (taskRunnerWorkItem != null) { - log.info("Task[%s] just disappeared!", taskId); - taskRunnerWorkItem.setResult(TaskStatus.failure(taskId)); - TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId)); - } else { - log.info("Task[%s] went bye bye.", taskId); - } - break; - case INITIALIZED: - if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) { - retVal.set(zkWorker); - } else { - final String message = StringUtils.format( - "WTF?! Tried to add already-existing worker[%s]", - worker.getHost() - ); - log.makeAlert(message) - .addData("workerHost", worker.getHost()) - .addData("workerIp", worker.getIp()) - .emit(); - retVal.setException(new IllegalStateException(message)); - } - runPendingTasks(); - break; - case CONNECTION_SUSPENDED: - case CONNECTION_RECONNECTED: - case CONNECTION_LOST: - // do nothing + @VisibleForTesting + PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker zkWorker, final SettableFuture<ZkWorker> retVal) + { + return (client, event) -> { + final String taskId; + final RemoteTaskRunnerWorkItem taskRunnerWorkItem; + synchronized (statusLock) { + try { + switch (event.getType()) { // lgtm [java/dereferenced-value-may-be-null] + case CHILD_ADDED: + case CHILD_UPDATED: + taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); // lgtm [java/dereferenced-value-may-be-null] + final TaskAnnouncement announcement = jsonMapper.readValue( + event.getData().getData(), TaskAnnouncement.class // lgtm [java/dereferenced-value-may-be-null] + ); + + log.info( + "Worker[%s] wrote %s status for 
task [%s] on [%s]", + zkWorker.getWorker().getHost(), + announcement.getTaskStatus().getStatusCode(), + taskId, + announcement.getTaskLocation() + ); + + // Synchronizing state with ZK + statusLock.notifyAll(); + + final RemoteTaskRunnerWorkItem tmp; + if ((tmp = runningTasks.get(taskId)) != null) { + taskRunnerWorkItem = tmp; + } else { + final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem( + taskId, + announcement.getTaskType(), + zkWorker.getWorker(), + TaskLocation.unknown(), + announcement.getTaskDataSource() + ); + final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent( + taskId, + newTaskRunnerWorkItem + ); + if (existingItem == null) { + log.warn( + "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s", + zkWorker.getWorker().getHost(), + taskId + ); + taskRunnerWorkItem = newTaskRunnerWorkItem; + } else { + taskRunnerWorkItem = existingItem; } } - catch (Exception e) { - log.makeAlert(e, "Failed to handle new worker status") - .addData("worker", zkWorker.getWorker().getHost()) - .addData("znode", event.getData().getPath()) + + if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) { + taskRunnerWorkItem.setLocation(announcement.getTaskLocation()); + TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); + } + + if (announcement.getTaskStatus().isComplete()) { + taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus()); + runPendingTasks(); + } + break; + case CHILD_REMOVED: + taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); // lgtm [java/dereferenced-value-may-be-null] + taskRunnerWorkItem = runningTasks.remove(taskId); + if (taskRunnerWorkItem != null) { + log.info("Task[%s] just disappeared!", taskId); + taskRunnerWorkItem.setResult(TaskStatus.failure(taskId)); + TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId)); + } else { + log.info("Task[%s] went bye 
bye.", taskId); + } + break; + case INITIALIZED: + if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) { + retVal.set(zkWorker); + } else { + final String message = StringUtils.format( + "This should not happen...tried to add already-existing worker[%s]", + worker.getHost() + ); + log.makeAlert(message) + .addData("workerHost", worker.getHost()) + .addData("workerIp", worker.getIp()) .emit(); + retVal.setException(new IllegalStateException(message)); } + runPendingTasks(); + break; + case CONNECTION_SUSPENDED: + case CONNECTION_RECONNECTED: + case CONNECTION_LOST: + // do nothing + } + } + catch (Exception e) { + String znode = null; + String eventType = null; + if (event != null) { Review comment: I don't think `event` can ever be null. Check out callers of `PathChildrenCache.callListeners()`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org