This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit a585119a59f4520114dd2d6fb357aeab41b32977 Author: Zhong, Yanghong <nju_y...@apache.org> AuthorDate: Wed May 20 17:24:26 2020 +0800 KYLIN-4507 Override getSocketAddress() in TCPMemcachedNodeImpl to auto detect memcached instance ip change --- .../net/spy/memcached/protocol/TCPMemcachedNodeImpl.java | 11 ++++++++++- .../cachemanager/RemoteLocalFailOverCacheManager.java | 5 +++++ .../cachemanager/RemoteLocalFailOverCacheManagerTest.java | 14 ++++++++++++++ 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java index 22dd730..f7da57e 100644 --- a/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java +++ b/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java @@ -19,10 +19,12 @@ package net.spy.memcached.protocol; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.nio.channels.UnsupportedAddressTypeException; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.BlockingQueue; @@ -42,6 +44,9 @@ import net.spy.memcached.protocol.binary.TapAckOperationImpl; /** * Represents a node with the memcached cluster, along with buffering and * operation queues. + * + * This is a modified version of the net.spy.memcached.protocol.TCPMemcachedNodeImpl + * Override the final method getSocketAddress() to refresh SocketAddress to achieve same hostname with ip changing */ public abstract class TCPMemcachedNodeImpl extends SpyObject implements MemcachedNode { @@ -415,7 +420,11 @@ public abstract class TCPMemcachedNodeImpl extends SpyObject implements Memcache * @see net.spy.memcached.MemcachedNode#getSocketAddress() */ public final SocketAddress getSocketAddress() { - return socketAddress; + if (!(socketAddress instanceof InetSocketAddress)) { + throw new UnsupportedAddressTypeException(); + } + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + return new InetSocketAddress(inetSocketAddress.getHostName(), inetSocketAddress.getPort()); } /* diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java index 22517f4..aae0d7c 100644 --- a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java +++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java @@ -68,4 +68,9 @@ public class RemoteLocalFailOverCacheManager extends AbstractCacheManager { void enableRemoteCacheManager() { remoteCacheManager.setClusterHealth(true); } + + @VisibleForTesting + MemcachedCacheManager getRemoteCacheManager() { + return remoteCacheManager; + } } \ No newline at end of file diff --git a/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java b/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java index 243e386..c45dd6f 100644 --- a/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java +++ b/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java @@ -32,6 +32,8 @@ import org.springframework.cache.ehcache.EhCacheCache; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +// +//import net.spy.memcached.MemcachedClientIF; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { "classpath:cacheContext.xml" }) @@ -58,5 +60,17 @@ public class RemoteLocalFailOverCacheManagerTest { cacheManager.enableRemoteCacheManager(); Assert.assertTrue("Memcached enabled", cacheManager.getCache(QUERY_CACHE) instanceof MemcachedCacheManager.MemCachedCacheAdaptor); +// +// MemcachedCacheManager remoteCacheManager = cacheManager.getRemoteCacheManager(); +// for (int i = 0; i < 1000; i++) { +// MemcachedClientIF client = (MemcachedClientIF) remoteCacheManager.getCache(QUERY_CACHE).getNativeCache(); +// System.out.println(i + " available servers: " + client.getAvailableServers() + "; unavailable servers: " +// + client.getUnavailableServers()); +// try { +// client.get("key"); +// Thread.sleep(2000L); +// } catch (Exception e) { +// } +// } } } \ No newline at end of file