This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new dbe2831  Add PropertyStore write endpoint to Helix REST (#1049)
dbe2831 is described below

commit dbe283129c55429410b88d8ae37ab9e98322ee70
Author: Hunter Lee <[email protected]>
AuthorDate: Tue Jun 2 17:13:56 2020 -0700

    Add PropertyStore write endpoint to Helix REST (#1049)
    
    This commit adds a write endpoint to Helix REST that allows you to write 
either a byte array or a ZNRecord to any path under PropertyStore, which is a 
directory in the cluster metadata in ZK where applications can write custom data.
---
 .../apache/helix/rest/server/ServerContext.java    | 16 ++----
 .../rest/server/resources/AbstractResource.java    |  5 ++
 .../resources/helix/PropertyStoreAccessor.java     | 60 +++++++++++++++++++++-
 .../rest/server/TestPropertyStoreAccessor.java     | 48 ++++++++++++++---
 4 files changed, 107 insertions(+), 22 deletions(-)

diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java 
b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index 86342f7..2449b0d 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.InstanceType;
 import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.manager.zk.ByteArraySerializer;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -231,19 +232,8 @@ public class ServerContext implements IZkDataListener, 
IZkChildListener, IZkStat
     if (_byteArrayZkBaseDataAccessor == null) {
       synchronized (this) {
         if (_byteArrayZkBaseDataAccessor == null) {
-
-          _byteArrayZkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkAddr, new 
ZkSerializer() {
-            @Override
-            public byte[] serialize(Object o) throws ZkMarshallingError {
-              // TODO: Support serialize for write methods if necessary
-              throw new UnsupportedOperationException("serialize() is not 
supported.");
-            }
-
-            @Override
-            public Object deserialize(byte[] bytes) throws ZkMarshallingError {
-              return bytes;
-            }
-          });
+          _byteArrayZkBaseDataAccessor =
+              new ZkBaseDataAccessor<>(_zkAddr, new ByteArraySerializer());
         }
       }
     }
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index 1b9111e..51cac9d 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -98,6 +98,11 @@ public class AbstractResource {
     return Response.serverError().build();
   }
 
+  protected Response serverError(String errorMsg) {
+    return 
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errorMsgToJson(errorMsg))
+        .build();
+  }
+
   protected Response serverError(Exception ex) {
     addExceptionToAuditLog(ex);
     return 
Response.serverError().entity(errorMsgToJson(ex.getMessage())).build();
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
index 11226e5..af9efac 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
@@ -19,18 +19,23 @@ package org.apache.helix.rest.server.resources.helix;
  * under the License.
  */
 
+import java.io.IOException;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 
 import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.msdcommon.util.ZkValidationUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.codehaus.jackson.node.ObjectNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,4 +83,55 @@ public class PropertyStoreAccessor extends 
AbstractHelixResource {
           .entity(String.format("The property store path %s doesn't exist", 
recordPath)).build());
     }
   }
