This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 1242dc1  Add a configuration option to allow enable/disable writing 
error log to ZK (#1487)
1242dc1 is described below

commit 1242dc16d249816176e64a7304583bdc78fbe030
Author: kaisun2000 <[email protected]>
AuthorDate: Tue Nov 10 23:13:41 2020 -0800

    Add a configuration option to allow enable/disable writing error log to ZK 
(#1487)
    
    Improve StatusUpdateUtil error logging to ZK by adding an option to enable
    it. By default, errors are not logged to ZK. This avoids error code paths
    that keep flooding the ZK server and can cause a DoS on ZK, such as the
    exception thrown when HelixTaskExecutor onMessage creates a messageHandler.
---
 .../java/org/apache/helix/SystemPropertyKeys.java  |   2 +
 .../org/apache/helix/util/StatusUpdateUtil.java    |  53 ++++----
 .../manager/TestParticipantManager.java            |   9 ++
 .../apache/helix/util/TestStatusUpdateUtil.java    | 138 +++++++++++++++++++++
 4 files changed, 175 insertions(+), 27 deletions(-)

diff --git 
a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java 
b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index 909a719..6a73a7e 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -82,4 +82,6 @@ public class SystemPropertyKeys {
   // System Property Metadata Store Directory Server endpoint key
   public static final String MSDS_SERVER_ENDPOINT_KEY =
       MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY;
+
+  public static final String STATEUPDATEUTIL_ERROR_PERSISTENCY_ENABLED = 
"helix.StateUpdateUtil.errorLog.enabled";
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
index fbef0d6..1d70f99 100644
--- a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
@@ -39,6 +39,7 @@ import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.model.Error;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
@@ -55,6 +56,9 @@ import org.slf4j.LoggerFactory;
 public class StatusUpdateUtil {
   static Logger _logger = LoggerFactory.getLogger(StatusUpdateUtil.class);
 
+  public static final boolean ERROR_LOG_TO_ZK_ENABLED =
+      
Boolean.getBoolean(SystemPropertyKeys.STATEUPDATEUTIL_ERROR_PERSISTENCY_ENABLED);
+
   public static class Transition implements Comparable<Transition> {
     private final String _msgID;
     private final long _timeStamp;
@@ -492,46 +496,37 @@ public class StatusUpdateUtil {
 
     Builder keyBuilder = accessor.keyBuilder();
     if (!_recordedMessages.containsKey(message.getMsgId())) {
-      if (isController) {
-        accessor
-            
.updateProperty(keyBuilder.controllerTaskStatus(statusUpdateSubPath, 
statusUpdateKey),
-                new StatusUpdate(createMessageLogRecord(message)));
+      ZNRecord statusUpdateRecord = createMessageLogRecord(message);
+      PropertyKey propertyKey;
 
+      if (isController) {
+        propertyKey = keyBuilder.controllerTaskStatus(statusUpdateSubPath, 
statusUpdateKey);
       } else {
-
-        PropertyKey propertyKey =
+        propertyKey =
             keyBuilder.stateTransitionStatus(instanceName, sessionId, 
statusUpdateSubPath,
                 statusUpdateKey);
+      }
+      accessor.updateProperty(propertyKey, new 
StatusUpdate(statusUpdateRecord));
 
-        ZNRecord statusUpdateRecord = createMessageLogRecord(message);
-
-        // For now write participant StatusUpdates to log4j.
-        // we are using restlet as another data channel to report to 
controller.
-        if (_logger.isTraceEnabled()) {
-          _logger.trace("StatusUpdate path:" + propertyKey.getPath() + ", 
updates:"
-              + statusUpdateRecord);
-        }
-        accessor.updateProperty(propertyKey, new 
StatusUpdate(statusUpdateRecord));
-
+      if (_logger.isTraceEnabled()) {
+        _logger.trace("StatusUpdate path:" + propertyKey.getPath() + ", 
updates:"
+            + statusUpdateRecord);
       }
       _recordedMessages.put(message.getMsgId(), message.getMsgId());
     }
 
+    PropertyKey propertyKey;
     if (isController) {
-      accessor.updateProperty(
-          keyBuilder.controllerTaskStatus(statusUpdateSubPath, 
statusUpdateKey), new StatusUpdate(
-              record));
+      propertyKey = keyBuilder.controllerTaskStatus(statusUpdateSubPath, 
statusUpdateKey);
     } else {
-
-      PropertyKey propertyKey =
+      propertyKey =
           keyBuilder.stateTransitionStatus(instanceName, sessionId, 
statusUpdateSubPath,
               statusUpdateKey);
-      // For now write participant StatusUpdates to log4j.
-      // we are using restlet as another data channel to report to controller.
-      if (_logger.isTraceEnabled()) {
-        _logger.trace("StatusUpdate path:" + propertyKey.getPath() + ", 
updates:" + record);
-      }
-      accessor.updateProperty(propertyKey, new StatusUpdate(record));
+    }
+    accessor.updateProperty(propertyKey, new StatusUpdate(record));
+
+    if (_logger.isTraceEnabled()) {
+      _logger.trace("StatusUpdate path:" + propertyKey.getPath() + ", 
updates:" + record);
     }
 
     // If the error level is ERROR, also write the record to "ERROR" ZNode
@@ -560,6 +555,10 @@ public class StatusUpdateUtil {
    */
   void publishErrorRecord(ZNRecord record, String instanceName, String 
updateSubPath,
       String updateKey, String sessionId, HelixDataAccessor accessor, boolean 
isController) {
+    _logger.error("StatusUpdate Error record: {}", record);
+    if (!ERROR_LOG_TO_ZK_ENABLED) {
+      return;
+    }
     Builder keyBuilder = accessor.keyBuilder();
     if (isController) {
       // TODO need to fix: ERRORS_CONTROLLER doesn't have a form of
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index 0b690e2..c43fb2a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -60,12 +60,16 @@ import 
org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.testng.Assert;
 import org.testng.ITestContext;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
 public class TestParticipantManager extends ZkTestBase {
   private MBeanServer _server = ManagementFactory.getPlatformMBeanServer();
   private String clusterName = TestHelper.getTestClassName();
+  static {
+    
System.setProperty(SystemPropertyKeys.STATEUPDATEUTIL_ERROR_PERSISTENCY_ENABLED,
 "true");
+  }
 
   @AfterMethod
   public void afterMethod(Method testMethod, ITestContext testContext) {
@@ -73,6 +77,11 @@ public class TestParticipantManager extends ZkTestBase {
     super.endTest(testMethod, testContext);
   }
 
+  @AfterClass
+  public void afterClass() {
+    
System.clearProperty(SystemPropertyKeys.STATEUPDATEUTIL_ERROR_PERSISTENCY_ENABLED);
+  }
+
   @Test
   public void simpleIntegrationTest() throws Exception {
     int n = 1;
diff --git 
a/helix-core/src/test/java/org/apache/helix/util/TestStatusUpdateUtil.java 
b/helix-core/src/test/java/org/apache/helix/util/TestStatusUpdateUtil.java
new file mode 100644
index 0000000..fbdc364
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/util/TestStatusUpdateUtil.java
@@ -0,0 +1,138 @@
+package org.apache.helix.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestStatusUpdateUtil extends ZkTestBase {
+  private String clusterName = TestHelper.getTestClassName();
+  private int n = 1;
+  private Message message = new Message(Message.MessageType.STATE_TRANSITION, 
"Some unique id");
+  private MockParticipantManager[] participants = new 
MockParticipantManager[n];
+
+
+  static void setFinalStatic(Field field, Object newValue) throws Exception {
+    field.setAccessible(true);
+
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+
+    field.set(null, newValue);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        1, // partitions per resource
+        n, // number of nodes
+        1, // replicas
+        "MasterSlave", true);
+
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+      participants[i].syncStart();
+    }
+
+    message.setSrcName("cm-instance-0");
+    message.setTgtSessionId(participants[0].getSessionId());
+    message.setFromState("Offline");
+    message.setToState("Slave");
+    message.setPartitionName("TestDB_0");
+    message.setMsgId("Some unique message id");
+    message.setResourceName("TestDB");
+    message.setTgtName("localhost_12918");
+    message.setStateModelDef("MasterSlave");
+    
message.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+
+  }
+
+  @Test(dependsOnMethods = "testDisableErrorLogByDefault")
+  public void testEnableErrorLog() throws Exception {
+    StatusUpdateUtil statusUpdateUtil = new StatusUpdateUtil();
+    setFinalStatic(StatusUpdateUtil.class.getField("ERROR_LOG_TO_ZK_ENABLED"), 
true);
+
+    Exception e = new RuntimeException("test exception");
+    statusUpdateUtil.logError(message, HelixStateTransitionHandler.class, e,
+        "test status update", participants[0]);
+    // logged to Zookeeper
+    String errPath = PropertyPathBuilder
+        .instanceError(clusterName, "localhost_12918", 
participants[0].getSessionId(), "TestDB",
+            "TestDB_0");
+
+    try {
+      ZNRecord error = _gZkClient.readData(errPath);
+    } catch (ZkException zke) {
+      Assert.fail("expecting being able to send error logs to ZK.", zke);
+    }
+  }
+
+  @Test
+  public void testDisableErrorLogByDefault() throws Exception {
+    StatusUpdateUtil statusUpdateUtil = new StatusUpdateUtil();
+    setFinalStatic(StatusUpdateUtil.class.getField("ERROR_LOG_TO_ZK_ENABLED"), 
false);
+
+    Exception e = new RuntimeException("test exception");
+    statusUpdateUtil.logError(message, HelixStateTransitionHandler.class, e,
+        "test status update", participants[0]);
+
+    // assert by default, not logged to Zookeeper
+    String errPath = PropertyPathBuilder
+        .instanceError(clusterName, "localhost_12918", 
participants[0].getSessionId(), "TestDB",
+            "TestDB_0");
+    try {
+      ZNRecord error = _gZkClient.readData(errPath);
+      Assert.fail("not expecting being able to send error logs to ZK by 
default.");
+    } catch (ZkException zke) {
+      // expected
+    }
+  }
+}

Reply via email to