alex-plekhanov commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r506132945
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -798,8 +801,10 @@ void channelsInit(boolean force) {
c = hld.getOrCreateChannel();
- if (c != null)
- return new T2<>(function.apply(c), attempt + 1);
+ if (c != null) {
+ attemptsCallback.accept(attempt + 1);
Review comment:
NL (new line): please add an empty line after this statement.
##########
File path:
modules/kubernetes/src/test/java/org/apache/ignite/internal/kubernetes/connection/KubernetesServiceAddressResolverTest.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.kubernetes.connection;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import
org.apache.ignite.kubernetes.configuration.KubernetesConnectionConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.matchers.Times;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+/** Checks that correctly parse kubernetes json response. */
+public class KubernetesServiceAddressResolverTest {
+ /** Mock of kubernetes API. */
+ private static ClientAndServer mockServer;
+
+ /** */
+ private static final String namespace = "ns01";
+
+ /** */
+ private static final String service = "ignite";
+
+ /** */
+ @BeforeClass
+ public static void startServer() {
+ mockServer = startClientAndServer();
+ }
+
+ /** */
+ @AfterClass
+ public static void stopServer() {
+ mockServer.stop();
+ }
+
+ /** */
+ @Test
+ public void testCorrectParseKubernetesResponse() throws IOException {
+ // given
+ KubernetesServiceAddressResolver rslvr = prepareResolver(false);
+
+ mockSuccessServerResponse();
+
+ // when
+ Collection<InetAddress> result = rslvr.getServiceAddresses();
+
+ // then
+ List<String> ips = result.stream()
+ .map(InetAddress::getHostAddress)
+ .collect(Collectors.toList());
+
+ assertEquals(
+ Arrays.asList("10.1.1.1", "10.1.1.2", "10.1.1.4", "10.1.1.5",
"10.1.1.7"),
+ ips
+ );
+ }
+
+ /** */
+ @Test
+ public void
testCorrectParseKubernetesResponseWithIncludingNotReadyAddresses() throws
IOException {
+ // given
+ KubernetesServiceAddressResolver rslvr = prepareResolver(true);
+
+ mockSuccessServerResponse();
+
+ // when
+ Collection<InetAddress> result = rslvr.getServiceAddresses();
+
+ // then
+ List<String> ips = result.stream()
+ .map(InetAddress::getHostAddress)
+ .collect(Collectors.toList());
+
+ assertEquals(
+ Arrays.asList("10.1.1.1", "10.1.1.2", "10.1.1.3", "10.1.1.4",
"10.1.1.5", "10.1.1.6", "10.1.1.7"),
+ ips
+ );
+ }
+
+ /** */
+ @Test(expected = IgniteException.class)
+ public void testConnectionFailure() throws IOException {
+ // given
+ KubernetesServiceAddressResolver rslvr = prepareResolver(true);
+
+ mockFailureServerResponse();
+
+ rslvr.getServiceAddresses();
+ }
+
+ /** */
+ private KubernetesServiceAddressResolver prepareResolver(boolean
includeNotReadyAddresses)
+ throws IOException
+ {
+ File account = File.createTempFile("kubernetes-test-account", "");
+ new FileWriter(account).write("account-token");
+ String accountFile = account.getAbsolutePath();
+
+ KubernetesConnectionConfiguration cfg = new
KubernetesConnectionConfiguration();
+ cfg.setNamespace(namespace);
+ cfg.setServiceName(service);
+ cfg.setMasterUrl("https://localhost:" + mockServer.getLocalPort());
+ cfg.setAccountToken(accountFile);
+ cfg.setIncludeNotReadyAddresses(includeNotReadyAddresses);
+
+ return new KubernetesServiceAddressResolver(cfg);
+ }
+
+ /** */
+ private void mockFailureServerResponse() {
+ mockServer
+ .when(
+ request()
+ .withMethod("GET")
+
.withPath(String.format("/api/v1/namespaces/%s/endpoints/%s", namespace,
service)),
+ Times.exactly(1)
+ )
+ .respond(
+ response()
+ .withStatusCode(401));
+ }
+
+ /** */
+ private void mockSuccessServerResponse() {
+ mockServer
+ .when(
+ request()
+ .withMethod("GET")
+
.withPath(String.format("/api/v1/namespaces/%s/endpoints/%s", namespace,
service)),
+ Times.exactly(1)
+ )
+ .respond(
+ response()
+ .withStatusCode(200)
+ .withBody("{" +
+ " \"subsets\": [" +
+ " {" +
+ " \"addresses\": [" +
+ " {" +
+ " \"ip\": \"10.1.1.1\"" +
+ " }," +
+ " {" +
+ " \"ip\": \"10.1.1.2\"" +
+ " }" +
+ " ]," +
+ " \"notReadyAddresses\": [" +
+ " {" +
+ " \"ip\": \"10.1.1.3\"" +
+ " }" +
+ " ]" +
+ " }," +
+ " {" +
+ " \"addresses\": [" +
+ " {" +
+ " \"ip\": \"10.1.1.4\"" +
+ " }," +
+ " {" +
+ " \"ip\": \"10.1.1.5\"" +
+ " }" +
+ " ]," +
+ " \"notReadyAddresses\": [" +
+ " {" +
+ " \"ip\": \"10.1.1.6\"" +
+ " }" +
+ " ]" +
+ " }," +
+ " {" +
+ " \"addresses\": [" +
+ " {" +
+ " \"ip\": \"10.1.1.7\"" +
+ " }" +
+ " ]" +
+ " }" +
+ " ]" +
+ "}"
+ ));
+ }
+
+//
Review comment:
Please clean this up: remove the leftover empty `//` comment.
##########
File path:
modules/kubernetes/src/test/java/org/apache/ignite/internal/kubernetes/connection/KubernetesServiceAddressResolverTest.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.kubernetes.connection;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import
org.apache.ignite.kubernetes.configuration.KubernetesConnectionConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.matchers.Times;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+/** Checks that correctly parse kubernetes json response. */
+public class KubernetesServiceAddressResolverTest {
+ /** Mock of kubernetes API. */
+ private static ClientAndServer mockServer;
+
+ /** */
+ private static final String namespace = "ns01";
+
+ /** */
+ private static final String service = "ignite";
+
+ /** */
+ @BeforeClass
+ public static void startServer() {
+ mockServer = startClientAndServer();
+ }
+
+ /** */
+ @AfterClass
+ public static void stopServer() {
+ mockServer.stop();
+ }
+
+ /** */
+ @Test
+ public void testCorrectParseKubernetesResponse() throws IOException {
+ // given
+ KubernetesServiceAddressResolver rslvr = prepareResolver(false);
+
+ mockSuccessServerResponse();
+
+ // when
+ Collection<InetAddress> result = rslvr.getServiceAddresses();
+
+ // then
+ List<String> ips = result.stream()
+ .map(InetAddress::getHostAddress)
+ .collect(Collectors.toList());
+
+ assertEquals(
+ Arrays.asList("10.1.1.1", "10.1.1.2", "10.1.1.4", "10.1.1.5",
"10.1.1.7"),
+ ips
+ );
+ }
+
+ /** */
+ @Test
+ public void
testCorrectParseKubernetesResponseWithIncludingNotReadyAddresses() throws
IOException {
+ // given
+ KubernetesServiceAddressResolver rslvr = prepareResolver(true);
+
+ mockSuccessServerResponse();
+
+ // when
+ Collection<InetAddress> result = rslvr.getServiceAddresses();
+
+ // then
+ List<String> ips = result.stream()
+ .map(InetAddress::getHostAddress)
+ .collect(Collectors.toList());
+
+ assertEquals(
+ Arrays.asList("10.1.1.1", "10.1.1.2", "10.1.1.3", "10.1.1.4",
"10.1.1.5", "10.1.1.6", "10.1.1.7"),
+ ips
+ );
+ }
+
+ /** */
+ @Test(expected = IgniteException.class)
+ public void testConnectionFailure() throws IOException {
+ // given
+ KubernetesServiceAddressResolver rslvr = prepareResolver(true);
+
+ mockFailureServerResponse();
+
+ rslvr.getServiceAddresses();
+ }
+
+ /** */
+ private KubernetesServiceAddressResolver prepareResolver(boolean
includeNotReadyAddresses)
+ throws IOException
+ {
+ File account = File.createTempFile("kubernetes-test-account", "");
+ new FileWriter(account).write("account-token");
Review comment:
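Shouldn't the FileWriter be closed (e.g. with try-with-resources)? Otherwise the buffered token may never be flushed to the file.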
##########
File path:
modules/kubernetes/src/test/java/org/apache/ignite/internal/kubernetes/connection/KubernetesServiceAddressResolverTest.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.kubernetes.connection;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import
org.apache.ignite.kubernetes.configuration.KubernetesConnectionConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.matchers.Times;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+/** Checks that correctly parse kubernetes json response. */
+public class KubernetesServiceAddressResolverTest {
Review comment:
Why is this test not included in the test suite?
##########
File path:
modules/kubernetes/src/test/java/org/apache/ignite/client/TestClusterClientConnection.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import
org.apache.ignite.kubernetes.configuration.KubernetesConnectionConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes.TcpDiscoveryKubernetesIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.matchers.Times;
+
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+/** Test that thin client connects to cluster with {@link
ThinClientKubernetesAddressFinder}. */
+public class TestClusterClientConnection extends GridCommonAbstractTest {
+ /** Mock of kubernetes API. */
+ private static ClientAndServer mockServer;
+
+ /** */
+ private static final String namespace = "ns01";
+
+ /** */
+ private static final String service = "ignite";
+
+ /** */
+ @BeforeClass
+ public static void startServer() {
+ mockServer = startClientAndServer();
+ }
+
+ /** */
+ @AfterClass
+ public static void stopServer() {
+ mockServer.stop();
+ }
+
+ /** */
+ @After
+ public void tearDown() {
+ stopAllGrids();
+ }
+
+ /** */
+ @Test
+ public void testClientConnectsToCluster() throws Exception {
+ mockServerResponse();
+
+ IgniteEx crd = startGrid(getConfiguration());
+ String crdAddr = crd.localNode().addresses().iterator().next();
+
+ mockServerResponse(crdAddr);
+
+ ClientConfiguration ccfg = new ClientConfiguration();
+ ccfg.setAddressesFinder(new
ThinClientKubernetesAddressFinder(prepareConfiguration()));
+
+ IgniteClient client = Ignition.startClient(ccfg);
+
+ ClientCache cache = client.createCache("cache");
+ cache.put(1, 2);
+ assertEquals(2, cache.get(1));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration() throws
Exception {
+ IgniteConfiguration cfg = super.getConfiguration();
+
+ KubernetesConnectionConfiguration kccfg = prepareConfiguration();
+ TcpDiscoveryKubernetesIpFinder ipFinder = new
TcpDiscoveryKubernetesIpFinder(kccfg);
+
+ TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
+ discoverySpi.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(discoverySpi);
+
+ cfg.setIgniteInstanceName(getTestIgniteInstanceName());
+
+ return cfg;
+ }
+
+ /** */
+ private void mockServerResponse(String... addrs) {
+ String ipAddrs = Arrays.stream(addrs)
+ .map(addr -> String.format("{\"ip\":\"%s\"}", addr))
+ .collect(Collectors.joining(","));
+
+ mockServer
+ .when(
+ request()
+ .withMethod("GET")
+
.withPath(String.format("/api/v1/namespaces/%s/endpoints/%s", namespace,
service)),
+ Times.exactly(1)
+ )
+ .respond(
+ response()
+ .withStatusCode(200)
+ .withBody("{" +
+ " \"subsets\": [" +
+ " {" +
+ " \"addresses\": [" +
+ " " + ipAddrs +
+ " ]" +
+ " }" +
+ " ]" +
+ "}"
+ ));
+ }
+
+ /** */
+ private KubernetesConnectionConfiguration prepareConfiguration()
+ throws IOException
+ {
+ File account = File.createTempFile("kubernetes-test-account", "");
+ new FileWriter(account).write("account-token");
Review comment:
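Shouldn't the FileWriter be closed (e.g. with try-with-resources)? Otherwise the buffered token may never be flushed to the file.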
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -218,18 +220,24 @@
Function<PayloadInputChannel, T>
payloadReader,
int attemptsLimit,
ClientConnectionException failure) {
- T2<ClientChannel, Integer> chAndAttempts;
+ ClientChannel ch;
+ // Workaround to store used attempts value within lambda body.
+ int attemptsCnt[] = new int[1];
try {
- chAndAttempts = applyOnDefaultChannel(channel -> channel,
attemptsLimit);
-
+ ch = applyOnDefaultChannel(channel -> channel, attemptsLimit, v ->
attemptsCnt[0] = v );
} catch (Throwable ex) {
+ if (failure != null) {
+ failure.addSuppressed(ex);
+ fut.completeExceptionally(failure);
Review comment:
NL (new line): please add an empty line after this statement.
##########
File path: modules/kubernetes/pom.xml
##########
@@ -101,6 +101,25 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mock-server</groupId>
+ <artifactId>mockserver-netty</artifactId>
+ <version>5.11.1</version>
Review comment:
Please add `<scope>test</scope>` to this dependency, since it is only used by tests.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]