jtuglu1 commented on code in PR #18851:
URL: https://github.com/apache/druid/pull/18851#discussion_r2648834253
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -603,58 +544,62 @@ private Worker toWorker(DiscoveryDruidNode node)
@VisibleForTesting
void addWorker(final Worker worker)
{
- synchronized (workers) {
- log.info("Worker[%s] reportin' for duty!", worker.getHost());
- cancelWorkerCleanup(worker.getHost());
-
- WorkerHolder holder = workers.get(worker.getHost());
- if (holder == null) {
- List<TaskAnnouncement> expectedAnnouncements = new ArrayList<>();
- synchronized (statusLock) {
- // It might be a worker that existed before, temporarily went away and came back. We might have a set of
- // tasks that we think are running on this worker. Provide that information to WorkerHolder that
- // manages the task syncing with that worker.
- for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : tasks.entrySet()) {
- HttpRemoteTaskRunnerWorkItem workItem = e.getValue();
- if (workItem.isRunningOnWorker(worker)) {
- // This announcement is only used to notify when a task has disappeared on the worker
- // So it is okay to set the dataSource and taskResource to null as they will not be used
- expectedAnnouncements.add(
- TaskAnnouncement.create(
- workItem.getTaskId(),
- workItem.getTaskType(),
- null,
- TaskStatus.running(workItem.getTaskId()),
- workItem.getLocation(),
- null
- )
+ log.info("Adding worker[%s]", worker.getHost());
+ synchronized (workerStateLock) {
+ workers.compute(
+ worker.getHost(), (key, workerEntry) -> {
+ cancelWorkerCleanup(worker.getHost());
+
+ // There cannot be any new tasks assigned to this worker as the entry has not been published yet.
+ // That being said, there can be callbacks in taskAddedOrUpdated() where some task suddenly begins running
+ // on this worker. That method still blocks on this key lock, so it will occur strictly before/after this insertion.
+ if (workerEntry == null) {
+ log.info("Unrecognized worker[%s], rebuilding task mapping", worker.getHost());
+ final List<TaskAnnouncement> expectedAnnouncements = new ArrayList<>();
+ // It might be a worker that existed before, temporarily went away and came back. We might have a set of
+ // tasks that we think are running on this worker. Provide that information to WorkerHolder that
+ // manages the task syncing with that worker.
+ tasks.forEach((taskId, taskEntry) -> {
Review Comment:
Sure, I had considered this but opted for parity with the old design.
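
For context on the "key lock" mentioned in the new code comment: the hunk relies on `ConcurrentHashMap.compute` running its remapping function atomically for a given key, so two callers touching the same host cannot interleave. Below is a minimal sketch of that guarantee only. The class `ComputeSketch`, the method `registerConcurrently`, and the host string are hypothetical and not part of the PR; the map of counters is an assumed stand-in for the `workers` map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class ComputeSketch {
  // Hypothetical stand-in for the runner's worker map: host -> number of updates applied.
  static final ConcurrentHashMap<String, Integer> workers = new ConcurrentHashMap<>();

  // Starts `threads` threads that each call compute() on the same key `perThread` times.
  static int registerConcurrently(String host, int threads, int perThread) {
    workers.remove(host);
    List<Thread> pool = new ArrayList<>();
    for (int i = 0; i < threads; i++) {
      Thread t = new Thread(() -> {
        for (int j = 0; j < perThread; j++) {
          // The remapping function runs atomically for this key, so the
          // read-modify-write below cannot lose an update.
          workers.compute(host, (key, count) -> count == null ? 1 : count + 1);
        }
      });
      pool.add(t);
      t.start();
    }
    for (Thread t : pool) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return workers.get(host);
  }

  public static void main(String[] args) {
    // Four threads, 1000 updates each, all on one key.
    System.out.println(registerConcurrently("worker-1:8091", 4, 1000)); // prints 4000
  }
}
```

The same per-key ordering is what would let a callback such as `taskAddedOrUpdated()` observe the worker entry either strictly before or strictly after the insertion, assuming it also goes through `compute` on that key.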
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]