This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 80a8cdb Implement the propertyStore read endpoint (#516)
80a8cdb is described below
commit 80a8cdb56a3294beddf22d2b3933838c6027f37b
Author: Yi Wang <[email protected]>
AuthorDate: Mon Dec 2 10:52:41 2019 -0800
Implement the propertyStore read endpoint (#516)
Implement the propertyStore read endpoint
Example: http://xxxx/clusters/TestCluster/propertyStore/*<PATH>*
- The read method accepts a simple path parameter called "path" of the
format "abc/abc/abc"
- The path is validated using regex
- The read operation parses the byte array content from a static bytearray
zk base data accessor, if it's ZNRecord format, return ZnRecord; Otherwise,
return {"content": <binary payload>}
---
.../apache/helix/rest/server/ServerContext.java | 37 ++++++-
.../resources/helix/AbstractHelixResource.java | 13 ++-
.../resources/helix/PropertyStoreAccessor.java | 94 +++++++++++++++++
.../rest/server/TestPropertyStoreAccessor.java | 117 +++++++++++++++++++++
.../rest/server/util/JerseyUriRequestBuilder.java | 4 +
5 files changed, 257 insertions(+), 8 deletions(-)
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index a9ab882..cfb4737 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -23,6 +23,8 @@ package org.apache.helix.rest.server;
import java.util.HashMap;
import java.util.Map;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
@@ -38,16 +40,17 @@ import
org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.tools.ClusterSetup;
+
public class ServerContext {
private final String _zkAddr;
private HelixZkClient _zkClient;
private ZKHelixAdmin _zkHelixAdmin;
private ClusterSetup _clusterSetup;
private ConfigAccessor _configAccessor;
-
+ // The lazy initialized base data accessor that reads/writes byte array to ZK
+ private ZkBaseDataAccessor<byte[]> _byteArrayBaseDataAccessor;
// 1 Cluster name will correspond to 1 helix data accessor
private final Map<String, HelixDataAccessor> _helixDataAccessorPool;
-
// 1 Cluster name will correspond to 1 task driver
private final Map<String, TaskDriver> _taskDriverPool;
@@ -66,8 +69,8 @@ public class ServerContext {
if (_zkClient == null) {
HelixZkClient.ZkClientConfig clientConfig = new
HelixZkClient.ZkClientConfig();
clientConfig.setZkSerializer(new ZNRecordSerializer());
- _zkClient = SharedZkClientFactory
- .getInstance().buildZkClient(new
HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig);
+ _zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr),
clientConfig);
}
return _zkClient;
}
@@ -110,7 +113,8 @@ public class ServerContext {
public HelixDataAccessor getDataAccssor(String clusterName) {
synchronized (_helixDataAccessorPool) {
if (!_helixDataAccessorPool.containsKey(clusterName)) {
- ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new
ZkBaseDataAccessor<>(getHelixZkClient());
+ ZkBaseDataAccessor<ZNRecord> baseDataAccessor =
+ new ZkBaseDataAccessor<>(getHelixZkClient());
_helixDataAccessorPool.put(clusterName,
new ZKHelixDataAccessor(clusterName, InstanceType.ADMINISTRATOR,
baseDataAccessor));
}
@@ -118,6 +122,29 @@ public class ServerContext {
}
}
+ public ZkBaseDataAccessor<byte[]> getByteArrayBaseDataAccessor() {
+ if (_byteArrayBaseDataAccessor == null) {
+ synchronized (this) {
+ if (_byteArrayBaseDataAccessor == null) {
+ _byteArrayBaseDataAccessor = new ZkBaseDataAccessor<>(_zkAddr, new
ZkSerializer() {
+ @Override
+ public byte[] serialize(Object o)
+ throws ZkMarshallingError {
+ throw new UnsupportedOperationException("Serialize is not
supported yet!");
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes)
+ throws ZkMarshallingError {
+ return bytes;
+ }
+ });
+ }
+ }
+ }
+ return _byteArrayBaseDataAccessor;
+ }
+
public void close() {
if (_zkClient != null) {
_zkClient.close();
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
index e5694d0..7b00a1d 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
@@ -25,6 +25,7 @@ import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.rest.common.ContextPropertyKeys;
@@ -39,7 +40,7 @@ import org.apache.helix.tools.ClusterSetup;
* such as cluster, instance, job, resource, workflow, etc in
* metadata store.
*/
-public class AbstractHelixResource extends AbstractResource{
+public class AbstractHelixResource extends AbstractResource {
public HelixZkClient getHelixZkClient() {
ServerContext serverContext = getServerContext();
@@ -76,11 +77,17 @@ public class AbstractHelixResource extends AbstractResource{
return serverContext.getDataAccssor(clusterName);
}
- protected static ZNRecord toZNRecord(String data) throws IOException {
+ protected ZkBaseDataAccessor<byte[]> getByteArrayDataAccessor() {
+ return getServerContext().getByteArrayBaseDataAccessor();
+ }
+
+ protected static ZNRecord toZNRecord(String data)
+ throws IOException {
return OBJECT_MAPPER.reader(ZNRecord.class).readValue(data);
}
private ServerContext getServerContext() {
- return (ServerContext)
_application.getProperties().get(ContextPropertyKeys.SERVER_CONTEXT.name());
+ return (ServerContext) _application.getProperties()
+ .get(ContextPropertyKeys.SERVER_CONTEXT.name());
}
}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
new file mode 100644
index 0000000..1928388
--- /dev/null
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PropertyStoreAccessor.java
@@ -0,0 +1,94 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.codehaus.jackson.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Path("/clusters/{clusterId}/propertyStore")
+public class PropertyStoreAccessor extends AbstractHelixResource {
+ private static Logger LOG =
LoggerFactory.getLogger(PropertyStoreAccessor.class);
+ private static final String CONTENT_KEY = "content";
+ private static final ZNRecordSerializer ZN_RECORD_SERIALIZER = new
ZNRecordSerializer();
+
+ /**
+ * Sample HTTP URLs:
+ * http://<HOST>/clusters/{clusterId}/propertyStore/<PATH>
+ * It refers to the /PROPERTYSTORE/<PATH> in Helix metadata store
+ * @param clusterId The cluster Id
+ * @param path path parameter is like "abc/abc/abc" in the URL
+ * @return If the payload is ZNRecord format, return ZnRecord json response;
+ * Otherwise, return json object {<PATH>: raw string}
+ */
+ @GET
+ @Path("{path: .+}")
+ public Response getPropertyByPath(@PathParam("clusterId") String clusterId,
+ @PathParam("path") String path) {
+ path = "/" + path;
+ if (!isPathValid(path)) {
+ LOG.info("The propertyStore path {} is invalid for cluster {}", path,
clusterId);
+ return badRequest(
+ "Invalid path string. Valid path strings use slash as the directory
separator and names the location of ZNode");
+ }
+ final String recordPath = PropertyPathBuilder.propertyStore(clusterId) +
path;
+ ZkBaseDataAccessor<byte[]> propertyStoreDataAccessor =
getByteArrayDataAccessor();
+ if (propertyStoreDataAccessor.exists(recordPath, AccessOption.PERSISTENT))
{
+ byte[] bytes = propertyStoreDataAccessor.get(recordPath, null,
AccessOption.PERSISTENT);
+ ZNRecord znRecord = (ZNRecord) ZN_RECORD_SERIALIZER.deserialize(bytes);
+ // The ZNRecordSerializer returns null when exception occurs in
deserialization method
+ if (znRecord == null) {
+ ObjectNode jsonNode = OBJECT_MAPPER.createObjectNode();
+ jsonNode.put(CONTENT_KEY, new String(bytes));
+ return JSONRepresentation(jsonNode);
+ }
+ return JSONRepresentation(znRecord);
+ } else {
+ throw new
WebApplicationException(Response.status(Response.Status.NOT_FOUND)
+ .entity(String.format("The property store path %s doesn't exist",
recordPath)).build());
+ }
+ }
+
+ /**
+ * Valid matches:
+ * /
+ * /abc
+ * /abc/abc/abc/abc
+ * Invalid matches:
+ * null or empty string
+ * /abc/
+ * /abc/abc/abc/abc/
+ **/
+ private static boolean isPathValid(String path) {
+ return path.matches("^/|(/[\\w-]+)+$");
+ }
+}
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
new file mode 100644
index 0000000..0bff205
--- /dev/null
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPropertyStoreAccessor.java
@@ -0,0 +1,117 @@
+package org.apache.helix.rest.server;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import javax.ws.rs.core.Response;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
+import org.apache.http.HttpStatus;
+import org.codehaus.jackson.JsonNode;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestPropertyStoreAccessor extends AbstractTestClass {
+ private static final String TEST_CLUSTER = "TestCluster_0";
+ private static final String ZNRECORD_PATH =
+ PropertyPathBuilder.propertyStore(TEST_CLUSTER) + "/ZnRecord";
+ private static final ZNRecord TEST_ZNRECORD = new ZNRecord("TestContent");
+ private static final String CUSTOM_PATH =
+ PropertyPathBuilder.propertyStore(TEST_CLUSTER) + "/NonZnRecord";
+ private static final String TEST_CONTENT = "TestContent";
+ private static final String CONTENT_KEY = "content";
+
+ private ZkBaseDataAccessor<String> _customDataAccessor;
+
+ @BeforeClass
+ public void init() {
+ _customDataAccessor = new ZkBaseDataAccessor<>(ZK_ADDR, new ZkSerializer()
{
+ @Override
+ public byte[] serialize(Object o)
+ throws ZkMarshallingError {
+ return o.toString().getBytes();
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes)
+ throws ZkMarshallingError {
+ return new String(bytes);
+ }
+ });
+ // initially prepare the datas in different paths
+ Assert
+ .assertTrue(_customDataAccessor.create(CUSTOM_PATH, TEST_CONTENT,
AccessOption.PERSISTENT));
+ Assert.assertTrue(_baseAccessor.create(ZNRECORD_PATH, TEST_ZNRECORD,
AccessOption.PERSISTENT));
+ }
+
+ @AfterClass
+ public void close() {
+ if (_customDataAccessor != null) {
+ _customDataAccessor.close();
+ }
+ }
+
+ @Test
+ public void testGetPropertyStoreWithZNRecordData()
+ throws IOException {
+ String data =
+ new
JerseyUriRequestBuilder("clusters/{}/propertyStore/ZnRecord").format(TEST_CLUSTER)
+ .isBodyReturnExpected(true).get(this);
+ ZNRecord record = OBJECT_MAPPER.reader(ZNRecord.class).readValue(data);
+ Assert.assertEquals(record.getId(), TEST_ZNRECORD.getId());
+ }
+
+ @Test
+ public void testGetPropertyStoreWithTestStringData() throws IOException {
+ String actual =
+ new
JerseyUriRequestBuilder("clusters/{}/propertyStore/NonZnRecord").format(TEST_CLUSTER)
+ .isBodyReturnExpected(true).get(this);
+ JsonNode jsonNode = OBJECT_MAPPER.readTree(actual);
+ String payLoad = jsonNode.get(CONTENT_KEY).getValueAsText();
+
+ Assert.assertEquals(TEST_CONTENT, payLoad);
+ }
+
+ @Test
+ public void testGetPropertyStoreWithEmptyDataPath() {
+ Response response =
+ new
JerseyUriRequestBuilder("clusters/{}/propertyStore/EmptyPath").format(TEST_CLUSTER)
+ .isBodyReturnExpected(true).getResponse(this);
+ Assert.assertEquals(response.getStatus(), HttpStatus.SC_NOT_FOUND);
+ }
+
+ @Test
+ public void testGetPropertyStoreWithInValidPath() {
+ String path = "/context/";
+ Response response =
+ new JerseyUriRequestBuilder("clusters/{}/propertyStore" +
path).format(TEST_CLUSTER)
+ .getResponse(this);
+ Assert.assertEquals(response.getStatus(), HttpStatus.SC_BAD_REQUEST);
+ }
+}
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
index c97710b..359999e 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
@@ -90,6 +90,10 @@ public class JerseyUriRequestBuilder {
return body;
}
+ public Response getResponse(JerseyTestNg.ContainerPerClassTest container) {
+ return buildWebTarget(container).request().get();
+ }
+
/**
* Execute put request
* @param container