This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 34c04daa9f Fix infinite iteration in http sync monitoring (#13731)
34c04daa9f is described below

commit 34c04daa9f731c0d3230062d9977fd67dd42a6f0
Author: AmatyaAvadhanula <amatya.avadhan...@imply.io>
AuthorDate: Wed Feb 8 15:14:11 2023 +0530

    Fix infinite iteration in http sync monitoring (#13731)
    
    * Fix infinite iteration in http task runner
    
    * Fix infinite iteration in http server view
    
    * Add tests
---
 .../overlord/hrtr/HttpRemoteTaskRunner.java        | 46 ++++++++------
 .../overlord/hrtr/HttpRemoteTaskRunnerTest.java    | 72 ++++++++++++++++++++++
 .../druid/client/HttpServerInventoryView.java      | 64 +++++++++++--------
 .../druid/client/HttpServerInventoryViewTest.java  | 44 +++++++++++++
 4 files changed, 182 insertions(+), 44 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 6eb1a9c28d..65f2477080 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -31,6 +31,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -574,7 +575,8 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     );
   }
 
-  private void addWorker(final Worker worker)
+  @VisibleForTesting
+  void addWorker(final Worker worker)
   {
     synchronized (workers) {
       log.info("Worker[%s] reportin' for duty!", worker.getHost());
@@ -752,23 +754,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
           log.debug("Running the Sync Monitoring.");
 
           try {
-            for (Map.Entry<String, WorkerHolder> e : workers.entrySet()) {
-              WorkerHolder workerHolder = e.getValue();
-              if (!workerHolder.getUnderlyingSyncer().isOK()) {
-                synchronized (workers) {
-                  // check again that server is still there and only then 
reset.
-                  if (workers.containsKey(e.getKey())) {
-                    log.makeAlert(
-                        "Worker[%s] is not syncing properly. Current state is 
[%s]. Resetting it.",
-                        workerHolder.getWorker().getHost(),
-                        workerHolder.getUnderlyingSyncer().getDebugInfo()
-                    ).emit();
-                    removeWorker(workerHolder.getWorker());
-                    addWorker(workerHolder.getWorker());
-                  }
-                }
-              }
-            }
+            syncMonitoring();
           }
           catch (Exception ex) {
             if (ex instanceof InterruptedException) {
@@ -784,6 +770,30 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     );
   }
 
+  /**
+   * Runs one pass of sync monitoring. For every worker whose syncer reports that it is not OK,
+   * this emits an alert and then removes and re-adds the worker, holding the {@code workers}
+   * lock while doing so.
+   *
+   * <p>The loop runs over an immutable snapshot of {@code workers.entrySet()} rather than the
+   * live entry set. This keeps the remove/re-add reset from modifying the collection being
+   * iterated, and is the fix for the infinite iteration this method was extracted to address.
+   */
+  @VisibleForTesting
+  void syncMonitoring()
+  {
+    // Iterate over an immutable copy: resetting a worker below (remove + re-add) must not modify
+    // the collection being iterated.
+    final Set<Map.Entry<String, WorkerHolder>> workerEntrySet = 
ImmutableSet.copyOf(workers.entrySet());
+    for (Map.Entry<String, WorkerHolder> e : workerEntrySet) {
+      WorkerHolder workerHolder = e.getValue();
+      if (!workerHolder.getUnderlyingSyncer().isOK()) {
+        synchronized (workers) {
+          // The snapshot may be stale: check again, under the lock, that the worker is still
+          // registered, and only then reset it.
+          // NOTE(review): workerHolder is the snapshot's value. If the entry for e.getKey() was
+          // replaced after the snapshot was taken, this resets based on the old holder's state.
+          // Consider re-reading workers.get(e.getKey()) here and comparing it with workerHolder.
+          if (workers.containsKey(e.getKey())) {
+            log.makeAlert(
+                "Worker[%s] is not syncing properly. Current state is [%s]. 
Resetting it.",
+                workerHolder.getWorker().getHost(),
+                workerHolder.getUnderlyingSyncer().getDebugInfo()
+            ).emit();
+            removeWorker(workerHolder.getWorker());
+            addWorker(workerHolder.getWorker());
+          }
+        }
+      }
+    }
+  }
+
   /**
    * This method returns the debugging information exposed by {@link 
HttpRemoteTaskRunnerResource} and meant
    * for that use only. It must not be used for any other purpose.
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index e78b78517a..fe1b0ca498 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -58,6 +58,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
 import org.apache.druid.server.initialization.IndexerZkConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -70,6 +71,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -1668,6 +1670,52 @@ public class HttpRemoteTaskRunnerTest
 
   }
 
+  /**
+   * Regression test for sync monitoring. Every worker gets a holder whose syncer never reports
+   * OK, so {@code syncMonitoring()} resets all of them. The test checks that the pass terminates
+   * (the {@code @Test} timeout turns a hang into a failure) and that all three workers are still
+   * registered afterwards.
+   */
+  @Test(timeout = 60_000L)
+  public void testSyncMonitoring_finiteIteration()
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = 
EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig(),
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new 
AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createMock(TaskStorage.class),
+        EasyMock.createNiceMock(CuratorFramework.class),
+        new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+        new NoopServiceEmitter()
+    )
+    {
+      // Every holder the runner creates, including the fresh one made on each reset, is a mock
+      // whose syncer always reports isOK() == false.
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        return createNonSyncingWorkerHolder(worker);
+      }
+    };
+
+    // NOTE(review): taskRunner is started but never stopped; consider stopping it (e.g. in a
+    // finally block) so its background work does not outlive the test.
+    taskRunner.start();
+    taskRunner.addWorker(createWorker("abc"));
+    taskRunner.addWorker(createWorker("xyz"));
+    taskRunner.addWorker(createWorker("lol"));
+    // Presumably one syncer debug-info entry per registered worker; confirm against
+    // getWorkerSyncerDebugInfo().
+    Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size());
+    taskRunner.syncMonitoring();
+    Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size());
+  }
+
   public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
       TaskStorage taskStorage,
       List<Object> listenerNotificationsAccumulator
