KNOX-1041 - High Availability Support For Apache SOLR, HBase & Kafka (Rick 
Kellogg via Sandeep More)


Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/a08aaf74
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/a08aaf74
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/a08aaf74

Branch: refs/heads/KNOX-998-Package_Restructuring
Commit: a08aaf742a97a3c35c94e28406fc4b6ef3184005
Parents: aa62fa2
Author: Sandeep More <m...@apache.org>
Authored: Fri Oct 20 10:38:34 2017 -0400
Committer: Sandeep More <m...@apache.org>
Committed: Fri Oct 20 10:38:34 2017 -0400

----------------------------------------------------------------------
 .../provider/impl/BaseZookeeperURLManager.java  | 195 +++++++++++++++++++
 .../provider/impl/HBaseZookeeperURLManager.java | 138 +++++++++++++
 .../provider/impl/KafkaZookeeperURLManager.java | 152 +++++++++++++++
 .../provider/impl/SOLRZookeeperURLManager.java  | 118 +++++++++++
 .../ha/provider/impl/StringResponseHandler.java |  49 +++++
 ...apache.hadoop.gateway.ha.provider.URLManager |   5 +-
 .../impl/HBaseZookeeperURLManagerTest.java      |  72 +++++++
 .../impl/KafkaZookeeperURLManagerTest.java      |  71 +++++++
 .../impl/SOLRZookeeperURLManagerTest.java       | 110 +++++++++++
 9 files changed, 909 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/BaseZookeeperURLManager.java
----------------------------------------------------------------------
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/BaseZookeeperURLManager.java
 
