This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 347779b  Zookeeper loss (#6740)
347779b is described below

commit 347779b17a5b0156c4634a06c2cba261019c9760
Author: Michael Trelinski <[email protected]>
AuthorDate: Fri Mar 29 15:10:42 2019 -0700

    Zookeeper loss (#6740)
    
    * Update init
    
    Fix bin/init to source from proper directory.
    
    * Fix for Proposal #6518: Shutdown druid processes upon complete loss of ZK 
connectivity
    
    * Zookeeper Loss:
    
    - Add feature documentation
    - Cosmetic refactors
    - Variable extractions
    - Remove getter
    
    * - Change config key name and reword documentation
    - Switch from Function<Void,Void> to Runnable/Lambda
    - try { … } finally { … }
    
    * Fix line length too long
    
    * - change to formatted string for logging
    - use System.err.println after lifecycle stops
    
    * commenting on makeEnsembleProvider()-created Zookeeper termination
    
    * Add javadoc
    
    * added java doc reference back to apache discussion thread.
    
    * move comment to other class
    
    * favor two-slash comments instead of multiline comments
---
 docs/content/configuration/index.md                |   1 +
 .../BoundedExponentialBackoffRetryWithQuit.java    |  63 ++++++++++++
 .../org/apache/druid/curator/CuratorConfig.java    |  18 ++++
 .../org/apache/druid/curator/CuratorModule.java    |  54 +++++++++-
 ...BoundedExponentialBackoffRetryWithQuitTest.java | 111 +++++++++++++++++++++
 5 files changed, 245 insertions(+), 2 deletions(-)

diff --git a/docs/content/configuration/index.md 
b/docs/content/configuration/index.md
index fa71318..51b206b 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -171,6 +171,7 @@ We recommend just setting the base ZK path and the ZK 
service host, but all ZK p
 |`druid.zk.service.user`|The username to authenticate with ZooKeeper. This is 
an optional property.|none|
 |`druid.zk.service.pwd`|The [Password 
Provider](../operations/password-provider.html) or the string password to 
authenticate with ZooKeeper. This is an optional property.|none|
 |`druid.zk.service.authScheme`|digest is the only authentication scheme 
supported. |digest|
+|`druid.zk.service.terminateDruidProcessOnConnectFail`|If set to 'true' and the connection to ZooKeeper fails (after exhausting all potential backoff retries), the Druid process terminates itself with exit code 1.|false|
 
 #### Zookeeper Behavior
 
diff --git 
a/server/src/main/java/org/apache/druid/curator/BoundedExponentialBackoffRetryWithQuit.java
 
b/server/src/main/java/org/apache/druid/curator/BoundedExponentialBackoffRetryWithQuit.java
new file mode 100644
index 0000000..531edb4
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/curator/BoundedExponentialBackoffRetryWithQuit.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator;
+
+import org.apache.curator.RetrySleeper;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.druid.java.util.common.logger.Logger;
+
+/**
+ * A {@link BoundedExponentialBackoffRetry} that runs a caller-supplied exit action once all retries are exhausted.
+ *
+ * BoundedExponentialBackoffRetryWithQuit extends BoundedExponentialBackoffRetry for simplicity. It's not actually a
+ * BoundedExponentialBackoffRetry from the Liskov substitution principle point of view (the final
+ * {@link #allowRetry} call has the side effect of running the exit action), but it doesn't matter in this code.
+ */
+public class BoundedExponentialBackoffRetryWithQuit extends BoundedExponentialBackoffRetry
+{
+
+  private static final Logger log = new Logger(BoundedExponentialBackoffRetryWithQuit.class);
+
+  private final Runnable exitRunner;
+
+  /**
+   * @param exitRunner      action run once retries are exhausted; must not be null
+   * @param baseSleepTimeMs passed through to {@link BoundedExponentialBackoffRetry}
+   * @param maxSleepTimeMs  passed through to {@link BoundedExponentialBackoffRetry}
+   * @param maxRetries      passed through to {@link BoundedExponentialBackoffRetry}; the exit action runs when
+   *                        {@link #allowRetry} is called with a retry count at or beyond this limit
+   *
+   * @throws NullPointerException if {@code exitRunner} is null
+   */
+  public BoundedExponentialBackoffRetryWithQuit(
+      Runnable exitRunner,
+      int baseSleepTimeMs,
+      int maxSleepTimeMs,
+      int maxRetries
+  )
+  {
+    super(baseSleepTimeMs, maxSleepTimeMs, maxRetries);
+    // Fail fast: a null runner would otherwise only surface as an NPE at the moment ZooKeeper is lost, which is
+    // exactly when the process is supposed to exit.
+    this.exitRunner = java.util.Objects.requireNonNull(exitRunner, "exitRunner");
+    log.info("BoundedExponentialBackoffRetryWithQuit Retry Policy selected.");
+  }
+
+  /**
+   * Delegates to {@link BoundedExponentialBackoffRetry#allowRetry}. When the superclass refuses a further retry
+   * because {@code retryCount} has reached the configured maximum ({@link #getN()}), the exit action is run before
+   * returning {@code false}.
+   */
+  @Override
+  public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
+  {
+    // getN() is the superclass's effective retry limit, so use it rather than the constructor argument.
+    final int maxRetries = this.getN();
+    final boolean retriesExhausted = retryCount >= maxRetries;
+    if (!retriesExhausted) {
+      log.warn("Zookeeper can't be reached, retrying (retryCount = %s out of %s)...", retryCount, maxRetries);
+    }
+    final boolean shouldRetry = super.allowRetry(retryCount, elapsedTimeMs, sleeper);
+    if (!shouldRetry) {
+      if (retriesExhausted) {
+        log.warn("Since Zookeeper can't be reached after retries exhausted, calling exit function...");
+        exitRunner.run();
+      } else {
+        // Curator's SleepingRetry also returns false when its backoff sleep is interrupted (for example while the
+        // process is shutting down). That is not a loss of ZooKeeper, so do not terminate the process for it.
+        log.warn(
+            "Retry backoff interrupted (retryCount = %s out of %s), not calling exit function.",
+            retryCount,
+            maxRetries
+        );
+      }
+    }
+    return shouldRetry;
+  }
+
+}
diff --git a/server/src/main/java/org/apache/druid/curator/CuratorConfig.java 
b/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
index d07b9d9..52af44e 100644
--- a/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
+++ b/server/src/main/java/org/apache/druid/curator/CuratorConfig.java
@@ -52,6 +52,10 @@ public class CuratorConfig
   @JsonProperty("authScheme")
   private String authScheme = "digest";
 
+  // When true, CuratorModule installs a retry policy that terminates the process once ZooKeeper retries are exhausted.
+  @JsonProperty("terminateDruidProcessOnConnectFail")
+  private boolean terminateDruidProcessOnConnectFail = false;
+
+
   public String getZkHosts()
   {
     return zkHosts;
@@ -109,4 +113,18 @@ public class CuratorConfig
     return authScheme;
   }
 
+  /**
+   * @return whether the Druid process should terminate itself when ZooKeeper cannot be reached after all retries
+   */
+  public boolean getTerminateDruidProcessOnConnectFail()
+  {
+    return this.terminateDruidProcessOnConnectFail;
+  }
+
+  /**
+   * Sets whether the process terminates on ZooKeeper connection failure; a {@code null} argument means {@code false}.
+   */
+  public void setTerminateDruidProcessOnConnectFail(Boolean terminateDruidProcessOnConnectFail)
+  {
+    this.terminateDruidProcessOnConnectFail = Boolean.TRUE.equals(terminateDruidProcessOnConnectFail);
+  }
+
 }
diff --git a/server/src/main/java/org/apache/druid/curator/CuratorModule.java 
b/server/src/main/java/org/apache/druid/curator/CuratorModule.java
index 84af443..2c27dba 100644
--- a/server/src/main/java/org/apache/druid/curator/CuratorModule.java
+++ b/server/src/main/java/org/apache/druid/curator/CuratorModule.java
@@ -22,6 +22,8 @@ package org.apache.druid.curator;
 import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.google.inject.Provides;
+import io.netty.util.SuppressForbidden;
+import org.apache.curator.RetryPolicy;
 import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
 import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
@@ -70,6 +72,7 @@ public class CuratorModule implements Module
 
   @Provides
   @LazySingleton
+  @SuppressForbidden(reason = "System#err")
   public CuratorFramework makeCurator(CuratorConfig config, EnsembleProvider 
ensembleProvider, Lifecycle lifecycle)
   {
     final Builder builder = CuratorFrameworkFactory.builder();
@@ -79,10 +82,33 @@ public class CuratorModule implements Module
           StringUtils.format("%s:%s", config.getZkUser(), 
config.getZkPwd()).getBytes(StandardCharsets.UTF_8)
       );
     }
