Repository: ignite Updated Branches: refs/heads/ignite-4159 d949b739d -> b4b924023
IGNITE-4160: working implementation of Kubernetes IP finder. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b4b92402 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b4b92402 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b4b92402 Branch: refs/heads/ignite-4159 Commit: b4b9240231641614af5318ac0f57721dae5dc30c Parents: d949b73 Author: Denis Magda <dma...@gridgain.com> Authored: Sun Feb 12 18:13:26 2017 -0800 Committer: Denis Magda <dma...@gridgain.com> Committed: Sun Feb 12 18:13:26 2017 -0800 ---------------------------------------------------------------------- .../TcpDiscoveryKubernetesIpFinder.java | 278 +++++++++++++++++++ .../tcp/ipfinder/kubernetes/package-info.java | 22 ++ pom.xml | 1 + 3 files changed, 301 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b92402/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/TcpDiscoveryKubernetesIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/TcpDiscoveryKubernetesIpFinder.java b/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/TcpDiscoveryKubernetesIpFinder.java new file mode 100644 index 0000000..3c292e8 --- /dev/null +++ b/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/TcpDiscoveryKubernetesIpFinder.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes; + + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.SecureRandom; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * Kubernetes Service based IP finder. + */ +public class TcpDiscoveryKubernetesIpFinder extends TcpDiscoveryIpFinderAdapter { + /** Grid logger. */ + @LoggerResource + private IgniteLogger log; + + /** Init routine guard. */ + private final AtomicBoolean initGuard = new AtomicBoolean(); + + /** Init routine latch. */ + private final CountDownLatch initLatch = new CountDownLatch(1); + + /** Trust manager. */ + private TrustManager[] trustAll = new TrustManager[] { + new X509TrustManager() { + public void checkServerTrusted(X509Certificate[] certs, String authType) {} + public void checkClientTrusted(X509Certificate[] certs, String authType) {} + public X509Certificate[] getAcceptedIssuers() { return null; } + } + }; + + /** Host verifier. */ + private HostnameVerifier trustAllHosts = new HostnameVerifier() { + public boolean verify(String hostname, SSLSession session) { + return true; + } + }; + + /** Ignite's Kubernetes Service name. */ + private String serviceName = "ignite"; + + /** Ignite Pod namespace name. */ + private String namespace = "default"; + + /** Kubernetes master URL. */ + private String master = "https://kubernetes.default.svc.cluster.local:443"; + + /** Account token location. */ + private String accountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"; + + /** Kubernets API URL. */ + private URL url; + + /** SSL context */ + private SSLContext ctx; + + /** + * + */ + public TcpDiscoveryKubernetesIpFinder() { + setShared(true); + } + + /** {@inheritDoc} */ + @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException { + init(); + + Collection<InetSocketAddress> addrs = new ArrayList<>(); + + try { + System.out.println("Getting Apache Ignite endpoints from: " + url); + + HttpsURLConnection conn = (HttpsURLConnection)url.openConnection(); + + conn.setHostnameVerifier(trustAllHosts); + + conn.setSSLSocketFactory(ctx.getSocketFactory()); + conn.addRequestProperty("Authorization", "Bearer " + serviceAccountToken(accountToken)); + + // Sending the request and processing a response. + ObjectMapper mapper = new ObjectMapper(); + + Endpoints endpoints = mapper.readValue(conn.getInputStream(), Endpoints.class); + + if (endpoints != null) { + if (endpoints.subsets != null && !endpoints.subsets.isEmpty()) { + for (Subset subset : endpoints.subsets) { + + if (subset.addresses != null && !subset.addresses.isEmpty()) { + for (Address address : subset.addresses) { + addrs.add(new InetSocketAddress(address.ip, 0)); + + System.out.println("Added an address to the list: " + address.ip); + } + } + } + } + } + } + catch (Exception e) { + throw new IgniteSpiException("Failed to retrieve Ignite pods IP addresses.", e); + } + + return addrs; + } + + /** {@inheritDoc} */ + @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + // No-op + } + + /** {@inheritDoc} */ + @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + // No-op + } + + /** + * Sets the name of Ignite's Kubernetes Service where the IP finder will connect to in order to retrieve IP + * addresses of existing Ignite pods. + * + * @param service Ignite's Kubernetes Service name. + */ + public void serviceName(String service) { + this.serviceName = service; + } + + /** + * Sets the namespace name Ignite's Kubernetes Service belongs to. + * If it is not set then 'default' is used as the namespace. + * + * @param namespace Ignite's Kubernetes Service namespace name. + */ + public void namespace(String namespace) { + this.namespace = namespace; + } + + /** + * Sets Kubernetes master's URL. By default 'kubernetes.default.svc' is used. + * + * @param master URL string of Kubernetes master. + */ + public void masterUrl(String master) { + this.master = master; + } + + /** + * @param accountToken + */ + public void accountToken(String accountToken) { + this.accountToken = accountToken; + } + + /** + * Kubernetes IP finder initalization. + * + * @throws IgniteSpiException In case of error. + */ + private void init() throws IgniteSpiException { + if (initGuard.compareAndSet(false, true)) { + + if (serviceName == null || serviceName.isEmpty() || + namespace == null || namespace.isEmpty() || + master == null || master.isEmpty() || + accountToken == null || accountToken.isEmpty()) { + throw new IgniteSpiException( + "One or more configuration parameters are invalid [serviceName=" + + serviceName + ", namespace=" + namespace + ", masterUrl=" + + master + ", accountToken=" + accountToken + "]"); + } + + try { + // Preparing the URL and SSL context to be used for connection purposes. + String path = String.format("/api/v1/namespaces/%s/endpoints/%s", namespace, serviceName); + + url = new URL(master + path); + + ctx = SSLContext.getInstance("SSL"); + + ctx.init(null, trustAll, new SecureRandom()); + } + catch (Exception e) { + throw new IgniteSpiException("Failed to connect to Ignite's Kubernetes Service.", e); + } + finally { + initLatch.countDown(); + } + } + else { + try { + U.await(initLatch); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } + + if (url == null || ctx == null) + throw new IgniteSpiException("IP finder has not been initialized properly."); + } + } + + /** + * @param file + * @return + */ + private String serviceAccountToken(String file) { + try { + return new String(Files.readAllBytes(Paths.get(file))); + } catch (IOException e) { + throw new IgniteSpiException("Failed to load services account token [accountToken= " + file + "]", e); + } + } + + /** + * + */ + @JsonIgnoreProperties(ignoreUnknown = true) + private static class Address { + /** */ + public String ip; + } + + /** + * + */ + @JsonIgnoreProperties(ignoreUnknown = true) + private static class Subset { + /** */ + public List<Address> addresses; + } + + /** + * + */ + @JsonIgnoreProperties(ignoreUnknown = true) + private static class Endpoints { + /** */ + public List<Subset> subsets; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b92402/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/package-info.java ---------------------------------------------------------------------- diff --git a/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/package-info.java b/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/package-info.java new file mode 100644 index 0000000..6b42121 --- /dev/null +++ b/modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains Google Cloud Storage IP finder. + */ +package org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b92402/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ea76053..238361b 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,7 @@ <module>modules/web/ignite-websphere-test</module> <module>modules/cassandra</module> <module>modules/flink</module> + <module>modules/kubernetes</module> </modules> <profiles>