This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new dbe2831 Add PropertyStore write endpoint to Helix REST (#1049)
dbe2831 is described below
commit dbe283129c55429410b88d8ae37ab9e98322ee70
Author: Hunter Lee <[email protected]>
AuthorDate: Tue Jun 2 17:13:56 2020 -0700
Add PropertyStore write endpoint to Helix REST (#1049)
This commit adds a write endpoint to Helix REST that allows you to write either
a byte array or a ZNRecord to any path under PropertyStore, which is a
directory in the cluster metadata in ZK where applications can write custom data.
---
.../apache/helix/rest/server/ServerContext.java | 16 ++----
.../rest/server/resources/AbstractResource.java | 5 ++
.../resources/helix/PropertyStoreAccessor.java | 60 +++++++++++++++++++++-
.../rest/server/TestPropertyStoreAccessor.java | 48 ++++++++++++++---
4 files changed, 107 insertions(+), 22 deletions(-)
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index 86342f7..2449b0d 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.manager.zk.ByteArraySerializer;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -231,19 +232,8 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
if (_byteArrayZkBaseDataAccessor == null) {
synchronized (this) {
if (_byteArrayZkBaseDataAccessor == null) {
-
- _byteArrayZkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkAddr, new
ZkSerializer() {
- @Override
- public byte[] serialize(Object o) throws ZkMarshallingError {
- // TODO: Support serialize for write methods if necessary
- throw new UnsupportedOperationException("serialize() is not
supported.");
- }
-
- @Override
- public Object deserialize(byte[] bytes) throws ZkMarshallingError {
- return bytes;
- }
- });
+ _byteArrayZkBaseDataAccessor =
+ new ZkBaseDataAccessor<>(_zkAddr, new ByteArraySerializer());
}
}
}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index 1b9111e..51cac9d 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -98,6 +98,11 @@ public class AbstractResource {
return Response.serverError().build();
}
+ /**
+  * Builds a server-error response that carries the given message.
+  * The response entity is whatever {@code errorMsgToJson} produces for the message.
+  * @param errorMsg description of the failure to report to the caller
+  * @return a Response with the internal-server-error status and the converted message as entity
+  */
+ protected Response serverError(String errorMsg) {
+   return Response.serverError().entity(errorMsgToJson(errorMsg)).build();
+ }
+
protected Response serverError(Exception ex) {
addExceptionToAuditLog(ex);
return
Response.serverError().entity(errorMsgToJson(ex.getMessage())).build();
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
index 11226e5..af9efac 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
@@ -19,18 +19,23 @@ package org.apache.helix.rest.server.resources.helix;
* under the License.
*/
+import java.io.IOException;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.msdcommon.util.ZkValidationUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.codehaus.jackson.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,4 +83,55 @@ public class PropertyStoreAccessor extends
AbstractHelixResource {
.entity(String.format("The property store path %s doesn't exist",
recordPath)).build());
}
}
+
+ /**
+  * Writes the request body to a path under the cluster's PropertyStore.
+  * Sample HTTP URLs:
+  * http://<HOST>/clusters/{clusterId}/propertyStore/<PATH>
+  * It refers to the /PROPERTYSTORE/<PATH> in Helix metadata store
+  * @param clusterId The cluster Id
+  * @param path path parameter is like "abc/abc/abc" in the URL
+  * @param isZNRecord true if the content represents a ZNRecord. false means byte array.
+  *                   Defaults to "true" when the query parameter is absent.
+  * @param content the request body; parsed into a ZNRecord when isZNRecord is true, otherwise
+  *                stored as its UTF-8 encoded bytes
+  * @return OK when the write succeeds; a bad-request response when the path is invalid or the
+  *         content cannot be parsed into a ZNRecord; a server-error response when the write
+  *         returns false or throws
+  */
+ @PUT
+ @Path("{path: .+}")
+ public Response putPropertyByPath(@PathParam("clusterId") String clusterId,
+     @PathParam("path") String path,
+     @QueryParam("isZNRecord") @DefaultValue("true") String isZNRecord, String content) {
+   // The path parameter arrives without its leading slash; restore it before validating.
+   path = "/" + path;
+   if (!ZkValidationUtil.isPathValid(path)) {
+     LOG.info("The propertyStore path {} is invalid for cluster {}", path, clusterId);
+     return badRequest(
+         "Invalid path string. Valid path strings use slash as the directory separator and names the location of ZNode");
+   }
+   final String recordPath = PropertyPathBuilder.propertyStore(clusterId) + path;
+   try {
+     if (Boolean.parseBoolean(isZNRecord)) {
+       try {
+         ZNRecord record = toZNRecord(content);
+         BaseDataAccessor<ZNRecord> propertyStoreDataAccessor =
+             getDataAccssor(clusterId).getBaseDataAccessor();
+         if (!propertyStoreDataAccessor.set(recordPath, record, AccessOption.PERSISTENT)) {
+           return serverError(
+               "Failed to set content: " + content + " in PropertyStore path: " + path);
+         }
+       } catch (IOException e) {
+         // A body that cannot be parsed is reported as a bad request, not a server fault.
+         LOG.error("Failed to deserialize content {} into a ZNRecord!", content, e);
+         return badRequest(
+             "Failed to write to path: " + recordPath + "! Content is not a valid ZNRecord!");
+       }
+     } else {
+       ZkBaseDataAccessor<byte[]> propertyStoreDataAccessor = getByteArrayDataAccessor();
+       // Encode with an explicit charset so the stored bytes do not depend on the JVM's
+       // platform-default encoding.
+       byte[] payload = content.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+       if (!propertyStoreDataAccessor.set(recordPath, payload, AccessOption.PERSISTENT)) {
+         return serverError(
+             "Failed to set content: " + content + " in PropertyStore path: " + path);
+       }
+     }
+     return OK();
+   } catch (Exception e) {
+     return serverError(e);
+   }
+ }
}
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
index fc0a701..f1954d7 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
@@ -20,13 +20,18 @@ package org.apache.helix.rest.server;
*/
import java.io.IOException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import com.google.common.collect.ImmutableMap;
import org.apache.helix.AccessOption;
import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.ByteArraySerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.http.HttpStatus;
@@ -53,14 +58,12 @@ public class TestPropertyStoreAccessor extends
AbstractTestClass {
public void init() {
_customDataAccessor = new ZkBaseDataAccessor<>(ZK_ADDR, new ZkSerializer()
{
@Override
- public byte[] serialize(Object o)
- throws ZkMarshallingError {
+ public byte[] serialize(Object o) throws ZkMarshallingError {
return o.toString().getBytes();
}
@Override
- public Object deserialize(byte[] bytes)
- throws ZkMarshallingError {
+ public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return new String(bytes);
}
});
@@ -78,8 +81,7 @@ public class TestPropertyStoreAccessor extends
AbstractTestClass {
}
@Test
- public void testGetPropertyStoreWithZNRecordData()
- throws IOException {
+ public void testGetPropertyStoreWithZNRecordData() throws IOException {
String data =
new
JerseyUriRequestBuilder("clusters/{}/propertyStore/ZnRecord").format(TEST_CLUSTER)
.isBodyReturnExpected(true).get(this);
@@ -114,4 +116,36 @@ public class TestPropertyStoreAccessor extends
AbstractTestClass {
.getResponse(this);
Assert.assertEquals(response.getStatus(), HttpStatus.SC_BAD_REQUEST);
}
+
+ /**
+  * Exercises the PropertyStore PUT endpoint twice against the same path: first with
+  * isZNRecord=false (raw bytes), then with the default (ZNRecord), reading the stored value
+  * back from ZooKeeper after each write.
+  */
+ @Test
+ public void testPutPropertyStore() throws IOException {
+   String path = "/writePath/content";
+   String zkPath = PropertyPathBuilder.propertyStore(TEST_CLUSTER) + path;
+   String uri = "clusters/" + TEST_CLUSTER + "/propertyStore" + path;
+
+   // First, try to write byte array
+   String content = TestHelper.getTestMethodName();
+   put(uri, ImmutableMap.of("isZNRecord", "false"),
+       Entity.entity(OBJECT_MAPPER.writeValueAsBytes(content), MediaType.APPLICATION_JSON_TYPE),
+       Response.Status.OK.getStatusCode());
+
+   // Verify; close the accessor even when the read throws so the ZK connection is not leaked
+   ZkBaseDataAccessor<byte[]> byteAccessor =
+       new ZkBaseDataAccessor<>(ZK_ADDR, new ByteArraySerializer());
+   byte[] data;
+   try {
+     data = byteAccessor.get(zkPath, null, AccessOption.PERSISTENT);
+   } finally {
+     byteAccessor.close();
+   }
+   // (actual, expected) order, matching the other assertions in this class
+   Assert.assertEquals(OBJECT_MAPPER.readValue(data, String.class), content);
+
+   // Second, try to write a ZNRecord
+   ZNRecord contentRecord = new ZNRecord(TestHelper.getTestMethodName());
+   contentRecord.setSimpleField("testField", TestHelper.getTestMethodName());
+   put(uri, null,
+       Entity.entity(OBJECT_MAPPER.writeValueAsBytes(contentRecord),
+           MediaType.APPLICATION_JSON_TYPE),
+       Response.Status.OK.getStatusCode());
+
+   // Verify
+   ZNRecord record = _baseAccessor.get(zkPath, null, AccessOption.PERSISTENT);
+   Assert.assertEquals(record, contentRecord);
+ }
}