@@ -1730,6 +1778,30 @@ public class HttpRemoteTaskRunnerTest
     return taskRunner;
   }
 
+  /**
+   * Creates a replayed EasyMock {@link Worker} whose {@code getHost()} returns {@code host} any
+   * number of times. No other method is stubbed; because this is a default (non-nice) mock, any
+   * other call on it is unexpected and fails the test.
+   */
+  private Worker createWorker(String host)
+  {
+    Worker worker = EasyMock.createMock(Worker.class);
+    EasyMock.expect(worker.getHost()).andReturn(host).anyTimes();
+    EasyMock.replay(worker);
+    return worker;
+  }
+
+  /**
+   * Creates a replayed mock {@link WorkerHolder} for {@code worker}. Its underlying
+   * {@link ChangeRequestHttpSyncer} always reports {@code isOK() == false}, with empty debug
+   * info, so sync monitoring treats the worker as out of sync.
+   */
+  private WorkerHolder createNonSyncingWorkerHolder(Worker worker)
+  {
+    ChangeRequestHttpSyncer syncer = 
EasyMock.createMock(ChangeRequestHttpSyncer.class);
+    EasyMock.expect(syncer.isOK()).andReturn(false).anyTimes();
+    
EasyMock.expect(syncer.getDebugInfo()).andReturn(Collections.emptyMap()).anyTimes();
+    WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
+    
EasyMock.expect(workerHolder.getUnderlyingSyncer()).andReturn(syncer).anyTimes();
+    EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes();
+    // start() and stop() are each expected exactly once per holder. Presumably the runner starts
+    // a holder when its worker is added and stops it when the worker is removed. A reset builds
+    // a brand-new mock through createWorkerHolder, so no single mock sees a second call.
+    workerHolder.start();
+    EasyMock.expectLastCall();
+    workerHolder.stop();
+    EasyMock.expectLastCall();
+    EasyMock.replay(syncer, workerHolder);
+    return workerHolder;
+  }
+
   private static WorkerHolder createWorkerHolder(
       ObjectMapper smileMapper,
       HttpClient httpClient,
diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index decab1f7cc..893d455dc7 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -21,11 +21,13 @@ package org.apache.druid.client;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.net.HostAndPort;
 import org.apache.druid.concurrent.LifecycleLock;
@@ -58,6 +60,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
@@ -375,7 +378,8 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
     );
   }
 