+
+  /**
+   * Sample HTTP URLs:
+   *  http://<HOST>/clusters/{clusterId}/propertyStore/<PATH>
+   * It refers to the /PROPERTYSTORE/<PATH> in Helix metadata store
+   * @param clusterId The cluster Id
+   * @param path path parameter is like "abc/abc/abc" in the URL
+   * @param isZNRecord true if the content represents a ZNRecord. false means 
byte array.
+   * @param content
+   * @return Response
+   */
+  @PUT
+  @Path("{path: .+}")
+  public Response putPropertyByPath(@PathParam("clusterId") String clusterId,
+      @PathParam("path") String path,
+      @QueryParam("isZNRecord") @DefaultValue("true") String isZNRecord, 
String content) {
+    path = "/" + path;
+    if (!ZkValidationUtil.isPathValid(path)) {
+      LOG.info("The propertyStore path {} is invalid for cluster {}", path, 
clusterId);
+      return badRequest(
+          "Invalid path string. Valid path strings use slash as the directory 
separator and names the location of ZNode");
+    }
+    final String recordPath = PropertyPathBuilder.propertyStore(clusterId) + 
path;
+    try {
+      if (Boolean.parseBoolean(isZNRecord)) {
+        try {
+          ZNRecord record = toZNRecord(content);
+          BaseDataAccessor<ZNRecord> propertyStoreDataAccessor =
+              getDataAccssor(clusterId).getBaseDataAccessor();
+          if (!propertyStoreDataAccessor.set(recordPath, record, 
AccessOption.PERSISTENT)) {
+            return serverError(
+                "Failed to set content: " + content + " in PropertyStore path: 
" + path);
+          }
+        } catch (IOException e) {
+          LOG.error("Failed to deserialize content " + content + " into a 
ZNRecord!", e);
+          return badRequest(
+              "Failed to write to path: " + recordPath + "! Content is not a 
valid ZNRecord!");
+        }
+      } else {
+        ZkBaseDataAccessor<byte[]> propertyStoreDataAccessor = 
getByteArrayDataAccessor();
+        if (!propertyStoreDataAccessor
+            .set(recordPath, content.getBytes(), AccessOption.PERSISTENT)) {
+          return serverError(
+              "Failed to set content: " + content + " in PropertyStore path: " 
+ path);
+        }
+      }
+      return OK();
+    } catch (Exception e) {
+      return serverError(e);
+    }
+  }
 }
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
index fc0a701..f1954d7 100644
--- 
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
@@ -20,13 +20,18 @@ package org.apache.helix.rest.server;
  */
 
 import java.io.IOException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.helix.AccessOption;
 import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.ByteArraySerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.http.HttpStatus;
@@ -53,14 +58,12 @@ public class TestPropertyStoreAccessor extends 
AbstractTestClass {
   public void init() {
     _customDataAccessor = new ZkBaseDataAccessor<>(ZK_ADDR, new ZkSerializer() 
{
       @Override
-      public byte[] serialize(Object o)
-          throws ZkMarshallingError {
+      public byte[] serialize(Object o) throws ZkMarshallingError {
         return o.toString().getBytes();
       }
 
       @Override
-      public Object deserialize(byte[] bytes)
-          throws ZkMarshallingError {
+      public Object deserialize(byte[] bytes) throws ZkMarshallingError {
         return new String(bytes);
       }
     });
@@ -78,8 +81,7 @@ public class TestPropertyStoreAccessor extends 
AbstractTestClass {
   }
 
   @Test
-  public void testGetPropertyStoreWithZNRecordData()
-      throws IOException {
+  public void testGetPropertyStoreWithZNRecordData() throws IOException {
     String data =
         new 
JerseyUriRequestBuilder("clusters/{}/propertyStore/ZnRecord").format(TEST_CLUSTER)
             .isBodyReturnExpected(true).get(this);
@@ -114,4 +116,36 @@ public class TestPropertyStoreAccessor extends 
AbstractTestClass {
             .getResponse(this);
     Assert.assertEquals(response.getStatus(), HttpStatus.SC_BAD_REQUEST);
   }
+
+  @Test
+  public void testPutPropertyStore() throws IOException {
+    String path = "/writePath/content";
+
+    // First, try to write byte array
+    String content = TestHelper.getTestMethodName();
+    put("clusters/" + TEST_CLUSTER + "/propertyStore" + path,
+        ImmutableMap.of("isZNRecord", "false"),
+        Entity.entity(OBJECT_MAPPER.writeValueAsBytes(content), 
MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode());
+
+    // Verify
+    ZkBaseDataAccessor<byte[]> byteAccessor =
+        new ZkBaseDataAccessor(ZK_ADDR, new ByteArraySerializer());
+    byte[] data = byteAccessor
+        .get(PropertyPathBuilder.propertyStore(TEST_CLUSTER) + path, null, 
AccessOption.PERSISTENT);
+    byteAccessor.close();
+    Assert.assertEquals(content, OBJECT_MAPPER.readValue(data, String.class));
+
+    // Second, try to write a ZNRecord
+    ZNRecord contentRecord = new ZNRecord(TestHelper.getTestMethodName());
+    contentRecord.setSimpleField("testField", TestHelper.getTestMethodName());
+    put("clusters/" + TEST_CLUSTER + "/propertyStore" + path, null, Entity
+            .entity(OBJECT_MAPPER.writeValueAsBytes(contentRecord), 
MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode());
+
+    // Verify
+    ZNRecord record = _baseAccessor
+        .get(PropertyPathBuilder.propertyStore(TEST_CLUSTER) + path, null, 
AccessOption.PERSISTENT);
+    Assert.assertEquals(contentRecord, record);
+  }
 }

Reply via email to