+
+    RetryPolicy retryPolicy;
+    if (config.getTerminateDruidProcessOnConnectFail()) {
+      final Runnable exitRunner = () -> {
+        try {
+          log.error("Zookeeper can't be reached, forcefully stopping 
lifecycle...");
+          lifecycle.stop();
+          System.err.println("Zookeeper can't be reached, forcefully stopping 
virtual machine...");
+        }
+        finally {
+          System.exit(1);
+        }
+      };
+      retryPolicy = new BoundedExponentialBackoffRetryWithQuit(
+          exitRunner,
+          BASE_SLEEP_TIME_MS,
+          MAX_SLEEP_TIME_MS,
+          MAX_RETRIES
+      );
+    } else {
+      retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, 
MAX_SLEEP_TIME_MS, MAX_RETRIES);
+    }
+
     final CuratorFramework framework = builder
         .ensembleProvider(ensembleProvider)
         .sessionTimeoutMs(config.getZkSessionTimeoutMs())
-        .retryPolicy(new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, 
MAX_SLEEP_TIME_MS, MAX_RETRIES))
+        .retryPolicy(retryPolicy)
         .compressionProvider(new 
PotentiallyGzippedCompressionProvider(config.getEnableCompression()))
         .aclProvider(config.getEnableAcl() ? new SecuredACLProvider() : new 
DefaultACLProvider())
         .build();
