Repository: knox Updated Branches: refs/heads/master aa62fa2db -> a08aaf742
KNOX-1041 - High Availability Support For Apache SOLR, HBase & Kafka (Rick Kellogg via Sandeep More) Project: http://git-wip-us.apache.org/repos/asf/knox/repo Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/a08aaf74 Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/a08aaf74 Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/a08aaf74 Branch: refs/heads/master Commit: a08aaf742a97a3c35c94e28406fc4b6ef3184005 Parents: aa62fa2 Author: Sandeep More <m...@apache.org> Authored: Fri Oct 20 10:38:34 2017 -0400 Committer: Sandeep More <m...@apache.org> Committed: Fri Oct 20 10:38:34 2017 -0400 ---------------------------------------------------------------------- .../provider/impl/BaseZookeeperURLManager.java | 195 +++++++++++++++++++ .../provider/impl/HBaseZookeeperURLManager.java | 138 +++++++++++++ .../provider/impl/KafkaZookeeperURLManager.java | 152 +++++++++++++++ .../provider/impl/SOLRZookeeperURLManager.java | 118 +++++++++++ .../ha/provider/impl/StringResponseHandler.java | 49 +++++ ...apache.hadoop.gateway.ha.provider.URLManager | 5 +- .../impl/HBaseZookeeperURLManagerTest.java | 72 +++++++ .../impl/KafkaZookeeperURLManagerTest.java | 71 +++++++ .../impl/SOLRZookeeperURLManagerTest.java | 110 +++++++++++ 9 files changed, 909 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/BaseZookeeperURLManager.java ---------------------------------------------------------------------- diff --git a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/BaseZookeeperURLManager.java b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/BaseZookeeperURLManager.java new file mode 100644 index 0000000..0b16144 --- /dev/null +++ b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/BaseZookeeperURLManager.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.ha.provider.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.gateway.ha.provider.HaServiceConfig; +import org.apache.hadoop.gateway.ha.provider.URLManager; +import org.apache.hadoop.gateway.ha.provider.impl.i18n.HaMessages; +import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; + +import com.google.common.collect.Lists; + +/** + * Base implementation of URLManager intended for query of Zookeeper active hosts. In + * the event of a failure via markFailed, Zookeeper is queried again for active + * host information. + * + * When configuring the HAProvider in the topology, the zookeeperEnsemble attribute must be set to a + * comma delimited list of the host and port number, i.e. host1:2181,host2:2181. + */ +public abstract class BaseZookeeperURLManager implements URLManager { + protected static final HaMessages LOG = MessagesFactory.get(HaMessages.class); + /** + * Host Ping Timeout + */ + private static final int TIMEOUT = 2000; + + private String zooKeeperEnsemble; + private ConcurrentLinkedQueue<String> urls = new ConcurrentLinkedQueue<String>(); + + // ------------------------------------------------------------------------------------- + // URLManager interface methods + // ------------------------------------------------------------------------------------- + + @Override + public boolean supportsConfig(HaServiceConfig config) { + if (!config.getServiceName().equalsIgnoreCase(getServiceName())) { + return false; + } + + String zookeeperEnsemble = config.getZookeeperEnsemble(); + if (zookeeperEnsemble != null && zookeeperEnsemble.trim().length() > 0) { + return true; + } + + return false; + } + + @Override + public void setConfig(HaServiceConfig config) { + zooKeeperEnsemble = config.getZookeeperEnsemble(); + setURLs(lookupURLs()); + } + + @Override + public synchronized String getActiveURL() { + // None available so refresh + if (urls.isEmpty()) { + setURLs(lookupURLs()); + } + + return this.urls.peek(); + } + + @Override + public synchronized void setActiveURL(String url) { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized List<String> getURLs() { + return Lists.newArrayList(this.urls.iterator()); + } + + @Override + public synchronized void markFailed(String url) { + // Capture complete URL of active host + String topURL = getActiveURL(); + + // Refresh URLs from ZooKeeper + setURLs(lookupURLs()); + + // Show failed URL and new URL + LOG.markedFailedUrl(topURL, getActiveURL()); + } + + @Override + public synchronized void setURLs(List<String> urls) { + if ((urls != null) && (!(urls.isEmpty()))) { + this.urls.clear(); + this.urls.addAll(urls); + } + } + + // ------------------------------------------------------------------------------------- + // Abstract methods + // ------------------------------------------------------------------------------------- + + /** + * Look within Zookeeper under the /live_nodes branch for active hosts + * + * @return A List of URLs (never null) + */ + protected abstract List<String> lookupURLs(); + + /** + * @return The name of the Knox Topology Service to support + */ + protected abstract String getServiceName(); + + // ------------------------------------------------------------------------------------- + // Protected methods + // ------------------------------------------------------------------------------------- + + protected String getZookeeperEnsemble() { + return zooKeeperEnsemble; + } + + /** + * Validate access to hosts using simple light weight ping style REST call. + * + * @param hosts List of hosts to evaluate (required) + * @param suffix Text to append to host (required) + * @param acceptHeader Used for Accept header (optional) + * + * @return Hosts with successful access + */ + protected List<String> validateHosts(List<String> hosts, String suffix, String acceptHeader) { + List<String> result = new ArrayList<String>(); + + CloseableHttpClient client = null; + + try { + // Construct a HttpClient with short term timeout + RequestConfig.Builder requestBuilder = RequestConfig.custom() + .setConnectTimeout(TIMEOUT) + .setSocketTimeout(TIMEOUT) + .setConnectionRequestTimeout(TIMEOUT); + + client = HttpClientBuilder.create() + .setDefaultRequestConfig(requestBuilder.build()) + .build(); + + for(String host: hosts) { + try { + HttpGet get = new HttpGet(host + suffix); + + if (acceptHeader != null) { + get.setHeader("Accept", acceptHeader); + } + + String response = client.execute(get, new StringResponseHandler()); + + if (response != null) { + result.add(host); + } + } + catch (Exception ex) { + // ignore host + } + } + } + catch (Exception ex) { + // Ignore errors + } + finally { + IOUtils.closeQuietly(client); + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManager.java ---------------------------------------------------------------------- diff --git a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManager.java b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManager.java new file mode 100644 index 0000000..8a414c7 --- /dev/null +++ b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManager.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.ha.provider.impl; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Implementation of URLManager intended for query of Zookeeper for active HBase RegionServer hosts. + * + * The assumption is that the HBase REST Server will be installed on the same host. For safety + * reasons, the REST Server is pinged for access before inclusion in the list of returned hosts. + * + * In the event of a failure via markFailed, Zookeeper is queried again for active + * host information. + * + * When configuring the HAProvider in the topology, the zookeeperEnsemble + * attribute must be set to a comma delimited list of the host and port number, + * i.e. host1:2181,host2:2181. + */ +public class HBaseZookeeperURLManager extends BaseZookeeperURLManager { + /** + * Default Port Number for HBase REST Server + */ + private static final int PORT_NUMBER = 8080; + + private String zookeeperNamespace = "hbase-unsecure"; + + // ------------------------------------------------------------------------------------- + // Abstract methods + // ------------------------------------------------------------------------------------- + + /** + * Look within Zookeeper under the /hbase-unsecure/rs branch for active HBase RegionServer hosts + * + * @return A List of URLs (never null) + */ + @Override + protected List<String> lookupURLs() { + // Retrieve list of potential hosts from ZooKeeper + List<String> hosts = retrieveHosts(); + + // Validate access to hosts using cheap ping style operation + List<String> validatedHosts = validateHosts(hosts,"/","text/xml"); + + // Randomize the hosts list for simple load balancing + if (!validatedHosts.isEmpty()) { + Collections.shuffle(validatedHosts); + } + + return validatedHosts; + } + + protected String getServiceName() { + return "WEBHBASE"; + }; + + // ------------------------------------------------------------------------------------- + // Private methods + // ------------------------------------------------------------------------------------- + + /** + * @return Retrieve lists of hosts from ZooKeeper + */ + private List<String> retrieveHosts() + { + List<String> serverHosts = new ArrayList<>(); + + CuratorFramework zooKeeperClient = CuratorFrameworkFactory.builder() + .connectString(getZookeeperEnsemble()) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .build(); + + try { + zooKeeperClient.start(); + + // Retrieve list of all region server hosts + List<String> serverNodes = zooKeeperClient.getChildren().forPath("/" + zookeeperNamespace + "/rs"); + + for (String serverNode : serverNodes) { + String serverURL = constructURL(serverNode); + serverHosts.add(serverURL); + } + } catch (Exception e) { + LOG.failedToGetZookeeperUrls(e); + throw new RuntimeException(e); + } finally { + // Close the client connection with ZooKeeper + if (zooKeeperClient != null) { + zooKeeperClient.close(); + } + } + + return serverHosts; + } + + /** + * Given a String of the format "host,number,number" convert to a URL of the format + * "http://host:port". + * + * @param serverInfo Server Info from Zookeeper (required) + * + * @return URL to HBASE + */ + private String constructURL(String serverInfo) { + String scheme = "http"; + + StringBuffer buffer = new StringBuffer(); + buffer.append(scheme); + buffer.append("://"); + // Strip off the host name + buffer.append(serverInfo.substring(0,serverInfo.indexOf(","))); + buffer.append(":"); + buffer.append(PORT_NUMBER); + + return buffer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManager.java ---------------------------------------------------------------------- diff --git a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManager.java b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManager.java new file mode 100644 index 0000000..c68c107 --- /dev/null +++ b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManager.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.ha.provider.impl; + +import net.minidev.json.JSONObject; +import net.minidev.json.parser.JSONParser; +import net.minidev.json.parser.ParseException; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Implementation of URLManager intended for query of Zookeeper for active Kafka hosts. + * + * The assumption is that the Confluent REST Proxy will be installed on the same host. For safety + * reasons, the REST Server is pinged for access before inclusion in the list of returned hosts. + * + * In the event of a failure via markFailed, Zookeeper is queried again for active + * host information. + * + * When configuring the HAProvider in the topology, the zookeeperEnsemble + * attribute must be set to a comma delimited list of the host and port number, + * i.e. host1:2181,host2:2181. + */ +public class KafkaZookeeperURLManager extends BaseZookeeperURLManager { + /** + * Default Port Number for Confluent Kafka REST Server + */ + private static final int PORT_NUMBER = 8082; + /** + * Base path for retrieval from Zookeeper + */ + private static final String BASE_PATH = "/brokers/ids"; + + // ------------------------------------------------------------------------------------- + // Abstract methods + // ------------------------------------------------------------------------------------- + + /** + * Look within Zookeeper under the /broker/ids branch for active Kafka hosts + * + * @return A List of URLs (never null) + */ + @Override + protected List<String> lookupURLs() { + // Retrieve list of potential hosts from ZooKeeper + List<String> hosts = retrieveHosts(); + + // Validate access to hosts using cheap ping style operation + List<String> validatedHosts = validateHosts(hosts,"/topics","application/vnd.kafka.v2+json"); + + // Randomize the hosts list for simple load balancing + if (!validatedHosts.isEmpty()) { + Collections.shuffle(validatedHosts); + } + + return validatedHosts; + } + + protected String getServiceName() { + return "KAFKA"; + }; + + // ------------------------------------------------------------------------------------- + // Private methods + // ------------------------------------------------------------------------------------- + + /** + * @return Retrieve lists of hosts from ZooKeeper + */ + private List<String> retrieveHosts() + { + List<String> serverHosts = new ArrayList<>(); + + CuratorFramework zooKeeperClient = CuratorFrameworkFactory.builder() + .connectString(getZookeeperEnsemble()) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .build(); + + try { + zooKeeperClient.start(); + + // Retrieve list of host URLs from ZooKeeper + List<String> brokers = zooKeeperClient.getChildren().forPath(BASE_PATH); + + for (String broker : brokers) { + String serverInfo = new String(zooKeeperClient.getData().forPath(BASE_PATH + "/" + broker), Charset.forName("UTF-8")); + + String serverURL = constructURL(serverInfo); + serverHosts.add(serverURL); + } + } catch (Exception e) { + LOG.failedToGetZookeeperUrls(e); + throw new RuntimeException(e); + } finally { + // Close the client connection with ZooKeeper + if (zooKeeperClient != null) { + zooKeeperClient.close(); + } + } + + return serverHosts; + } + + /** + * Given a String of the format "{"jmx_port":-1,"timestamp":"1505763958072","endpoints":["PLAINTEXT://host:6667"],"host":"host","version":3,"port":6667}" + * convert to a URL of the format "http://host:port". + * + * @param serverInfo Server Info in JSON Format from Zookeeper (required) + * + * @return URL to Kafka + * @throws ParseException + */ + private String constructURL(String serverInfo) throws ParseException { + String scheme = "http"; + + StringBuffer buffer = new StringBuffer(); + + buffer.append(scheme); + buffer.append("://"); + + JSONParser parser = new JSONParser(JSONParser.DEFAULT_PERMISSIVE_MODE); + JSONObject obj = (JSONObject) parser.parse(serverInfo); + buffer.append(obj.get("host")); + + buffer.append(":"); + buffer.append(PORT_NUMBER); + + return buffer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManager.java ---------------------------------------------------------------------- diff --git a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManager.java b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManager.java new file mode 100644 index 0000000..f612e9b --- /dev/null +++ b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManager.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.ha.provider.impl; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Implementation of URLManager intended for query of Zookeeper for active SOLR Cloud hosts. + * In the event of a failure via markFailed, Zookeeper is queried again for active + * host information. + * + * When configuring the HAProvider in the topology, the zookeeperEnsemble + * attribute must be set to a comma delimited list of the host and port number, + * i.e. host1:2181,host2:2181. + */ +public class SOLRZookeeperURLManager extends BaseZookeeperURLManager { + + // ------------------------------------------------------------------------------------- + // Abstract methods + // ------------------------------------------------------------------------------------- + + /** + * Look within Zookeeper under the /live_nodes branch for active SOLR hosts + * + * @return A List of URLs (never null) + */ + @Override + protected List<String> lookupURLs() { + // Retrieve list of potential hosts from ZooKeeper + List<String> hosts = retrieveHosts(); + + // Randomize the hosts list for simple load balancing + if (!hosts.isEmpty()) { + Collections.shuffle(hosts); + } + + return hosts; + } + + protected String getServiceName() { + return "SOLR"; + }; + + // ------------------------------------------------------------------------------------- + // Private methods + // ------------------------------------------------------------------------------------- + + /** + * @return Retrieve lists of hosts from ZooKeeper + */ + private List<String> retrieveHosts() + { + List<String> serverHosts = new ArrayList<>(); + + CuratorFramework zooKeeperClient = CuratorFrameworkFactory.builder() + .connectString(getZookeeperEnsemble()) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .build(); + + try { + zooKeeperClient.start(); + List<String> serverNodes = zooKeeperClient.getChildren().forPath("/live_nodes"); + for (String serverNode : serverNodes) { + String serverURL = constructURL(serverNode); + serverHosts.add(serverURL); + } + } catch (Exception e) { + LOG.failedToGetZookeeperUrls(e); + throw new RuntimeException(e); + } finally { + // Close the client connection with ZooKeeper + if (zooKeeperClient != null) { + zooKeeperClient.close(); + } + } + + return serverHosts; + } + + /** + * Given a String of the format "host:port_solr" convert to a URL of the format + * "http://host:port/solr". + * + * @param serverInfo Server Info from Zookeeper (required) + * + * @return URL to SOLR + */ + private String constructURL(String serverInfo) { + String scheme = "http"; + + StringBuffer buffer = new StringBuffer(); + buffer.append(scheme); + buffer.append("://"); + buffer.append(serverInfo.replace("_", "/")); + return buffer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/StringResponseHandler.java ---------------------------------------------------------------------- diff --git a/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/StringResponseHandler.java b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/StringResponseHandler.java new file mode 100644 index 0000000..68b68c6 --- /dev/null +++ b/gateway-provider-ha/src/main/java/org/apache/hadoop/gateway/ha/provider/impl/StringResponseHandler.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.ha.provider.impl; + +import java.io.IOException; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.ResponseHandler; +import org.apache.http.util.EntityUtils; + +/** + * Apache HttpClient ResponseHandler for String HttpResponse + */ +public class StringResponseHandler implements ResponseHandler<String> +{ + @Override + public String handleResponse(HttpResponse response) + throws ClientProtocolException, IOException + { + int status = response.getStatusLine().getStatusCode(); + + if (status >= 200 && status < 300) + { + HttpEntity entity = response.getEntity(); + return entity != null ?EntityUtils.toString(entity) : null; + } + else + { + throw new ClientProtocolException("Unexcepted response status: " + status); + } + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager ---------------------------------------------------------------------- diff --git a/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager b/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager index d1ec0b9..7530ac6 100644 --- a/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager +++ b/gateway-provider-ha/src/main/resources/META-INF/services/org.apache.hadoop.gateway.ha.provider.URLManager @@ -16,4 +16,7 @@ # limitations under the License. ########################################################################## -org.apache.hadoop.gateway.ha.provider.impl.HS2ZookeeperURLManager \ No newline at end of file +org.apache.hadoop.gateway.ha.provider.impl.HS2ZookeeperURLManager +org.apache.hadoop.gateway.ha.provider.impl.SOLRZookeeperURLManager +org.apache.hadoop.gateway.ha.provider.impl.KafkaZookeeperURLManager +org.apache.hadoop.gateway.ha.provider.impl.HBaseZookeeperURLManager \ No newline at end of file http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManagerTest.java ---------------------------------------------------------------------- diff --git a/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManagerTest.java b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManagerTest.java new file mode 100644 index 0000000..087651e --- /dev/null +++ b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/HBaseZookeeperURLManagerTest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.ha.provider.impl; + +import java.io.IOException; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingCluster; +import org.apache.hadoop.gateway.ha.provider.HaServiceConfig; +import org.apache.hadoop.gateway.ha.provider.URLManager; +import org.apache.hadoop.gateway.ha.provider.URLManagerLoader; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Simple unit tests for HBaseZookeeperURLManager. + * + * @see HBaseZookeeperURLManager + */ +public class HBaseZookeeperURLManagerTest { + + private TestingCluster cluster; + + @Before + public void setup() throws Exception { + cluster = new TestingCluster(3); + cluster.start(); + + CuratorFramework zooKeeperClient = + CuratorFrameworkFactory.builder().connectString(cluster.getConnectString()) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + + zooKeeperClient.start(); + zooKeeperClient.create().forPath("/hbase-unsecure"); + zooKeeperClient.create().forPath("/hbase-unsecure/rs"); + zooKeeperClient.close(); + } + + @After + public void teardown() throws IOException { + cluster.stop(); + } + + @Test + public void testHBaseZookeeperURLManagerLoading() { + HaServiceConfig config = new DefaultHaServiceConfig("WEBHBASE"); + config.setEnabled(true); + config.setZookeeperEnsemble(cluster.getConnectString()); + URLManager manager = URLManagerLoader.loadURLManager(config); + Assert.assertNotNull(manager); + Assert.assertTrue(manager instanceof HBaseZookeeperURLManager); + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManagerTest.java ---------------------------------------------------------------------- diff --git a/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManagerTest.java b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManagerTest.java new file mode 100644 index 0000000..50dedbf --- /dev/null +++ b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/KafkaZookeeperURLManagerTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.ha.provider.impl; + +import java.io.IOException; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingCluster; +import org.apache.hadoop.gateway.ha.provider.HaServiceConfig; +import org.apache.hadoop.gateway.ha.provider.URLManager; +import org.apache.hadoop.gateway.ha.provider.URLManagerLoader; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Simple unit tests for KafkaZookeeperURLManager. + * + * @see KafkaZookeeperURLManager + */ +public class KafkaZookeeperURLManagerTest { + private TestingCluster cluster; + + @Before + public void setup() throws Exception { + cluster = new TestingCluster(3); + cluster.start(); + + CuratorFramework zooKeeperClient = + CuratorFrameworkFactory.builder().connectString(cluster.getConnectString()) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + + zooKeeperClient.start(); + zooKeeperClient.create().forPath("/brokers"); + zooKeeperClient.create().forPath("/brokers/ids"); + zooKeeperClient.close(); + } + + @After + public void teardown() throws IOException { + cluster.stop(); + } + + @Test + public void testHBaseZookeeperURLManagerLoading() { + HaServiceConfig config = new DefaultHaServiceConfig("KAFKA"); + config.setEnabled(true); + config.setZookeeperEnsemble(cluster.getConnectString()); + URLManager manager = URLManagerLoader.loadURLManager(config); + Assert.assertNotNull(manager); + Assert.assertTrue(manager instanceof KafkaZookeeperURLManager); + } +} http://git-wip-us.apache.org/repos/asf/knox/blob/a08aaf74/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManagerTest.java ---------------------------------------------------------------------- diff --git a/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManagerTest.java b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManagerTest.java new file mode 100644 index 0000000..6cc6fa7 --- /dev/null +++ b/gateway-provider-ha/src/test/java/org/apache/hadoop/gateway/ha/provider/impl/SOLRZookeeperURLManagerTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.gateway.ha.provider.impl; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingCluster; +import org.apache.hadoop.gateway.ha.provider.HaServiceConfig; +import org.apache.hadoop.gateway.ha.provider.URLManager; +import org.apache.hadoop.gateway.ha.provider.URLManagerLoader; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.TreeSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Simple unit tests for SOLRZookeeperURLManager. + * + * @see SOLRZookeeperURLManager + */ +public class SOLRZookeeperURLManagerTest { + + private TestingCluster cluster; + private SOLRZookeeperURLManager manager; + + @Before + public void setup() throws Exception { + cluster = new TestingCluster(3); + cluster.start(); + + CuratorFramework zooKeeperClient = + CuratorFrameworkFactory.builder().connectString(cluster.getConnectString()) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + + zooKeeperClient.start(); + zooKeeperClient.create().forPath("/live_nodes"); + zooKeeperClient.create().forPath("/live_nodes/host1:8983_solr"); + zooKeeperClient.create().forPath("/live_nodes/host2:8983_solr"); + zooKeeperClient.create().forPath("/live_nodes/host3:8983_solr"); + zooKeeperClient.close(); + manager = new SOLRZookeeperURLManager(); + HaServiceConfig config = new DefaultHaServiceConfig("SOLR"); + config.setEnabled(true); + config.setZookeeperEnsemble(cluster.getConnectString()); + manager.setConfig(config); + } + + @After + public void teardown() throws IOException { + cluster.stop(); + } + + @Test + public void testURLs() throws Exception { + List<String> urls = manager.getURLs(); + Assert.assertNotNull(urls); + + // Order of URLS is not deterministic out of Zookeeper + // So we just check for expected values + + TreeSet<String> expected = new TreeSet<String>(); + + expected.add("http://host1:8983/solr"); + expected.add("http://host2:8983/solr"); + expected.add("http://host3:8983/solr"); + + for(String url : urls) + { + assertTrue(expected.contains(url)); + expected.remove(url); + } + + assertEquals(0,expected.size()); + + // Unable to test markFailed because the SOLRZookeeperURLManager always does a refresh on Zookeeper contents. + } + + @Test + public void testSOLRZookeeperURLManagerLoading() { + HaServiceConfig config = new DefaultHaServiceConfig("SOLR"); + config.setEnabled(true); + config.setZookeeperEnsemble(cluster.getConnectString()); + URLManager manager = URLManagerLoader.loadURLManager(config); + Assert.assertNotNull(manager); + Assert.assertTrue(manager instanceof SOLRZookeeperURLManager); + } +}