b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/BaseZookeeperURLManager.java
new file mode 100644
index 0000000..0b16144
--- /dev/null
+++ 
b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/BaseZookeeperURLManager.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.gateway.ha.provider.HaServiceConfig;
+import org.apache.hadoop.gateway.ha.provider.URLManager;
+import org.apache.hadoop.gateway.ha.provider.impl.i18n.HaMessages;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Base implementation of URLManager intended for query of Zookeeper active 
hosts. In
+ * the event of a failure via markFailed, Zookeeper is queried again for active
+ * host information.
+ * 
+ * When configuring the HAProvider in the topology, the zookeeperEnsemble 
attribute must be set to a
+ * comma delimited list of the host and port number, i.e. 
host1:2181,host2:2181. 
+ */
+public abstract class BaseZookeeperURLManager implements URLManager {
+       protected static final HaMessages LOG = 
MessagesFactory.get(HaMessages.class);
+       /**
+        * Host Ping Timeout
+        */
+       private static final int TIMEOUT = 2000;
+
+       private String zooKeeperEnsemble;
+       private ConcurrentLinkedQueue<String> urls = new 
ConcurrentLinkedQueue<String>();
+
+       // 
-------------------------------------------------------------------------------------
+       // URLManager interface methods
+       // 
-------------------------------------------------------------------------------------
+
+       @Override
+       public boolean supportsConfig(HaServiceConfig config) {
+               if 
(!config.getServiceName().equalsIgnoreCase(getServiceName())) {
+                       return false;
+               }
+               
+               String zookeeperEnsemble = config.getZookeeperEnsemble();
+               if (zookeeperEnsemble != null && 
zookeeperEnsemble.trim().length() > 0) {
+                       return true;
+               }
+               
+               return false;
+       }
+
+       @Override
+       public void setConfig(HaServiceConfig config) {
+               zooKeeperEnsemble = config.getZookeeperEnsemble();
+               setURLs(lookupURLs());
+       }
+
+       @Override
+       public synchronized String getActiveURL() {
+               // None available so refresh
+               if (urls.isEmpty()) {
+                       setURLs(lookupURLs());
+               }
+
+               return this.urls.peek();
+       }
+
+       @Override
+       public synchronized void setActiveURL(String url) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public synchronized List<String> getURLs() {
+               return Lists.newArrayList(this.urls.iterator());
+       }
+
+       @Override
+       public synchronized void markFailed(String url) {
+               // Capture complete URL of active host
+               String topURL = getActiveURL();
+
+               // Refresh URLs from ZooKeeper
+               setURLs(lookupURLs());
+
+               // Show failed URL and new URL
+               LOG.markedFailedUrl(topURL, getActiveURL());
+       }
+
+       @Override
+       public synchronized void setURLs(List<String> urls) {
+               if ((urls != null) && (!(urls.isEmpty()))) {
+                       this.urls.clear();
+                       this.urls.addAll(urls);
+               }
+       }
+
+       // 
-------------------------------------------------------------------------------------
+       // Abstract methods
+       // 
-------------------------------------------------------------------------------------
+
+       /**
+        * Look within Zookeeper under the /live_nodes branch for active hosts
+        * 
+        * @return A List of URLs (never null)
+        */
+       protected abstract List<String> lookupURLs();
+
+       /**
+        * @return The name of the Knox Topology Service to support
+        */
+       protected abstract String getServiceName();
+
+       // 
-------------------------------------------------------------------------------------
+       // Protected methods
+       // 
-------------------------------------------------------------------------------------
+
+       protected String getZookeeperEnsemble() {
+               return zooKeeperEnsemble;
+       }
+       
+       /**
+        * Validate access to hosts using simple light weight ping style REST 
call.
+        * 
+        * @param hosts List of hosts to evaluate (required)
+        * @param suffix Text to append to host (required) 
+        * @param acceptHeader Used for Accept header (optional)
+        * 
+        * @return Hosts with successful access
+        */
+       protected List<String> validateHosts(List<String> hosts, String suffix, 
String acceptHeader) {
+               List<String> result = new ArrayList<String>();
+               
+               CloseableHttpClient client = null;
+               
+               try {
+                       // Construct a HttpClient with short term timeout
+                       RequestConfig.Builder requestBuilder = 
RequestConfig.custom()
+                                       .setConnectTimeout(TIMEOUT)
+                                       .setSocketTimeout(TIMEOUT)
+                                       .setConnectionRequestTimeout(TIMEOUT);
+
+                       client = HttpClientBuilder.create()
+                                       
.setDefaultRequestConfig(requestBuilder.build())
+                                       .build();
+                       
+                       for(String host: hosts) {
+                               try     {
+                                       HttpGet get = new HttpGet(host + 
suffix);
+                                       
+                                       if (acceptHeader != null) {
+                                               get.setHeader("Accept", 
acceptHeader);
+                                       }
+                                       
+                                       String response = client.execute(get, 
new StringResponseHandler());
+                                       
+                                       if (response != null) {
+                                               result.add(host);
+                                       }
+                               }
+                               catch (Exception ex) {
+                                       // ignore host
+                               }
+                       }
+               }
+               catch (Exception ex) {
+                       // Ignore errors
+               }
+               finally {
+                       IOUtils.closeQuietly(client);
+               }
+               
+               return result;
+       }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManager.java
----------------------------------------------------------------------
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManager.java
 
b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManager.java
new file mode 100644
index 0000000..8a414c7
--- /dev/null
+++ 
b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManager.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Implementation of URLManager intended for query of Zookeeper for active 
HBase RegionServer hosts.
+ *  
+ * The assumption is that the HBase REST Server will be installed on the same 
host.  For safety
+ * reasons, the REST Server is pinged for access before inclusion in the list 
of returned hosts.
+ * 
+ * In the event of a failure via markFailed, Zookeeper is queried again for 
active
+ * host information.
+ * 
+ * When configuring the HAProvider in the topology, the zookeeperEnsemble
+ * attribute must be set to a comma delimited list of the host and port number,
+ * i.e. host1:2181,host2:2181.
+ */
+public class HBaseZookeeperURLManager extends BaseZookeeperURLManager {
+       /**
+        * Default Port Number for HBase REST Server
+        */
+       private static final int PORT_NUMBER = 8080;
+       
+       private String zookeeperNamespace = "hbase-unsecure";
+       
+       // 
-------------------------------------------------------------------------------------
+       // Abstract methods
+       // 
-------------------------------------------------------------------------------------
+
+       /**
+        * Look within Zookeeper under the /hbase-unsecure/rs branch for active 
HBase RegionServer hosts
+        * 
+        * @return A List of URLs (never null)
+        */
+       @Override
+       protected List<String> lookupURLs() {
+               // Retrieve list of potential hosts from ZooKeeper
+               List<String> hosts = retrieveHosts();
+               
+               // Validate access to hosts using cheap ping style operation
+               List<String> validatedHosts = 
validateHosts(hosts,"/","text/xml");
+
+               // Randomize the hosts list for simple load balancing
+               if (!validatedHosts.isEmpty()) {
+                       Collections.shuffle(validatedHosts);
+               }
+
+               return validatedHosts;
+       }
+
+       protected String getServiceName() {
+               return "WEBHBASE";
+       };
+
+       // 
-------------------------------------------------------------------------------------
+       // Private methods
+       // 
-------------------------------------------------------------------------------------
+
+       /**
+        * @return Retrieve lists of hosts from ZooKeeper
+        */
+       private List<String> retrieveHosts()
+       {
+               List<String> serverHosts = new ArrayList<>();
+               
+               CuratorFramework zooKeeperClient = 
CuratorFrameworkFactory.builder()
+                               .connectString(getZookeeperEnsemble())
+                               .retryPolicy(new ExponentialBackoffRetry(1000, 
3))
+                               .build();
+               
+               try {
+                       zooKeeperClient.start();
+                       
+                       // Retrieve list of all region server hosts
+                       List<String> serverNodes = 
zooKeeperClient.getChildren().forPath("/" + zookeeperNamespace + "/rs");
+                       
+                       for (String serverNode : serverNodes) {
+                               String serverURL = constructURL(serverNode);
+                               serverHosts.add(serverURL);
+                       }
+               } catch (Exception e) {
+                       LOG.failedToGetZookeeperUrls(e);
+                       throw new RuntimeException(e);
+               } finally {
+                       // Close the client connection with ZooKeeper
+                       if (zooKeeperClient != null) {
+                               zooKeeperClient.close();
+                       }
+               }
+               
+               return serverHosts;
+       }
+       
+       /**
+        * Given a String of the format "host,number,number" convert to a URL 
of the format
+        * "http://host:port";.
+        * 
+        * @param serverInfo Server Info from Zookeeper (required)
+        * 
+        * @return URL to HBASE
+        */
+       private String constructURL(String serverInfo) {
+               String scheme = "http";
+
+               StringBuffer buffer = new StringBuffer();
+               buffer.append(scheme);
+               buffer.append("://");
+               // Strip off the host name 
+               buffer.append(serverInfo.substring(0,serverInfo.indexOf(",")));
+               buffer.append(":");
+               buffer.append(PORT_NUMBER);
+               
+               return buffer.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManager.java
----------------------------------------------------------------------
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManager.java
 
b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManager.java
new file mode 100644
index 0000000..c68c107
--- /dev/null
+++ 
b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManager.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import net.minidev.json.JSONObject;
+import net.minidev.json.parser.JSONParser;
+import net.minidev.json.parser.ParseException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Implementation of URLManager intended for query of Zookeeper for active 
Kafka hosts. 
+ * 
+ * The assumption is that the Confluent REST Proxy will be installed on the 
same host.  For safety
+ * reasons, the REST Server is pinged for access before inclusion in the list 
of returned hosts.
+ * 
+ * In the event of a failure via markFailed, Zookeeper is queried again for 
active
+ * host information.
+ * 
+ * When configuring the HAProvider in the topology, the zookeeperEnsemble
+ * attribute must be set to a comma delimited list of the host and port number,
+ * i.e. host1:2181,host2:2181.
+ */
+public class KafkaZookeeperURLManager extends BaseZookeeperURLManager {
+       /**
+        * Default Port Number for Confluent Kafka REST Server
+        */
+       private static final int PORT_NUMBER = 8082;
+       /**
+        * Base path for retrieval from Zookeeper
+        */
+       private static final String BASE_PATH = "/brokers/ids";
+       
+       // 
-------------------------------------------------------------------------------------
+       // Abstract methods
+       // 
-------------------------------------------------------------------------------------
+
+       /**
+        * Look within Zookeeper under the /broker/ids branch for active Kafka 
hosts
+        * 
+        * @return A List of URLs (never null)
+        */
+       @Override
+       protected List<String> lookupURLs() {
+               // Retrieve list of potential hosts from ZooKeeper
+               List<String> hosts = retrieveHosts();
+               
+               // Validate access to hosts using cheap ping style operation
+               List<String> validatedHosts = 
validateHosts(hosts,"/topics","application/vnd.kafka.v2+json");
+
+               // Randomize the hosts list for simple load balancing
+               if (!validatedHosts.isEmpty()) {
+                       Collections.shuffle(validatedHosts);
+               }
+
+               return validatedHosts;
+       }
+
+       protected String getServiceName() {
+               return "KAFKA";
+       };
+
+       // 
-------------------------------------------------------------------------------------
+       // Private methods
+       // 
-------------------------------------------------------------------------------------
+
+       /**
+        * @return Retrieve lists of hosts from ZooKeeper
+        */
+       private List<String> retrieveHosts()
+       {
+               List<String> serverHosts = new ArrayList<>();
+               
+               CuratorFramework zooKeeperClient = 
CuratorFrameworkFactory.builder()
+                               .connectString(getZookeeperEnsemble())
+                               .retryPolicy(new ExponentialBackoffRetry(1000, 
3))
+                               .build();
+               
+               try {
+                       zooKeeperClient.start();
+
+                       // Retrieve list of host URLs from ZooKeeper
+                       List<String> brokers = 
zooKeeperClient.getChildren().forPath(BASE_PATH);
+
+                       for (String broker : brokers) {
+                               String serverInfo = new 
String(zooKeeperClient.getData().forPath(BASE_PATH + "/" + broker), 
Charset.forName("UTF-8"));
+                               
+                               String serverURL = constructURL(serverInfo);
+                               serverHosts.add(serverURL);
+                       }
+               } catch (Exception e) {
+                       LOG.failedToGetZookeeperUrls(e);
+                       throw new RuntimeException(e);
+               } finally {
+                       // Close the client connection with ZooKeeper
+                       if (zooKeeperClient != null) {
+                               zooKeeperClient.close();
+                       }
+               }
+               
+               return serverHosts;
+       }
+       
+       /**
+        * Given a String of the format 
"{"jmx_port":-1,"timestamp":"1505763958072","endpoints":["PLAINTEXT://host:6667"],"host":"host","version":3,"port":6667}"
 
+        * convert to a URL of the format "http://host:port";.
+        * 
+        * @param serverInfo Server Info in JSON Format from Zookeeper 
(required)
+        * 
+        * @return URL to Kafka
+        * @throws ParseException 
+        */
+       private String constructURL(String serverInfo) throws ParseException {
+               String scheme = "http";
+
+               StringBuffer buffer = new StringBuffer();
+               
+               buffer.append(scheme);
+               buffer.append("://");
+               
+               JSONParser parser = new 
JSONParser(JSONParser.DEFAULT_PERMISSIVE_MODE);
+               JSONObject obj = (JSONObject) parser.parse(serverInfo);
+               buffer.append(obj.get("host"));
+               
+               buffer.append(":");
+               buffer.append(PORT_NUMBER);
+
+               return buffer.toString();
+       }       
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManager.java
----------------------------------------------------------------------
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManager.java
 
b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManager.java
new file mode 100644
index 0000000..f612e9b
--- /dev/null
+++ 
b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManager.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Implementation of URLManager intended for query of Zookeeper for active 
SOLR Cloud hosts. 
+ * In the event of a failure via markFailed, Zookeeper is queried again for 
active
+ * host information.
+ * 
+ * When configuring the HAProvider in the topology, the zookeeperEnsemble
+ * attribute must be set to a comma delimited list of the host and port number,
+ * i.e. host1:2181,host2:2181.
+ */
+public class SOLRZookeeperURLManager extends BaseZookeeperURLManager {
+
+       // 
-------------------------------------------------------------------------------------
+       // Abstract methods
+       // 
-------------------------------------------------------------------------------------
+
+       /**
+        * Look within Zookeeper under the /live_nodes branch for active SOLR 
hosts
+        * 
+        * @return A List of URLs (never null)
+        */
+       @Override
+       protected List<String> lookupURLs() {
+               // Retrieve list of potential hosts from ZooKeeper
+               List<String> hosts = retrieveHosts();
+               
+               // Randomize the hosts list for simple load balancing
+               if (!hosts.isEmpty()) {
+                       Collections.shuffle(hosts);
+               }
+
+               return hosts;
+       }
+
+       protected String getServiceName() {
+               return "SOLR";
+       };
+
+       // 
-------------------------------------------------------------------------------------
+       // Private methods
+       // 
-------------------------------------------------------------------------------------
+
+       /**
+        * @return Retrieve lists of hosts from ZooKeeper
+        */
+       private List<String> retrieveHosts()
+       {
+               List<String> serverHosts = new ArrayList<>();
+               
+               CuratorFramework zooKeeperClient = 
CuratorFrameworkFactory.builder()
+                               .connectString(getZookeeperEnsemble())
+                               .retryPolicy(new ExponentialBackoffRetry(1000, 
3))
+                               .build();
+               
+               try {
+                       zooKeeperClient.start();
+                       List<String> serverNodes = 
zooKeeperClient.getChildren().forPath("/live_nodes");
+                       for (String serverNode : serverNodes) {
+                               String serverURL = constructURL(serverNode);
+                               serverHosts.add(serverURL);
+                       }
+               } catch (Exception e) {
+                       LOG.failedToGetZookeeperUrls(e);
+                       throw new RuntimeException(e);
+               } finally {
+                       // Close the client connection with ZooKeeper
+                       if (zooKeeperClient != null) {
+                               zooKeeperClient.close();
+                       }
+               }
+
+               return serverHosts;
+       }
+       
+       /**
+        * Given a String of the format "host:port_solr" convert to a URL of 
the format
+        * "http://host:port/solr";.
+        * 
+        * @param serverInfo Server Info from Zookeeper (required)
+        * 
+        * @return URL to SOLR
+        */
+       private String constructURL(String serverInfo) {
+               String scheme = "http";
+
+               StringBuffer buffer = new StringBuffer();
+               buffer.append(scheme);
+               buffer.append("://");
+               buffer.append(serverInfo.replace("_", "/"));
+               return buffer.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/StringResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/StringResponseHandler.java
 
b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/StringResponseHandler.java
new file mode 100644
index 0000000..68b68c6
--- /dev/null
+++ 
b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/StringResponseHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import java.io.IOException;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * Apache HttpClient ResponseHandler for String HttpResponse
+ */
+public class StringResponseHandler implements ResponseHandler<String>
+{
+       @Override
+       public String handleResponse(HttpResponse response)
+       throws ClientProtocolException, IOException 
+       {
+               int status = response.getStatusLine().getStatusCode();
+               
+               if (status >= 200 && status < 300)
+               {
+                       HttpEntity entity = response.getEntity();
+                       return entity != null ?EntityUtils.toString(entity) : 
null;
+               }
+               else
+               {
+                       throw new ClientProtocolException("Unexcepted response 
status: " + status);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager
----------------------------------------------------------------------
diff --git 
a/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager
 
b/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager
index d1ec0b9..7530ac6 100644
--- 
a/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager
+++ 
b/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager
@@ -16,4 +16,7 @@
 # limitations under the License.
 ##########################################################################
 
-org.apache.hadoop.gateway.ha.provider.impl.HS2ZookeeperURLManager
\ No newline at end of file
+org.apache.hadoop.gateway.ha.provider.impl.HS2ZookeeperURLManager
+org.apache.hadoop.gateway.ha.provider.impl.SOLRZookeeperURLManager
+org.apache.hadoop.gateway.ha.provider.impl.KafkaZookeeperURLManager
+org.apache.hadoop.gateway.ha.provider.impl.HBaseZookeeperURLManager
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManagerTest.java
----------------------------------------------------------------------
diff --git 
a/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManagerTest.java
 
b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManagerTest.java
new file mode 100644
index 0000000..087651e
--- /dev/null
+++ 
b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManagerTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.gateway.ha.provider.HaServiceConfig;
+import org.apache.hadoop.gateway.ha.provider.URLManager;
+import org.apache.hadoop.gateway.ha.provider.URLManagerLoader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simple unit tests for HBaseZookeeperURLManager.
+ * 
+ * @see HBaseZookeeperURLManager
+ */
+public class HBaseZookeeperURLManagerTest {
+       
+  private TestingCluster cluster;
+
+  @Before
+  public void setup() throws Exception {
+    cluster = new TestingCluster(3);
+    cluster.start();
+
+    CuratorFramework zooKeeperClient =
+        
CuratorFrameworkFactory.builder().connectString(cluster.getConnectString())
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+
+    zooKeeperClient.start();
+    zooKeeperClient.create().forPath("/hbase-unsecure");
+    zooKeeperClient.create().forPath("/hbase-unsecure/rs");
+    zooKeeperClient.close();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    cluster.stop();
+  }
+
+  @Test
+  public void testHBaseZookeeperURLManagerLoading() {
+    HaServiceConfig config = new DefaultHaServiceConfig("WEBHBASE");
+    config.setEnabled(true);
+    config.setZookeeperEnsemble(cluster.getConnectString());
+    URLManager manager = URLManagerLoader.loadURLManager(config);
+    Assert.assertNotNull(manager);
+    Assert.assertTrue(manager instanceof HBaseZookeeperURLManager);
+  }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManagerTest.java
----------------------------------------------------------------------
diff --git 
a/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManagerTest.java
 
b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManagerTest.java
new file mode 100644
index 0000000..50dedbf
--- /dev/null
+++ 
b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManagerTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.gateway.ha.provider.HaServiceConfig;
+import org.apache.hadoop.gateway.ha.provider.URLManager;
+import org.apache.hadoop.gateway.ha.provider.URLManagerLoader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simple unit tests for KafkaZookeeperURLManager.
+ * 
+ * @see KafkaZookeeperURLManager
+ */
+public class KafkaZookeeperURLManagerTest {
+  private TestingCluster cluster;
+
+  @Before
+  public void setup() throws Exception {
+    cluster = new TestingCluster(3);
+    cluster.start();
+
+    CuratorFramework zooKeeperClient =
+        
CuratorFrameworkFactory.builder().connectString(cluster.getConnectString())
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+
+    zooKeeperClient.start();
+    zooKeeperClient.create().forPath("/brokers");
+    zooKeeperClient.create().forPath("/brokers/ids");
+    zooKeeperClient.close();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    cluster.stop();
+  }
+       
+  @Test
+  public void testHBaseZookeeperURLManagerLoading() {
+    HaServiceConfig config = new DefaultHaServiceConfig("KAFKA");
+    config.setEnabled(true);
+    config.setZookeeperEnsemble(cluster.getConnectString());
+    URLManager manager = URLManagerLoader.loadURLManager(config);
+    Assert.assertNotNull(manager);
+    Assert.assertTrue(manager instanceof KafkaZookeeperURLManager);
+  }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManagerTest.java
----------------------------------------------------------------------
diff --git 
a/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManagerTest.java
 
b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManagerTest.java
new file mode 100644
index 0000000..6cc6fa7
--- /dev/null
+++ 
b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManagerTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.ha.provider.impl;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.gateway.ha.provider.HaServiceConfig;
+import org.apache.hadoop.gateway.ha.provider.URLManager;
+import org.apache.hadoop.gateway.ha.provider.URLManagerLoader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Simple unit tests for SOLRZookeeperURLManager.
+ * 
+ * @see SOLRZookeeperURLManager
+ */
+public class SOLRZookeeperURLManagerTest {
+
+  private TestingCluster cluster;
+  private SOLRZookeeperURLManager manager;
+
+  @Before
+  public void setup() throws Exception {
+    cluster = new TestingCluster(3);
+    cluster.start();
+
+    CuratorFramework zooKeeperClient =
+        
CuratorFrameworkFactory.builder().connectString(cluster.getConnectString())
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+
+    zooKeeperClient.start();
+    zooKeeperClient.create().forPath("/live_nodes");
+    zooKeeperClient.create().forPath("/live_nodes/host1:8983_solr");
+    zooKeeperClient.create().forPath("/live_nodes/host2:8983_solr");
+    zooKeeperClient.create().forPath("/live_nodes/host3:8983_solr");
+    zooKeeperClient.close();
+    manager = new SOLRZookeeperURLManager();
+    HaServiceConfig config = new DefaultHaServiceConfig("SOLR");
+    config.setEnabled(true);
+    config.setZookeeperEnsemble(cluster.getConnectString());
+    manager.setConfig(config);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    cluster.stop();
+  }
+
+  @Test
+  public void testURLs() throws Exception {
+    List<String> urls = manager.getURLs();
+    Assert.assertNotNull(urls);
+
+    // Order of URLS is not deterministic out of Zookeeper
+    // So we just check for expected values
+    
+    TreeSet<String> expected = new TreeSet<String>();
+
+    expected.add("http://host1:8983/solr";);
+    expected.add("http://host2:8983/solr";);
+    expected.add("http://host3:8983/solr";);
+    
+    for(String url : urls)
+    {
+       assertTrue(expected.contains(url));
+       expected.remove(url);
+    }
+    
+    assertEquals(0,expected.size());
+    
+    // Unable to test markFailed because the SOLRZookeeperURLManager always 
does a refresh on Zookeeper contents.
+  }
+
+  @Test
+  public void testSOLRZookeeperURLManagerLoading() {
+    HaServiceConfig config = new DefaultHaServiceConfig("SOLR");
+    config.setEnabled(true);
+    config.setZookeeperEnsemble(cluster.getConnectString());
+    URLManager manager = URLManagerLoader.loadURLManager(config);
+    Assert.assertNotNull(manager);
+    Assert.assertTrue(manager instanceof SOLRZookeeperURLManager);
+  }
+}

Reply via email to