-  private void serverAdded(DruidServer server)
+  @VisibleForTesting
+  void serverAdded(DruidServer server)
   {
     synchronized (servers) {
       DruidServerHolder holder = servers.get(server.getName());
@@ -430,31 +434,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
           log.debug("Running the Sync Monitoring.");
 
           try {
-            for (Map.Entry<String, DruidServerHolder> e : servers.entrySet()) {
-              DruidServerHolder serverHolder = e.getValue();
-              if (!serverHolder.syncer.isOK()) {
-                synchronized (servers) {
-                  // check again that server is still there and only then 
reset.
-                  if (servers.containsKey(e.getKey())) {
-                    log.makeAlert(
-                        "Server[%s] is not syncing properly. Current state is 
[%s]. Resetting it.",
-                        serverHolder.druidServer.getName(),
-                        serverHolder.syncer.getDebugInfo()
-                    ).emit();
-                    serverRemoved(serverHolder.druidServer);
-                    serverAdded(new DruidServer(
-                        serverHolder.druidServer.getName(),
-                        serverHolder.druidServer.getHostAndPort(),
-                        serverHolder.druidServer.getHostAndTlsPort(),
-                        serverHolder.druidServer.getMaxSize(),
-                        serverHolder.druidServer.getType(),
-                        serverHolder.druidServer.getTier(),
-                        serverHolder.druidServer.getPriority()
-                    ));
-                  }
-                }
-              }
-            }
+            syncMonitoring();
           }
           catch (Exception ex) {
             if (ex instanceof InterruptedException) {
@@ -470,6 +450,38 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
     );
   }
 
+  /**
+   * Runs one pass of sync monitoring. For every server whose syncer reports that it is not OK,
+   * this emits an alert, removes the server, and adds back a new {@link DruidServer}. All of this
+   * happens while holding the {@code servers} lock.
+   *
+   * <p>The new server is rebuilt only from the old one's name, host/port, TLS host/port, max
+   * size, type, tier and priority. Presumably this lets the new syncer repopulate its inventory
+   * from scratch; confirm.
+   *
+   * <p>The loop runs over an immutable snapshot of {@code servers.entrySet()} rather than the
+   * live entry set. This keeps the remove/re-add reset from modifying the collection being
+   * iterated, and is the fix for the infinite iteration this method was extracted to address.
+   */
+  @VisibleForTesting
+  void syncMonitoring()
+  {
+    // Iterate over an immutable copy: resetting a server below (remove + re-add) must not modify
+    // the collection being iterated.
+    final Set<Map.Entry<String, DruidServerHolder>> serverEntrySet = 
ImmutableSet.copyOf(servers.entrySet());
+    for (Map.Entry<String, DruidServerHolder> e : serverEntrySet) {
+      DruidServerHolder serverHolder = e.getValue();
+      if (!serverHolder.syncer.isOK()) {
+        synchronized (servers) {
+          // The snapshot may be stale: check again, under the lock, that the server is still
+          // registered, and only then reset it.
+          // NOTE(review): serverHolder is the snapshot's value. If the entry for e.getKey() was
+          // replaced after the snapshot was taken, this resets based on the old holder's state.
+          // Consider re-reading servers.get(e.getKey()) here and comparing it with serverHolder.
+          if (servers.containsKey(e.getKey())) {
+            log.makeAlert(
+                "Server[%s] is not syncing properly. Current state is [%s]. 
Resetting it.",
+                serverHolder.druidServer.getName(),
+                serverHolder.syncer.getDebugInfo()
+            ).emit();
+            serverRemoved(serverHolder.druidServer);
+            serverAdded(new DruidServer(
+                serverHolder.druidServer.getName(),
+                serverHolder.druidServer.getHostAndPort(),
+                serverHolder.druidServer.getHostAndTlsPort(),
+                serverHolder.druidServer.getMaxSize(),
+                serverHolder.druidServer.getType(),
+                serverHolder.druidServer.getTier(),
+                serverHolder.druidServer.getPriority()
+            ));
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public boolean isStarted()
   {
diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index 896f90d67d..b08db90c2c 100644
--- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -273,6 +273,50 @@ public class HttpServerInventoryViewTest
     httpServerInventoryView.stop();
   }
 
+  /**
+   * Regression test for sync monitoring. It registers three servers, runs one
+   * {@code syncMonitoring()} pass, and checks that the pass terminates (the {@code @Test} timeout
+   * turns a hang into a failure) and that three servers are still reported afterwards.
+   */
+  @Test(timeout = 60_000L)
+  public void testSyncMonitoring()
+  {
+    ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
+
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = 
EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    
EasyMock.expect(druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    // No canned responses. NOTE(review): unlike the non-syncing mocks in the task-runner test,
+    // nothing here forces a syncer into the not-OK state, so whether this pass actually resets
+    // any server depends on timing and on TestHttpClient's behavior; confirm the reset path is
+    // exercised.
+    TestHttpClient httpClient = new TestHttpClient(ImmutableList.of());
+
+    HttpServerInventoryView httpServerInventoryView = new 
HttpServerInventoryView(
+        jsonMapper,
+        httpClient,
+        druidNodeDiscoveryProvider,
+        (pair) -> !pair.rhs.getDataSource().equals("non-loading-datasource"),
+        new HttpServerInventoryViewConfig(null, null, null),
+        "test"
+    );
+
+    // NOTE(review): the view is started but never stopped, unlike the preceding test, which
+    // calls httpServerInventoryView.stop(); consider stopping it in a finally block.
+    httpServerInventoryView.start();
+    httpServerInventoryView.serverAdded(makeServer("abc.com:8080"));
+    httpServerInventoryView.serverAdded(makeServer("xyz.com:8080"));
+    httpServerInventoryView.serverAdded(makeServer("lol.com:8080"));
+    // Presumably one debug-info entry per registered server; confirm against getDebugInfo().
+    Assert.assertEquals(3, httpServerInventoryView.getDebugInfo().size());
+    httpServerInventoryView.syncMonitoring();
+    Assert.assertEquals(3, httpServerInventoryView.getDebugInfo().size());
+  }
+
+  /**
+   * Builds a historical {@link DruidServer} in the default tier. {@code host} is used as the
+   * server's name, host/port and TLS host/port.
+   */
+  private DruidServer makeServer(String host)
+  {
+    // Argument order mirrors the DruidServer construction in HttpServerInventoryView's
+    // syncMonitoring(): name, hostAndPort, hostAndTlsPort, maxSize, type, tier, priority.
+    return new DruidServer(
+        host,
+        host,
+        host,
+        100_000_000L,
+        ServerType.HISTORICAL,
+        "__default_tier",
+        50
+    );
+  }
+
   private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
   {
     Listener listener;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to