@@ -127,6 +153,30 @@ public class CuratorModule implements Module
       return new FixedEnsembleProvider(config.getZkHosts());
     }
 
+    RetryPolicy retryPolicy;
+    if (config.getTerminateDruidProcessOnConnectFail()) {
+      // It's unknown whether or not this precaution is needed.  Tests 
revealed that this path was never taken.
+      //  see discussions in 
https://github.com/apache/incubator-druid/pull/6740
+
+      final Runnable exitRunner = () -> {
+        try {
+          log.error("Zookeeper can't be reached, forcefully stopping virtual 
machine...");
+        }
+        finally {
+          System.exit(1);
+        }
+      };
+
+      retryPolicy = new BoundedExponentialBackoffRetryWithQuit(
+          exitRunner,
+          BASE_SLEEP_TIME_MS,
+          MAX_SLEEP_TIME_MS,
+          MAX_RETRIES
+      );
+    } else {
+      retryPolicy = new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, 
MAX_SLEEP_TIME_MS, MAX_RETRIES);
+    }
+
     return new ExhibitorEnsembleProvider(
         new Exhibitors(
             exConfig.getHosts(),
@@ -136,7 +186,7 @@ public class CuratorModule implements Module
         new DefaultExhibitorRestClient(exConfig.getUseSsl()),
         exConfig.getRestUriPath(),
         exConfig.getPollingMs(),
-        new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, 
MAX_SLEEP_TIME_MS, MAX_RETRIES)
+        retryPolicy
     )
     {
       @Override
diff --git 
a/server/src/test/java/org/apache/druid/curator/BoundedExponentialBackoffRetryWithQuitTest.java
 
b/server/src/test/java/org/apache/druid/curator/BoundedExponentialBackoffRetryWithQuitTest.java
new file mode 100644
index 0000000..a5fb4fe
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/curator/BoundedExponentialBackoffRetryWithQuitTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.test.TestingServer;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+public final class BoundedExponentialBackoffRetryWithQuitTest
+{
+
+  private static final Logger log = new 
Logger(BoundedExponentialBackoffRetryWithQuitTest.class);
+
+  /*
+  Methodology (order is important!):
+    1. Zookeeper Server Service started
+    2. Lifecycle started
+    3. Curator invokes connection to service
+    4. Service is stopped
+    5. Curator attempts to do something, which invokes the retries policy
+    6. Retries exceed limit, call function which simulates an exit (since 
mocking System.exit() is hard to do without
+        changing a lot of dependencies)
+   */
+  @Test
+  public void testExitWithLifecycle() throws Exception
+  {
+    final Lifecycle actualNoop = new Lifecycle() {
+      @Override
+      public void start() throws Exception
+      {
+        super.start();
+        log.info("Starting lifecycle...");
+      }
+
+      @Override
+      public void stop()
+      {
+        super.stop();
+        log.info("Stopping lifecycle...");
+      }
+    };
+    Lifecycle noop = EasyMock.mock(Lifecycle.class);
+
+    noop.start();
+    EasyMock.expectLastCall().andDelegateTo(actualNoop);
+    noop.stop();
+    EasyMock.expectLastCall().andDelegateTo(actualNoop);
+    EasyMock.replay(noop);
+
+    Runnable exitFunction = () -> {
+      log.info("Zookeeper retries exhausted, exiting...");
+      noop.stop();
+      throw new RuntimeException("Simulated exit");
+    };
+
+    TestingServer server = new TestingServer();
+    BoundedExponentialBackoffRetryWithQuit retry = new 
BoundedExponentialBackoffRetryWithQuit(exitFunction, 1, 1, 2);
+    CuratorFramework curator = CuratorFrameworkFactory
+        .builder()
+        .connectString(server.getConnectString())
+        .sessionTimeoutMs(1000)
+        .connectionTimeoutMs(1)
+        .retryPolicy(retry)
+        .build();
+    server.start();
+    System.out.println("Server started.");
+    curator.start();
+    noop.start();
+    curator.checkExists().forPath("/tmp");
+    log.info("Connected.");
+    boolean failed = false;
+    try {
+      server.stop();
+      log.info("Stopped.");
+      curator.checkExists().forPath("/tmp");
+      Thread.sleep(10);
+      curator.checkExists().forPath("/tmp");
+    }
+    catch (Exception e) {
+      Assert.assertTrue("Correct exception type", e instanceof 
RuntimeException);
+      EasyMock.verify(noop);
+      curator.close();
+      failed = true;
+    }
+    Assert.assertTrue("Must be marked in failure state", failed);
+    log.info("Lifecycle stopped.");
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to