Repository: cassandra
Updated Branches:
  refs/heads/trunk 753ccff52 -> 314afb8df
(Hadoop) fix cluster initialisation for a split fetching patch by Jacek Lewandowski; reviewed by Alex Liu for CASSANDRA-7774 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fe39eb7a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fe39eb7a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fe39eb7a Branch: refs/heads/trunk Commit: fe39eb7a9e2b017e3cd31b1c09693c8d565dee18 Parents: 44cfd95 Author: Jacek Lewandowski <lewandowski.ja...@gmail.com> Authored: Wed Aug 20 20:39:12 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Wed Aug 20 20:39:12 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/hadoop/cql3/CqlConfigHelper.java | 89 +-------- .../cassandra/hadoop/cql3/CqlRecordReader.java | 19 +- ...mitedLocalNodeFirstLocalBalancingPolicy.java | 185 +++++++++++++++++++ 4 files changed, 198 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 17c0671..71cfca0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.10 + * (Hadoop) fix cluster initialisation for a split fetching (CASSANDRA-7774) * Configure system.paxos with LeveledCompactionStrategy (CASSANDRA-7753) * Fix ALTER clustering column type from DateType to TimestampType when using DESC clustering order (CASSANRDA-7797) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java index e894996..137bddf 100644 --- 
a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java @@ -288,16 +288,22 @@ public class CqlConfigHelper public static Cluster getInputCluster(String host, Configuration conf) { + // this method has been left for backward compatibility + return getInputCluster(new String[] {host}, conf); + } + + public static Cluster getInputCluster(String[] hosts, Configuration conf) + { int port = getInputNativePort(conf); Optional<AuthProvider> authProvider = getAuthProvider(conf); Optional<SSLOptions> sslOptions = getSSLOptions(conf); - LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, host); + LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, hosts); SocketOptions socketOptions = getReadSocketOptions(conf); QueryOptions queryOptions = getReadQueryOptions(conf); PoolingOptions poolingOptions = getReadPoolingOptions(conf); Cluster.Builder builder = Cluster.builder() - .addContactPoint(host) + .addContactPoints(hosts) .withPort(port) .withCompression(ProtocolOptions.Compression.NONE); @@ -480,84 +486,9 @@ public class CqlConfigHelper return socketOptions; } - private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String stickHost) + private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String[] stickHosts) { - return new LoadBalancingPolicy() - { - private Host origHost; - private Set<Host> liveRemoteHosts = Sets.newHashSet(); - - @Override - public void onAdd(Host host) - { - if (host.getAddress().getHostName().equals(stickHost)) - origHost = host; - } - - @Override - public void onDown(Host host) - { - if (host.getAddress().getHostName().equals(stickHost)) - origHost = null; - liveRemoteHosts.remove(host); - } - - @Override - public void onRemove(Host host) - { - if (host.getAddress().getHostName().equals(stickHost)) - origHost = null; - liveRemoteHosts.remove(host); - } - - @Override 
- public void onUp(Host host) - { - if (host.getAddress().getHostName().equals(stickHost)) - origHost = host; - liveRemoteHosts.add(host); - } - - @Override - public void onSuspected(Host host) - { - } - - @Override - public HostDistance distance(Host host) - { - if (host.getAddress().getHostName().equals(stickHost)) - return HostDistance.LOCAL; - else - return HostDistance.REMOTE; - } - - @Override - public void init(Cluster cluster, Collection<Host> hosts) - { - for (Host host : hosts) - { - if (host.getAddress().getHostName().equals(stickHost)) - { - origHost = host; - break; - } - } - } - - @Override - public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) - { - if (origHost != null) - { - return Iterators.concat(Collections.singletonList(origHost).iterator(), liveRemoteHosts.iterator()); - } - else - { - return liveRemoteHosts.iterator(); - } - } - }; + return new LimitedLocalNodeFirstLocalBalancingPolicy(stickHosts); } private static Optional<AuthProvider> getAuthProvider(Configuration conf) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java index 9167ac3..3eab7c0 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java @@ -124,24 +124,9 @@ public class CqlRecordReader extends RecordReader<Long, Row> if (cluster != null) return; - // create connection using thrift + // create a Cluster instance String[] locations = split.getLocations(); - Exception lastException = null; - for (String location : locations) - { - try - { - cluster = CqlConfigHelper.getInputCluster(location, conf); - break; - } - catch (Exception e) - { - lastException = e; - logger.warn("Failed to 
create authenticated client to {}", location); - } - } - if (cluster == null && lastException != null) - throw lastException; + cluster = CqlConfigHelper.getInputCluster(locations, conf); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java new file mode 100644 index 0000000..3aa7df0 --- /dev/null +++ b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java @@ -0,0 +1,185 @@ +package org.apache.cassandra.hadoop.cql3; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.*; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * This load balancing policy is intended to be used only for CqlRecordReader when it fetches a particular split. + * <p/> + * It chooses alive hosts only from the set of the given replicas - because the connection is used to load the data from + * the particular split, with a strictly defined list of the replicas, it is pointless to try the other nodes. 
+ * The policy tracks which of the replicas are alive, and when a new query plan is requested, it returns those replicas + * in the following order: + * <ul> + * <li>the local node</li> + * <li>the collection of the remaining hosts (which is shuffled on each request)</li> + * </ul> + */ +class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy +{ + private final static Logger logger = LoggerFactory.getLogger(LimitedLocalNodeFirstLocalBalancingPolicy.class); + + private final static Set<InetAddress> localAddresses = Collections.unmodifiableSet(getLocalInetAddresses()); + + private final CopyOnWriteArraySet<Host> liveReplicaHosts = new CopyOnWriteArraySet<>(); + + private final Set<InetAddress> replicaAddresses = new HashSet<>(); + + public LimitedLocalNodeFirstLocalBalancingPolicy(String[] replicas) + { + for (String replica : replicas) + { + try + { + InetAddress[] addresses = InetAddress.getAllByName(replica); + Collections.addAll(replicaAddresses, addresses); + } + catch (UnknownHostException e) + { + logger.warn("Invalid replica host name: {}, skipping it", replica); + } + } + logger.debug("Created instance with the following replicas: {}", Arrays.asList(replicas)); + } + + @Override + public void init(Cluster cluster, Collection<Host> hosts) + { + List<Host> replicaHosts = new ArrayList<>(); + for (Host host : hosts) + { + if (replicaAddresses.contains(host.getAddress())) + { + replicaHosts.add(host); + } + } + liveReplicaHosts.addAll(replicaHosts); + logger.debug("Initialized with replica hosts: {}", replicaHosts); + } + + @Override + public HostDistance distance(Host host) + { + if (isLocalHost(host)) + { + return HostDistance.LOCAL; + } + else + { + return HostDistance.REMOTE; + } + } + + @Override + public Iterator<Host> newQueryPlan(String keyspace, Statement statement) + { + List<Host> local = new ArrayList<>(1); + List<Host> remote = new ArrayList<>(liveReplicaHosts.size()); + for (Host liveReplicaHost : liveReplicaHosts) + { + if 
(isLocalHost(liveReplicaHost)) + { + local.add(liveReplicaHost); + } + else + { + remote.add(liveReplicaHost); + } + } + + Collections.shuffle(remote); + + logger.debug("Using the following hosts order for the new query plan: {} | {}", local, remote); + + return Iterators.concat(local.iterator(), remote.iterator()); + } + + @Override + public void onAdd(Host host) + { + if (replicaAddresses.contains(host.getAddress())) + { + liveReplicaHosts.add(host); + logger.debug("Added a new host {}", host); + } + } + + @Override + public void onUp(Host host) + { + if (replicaAddresses.contains(host.getAddress())) + { + liveReplicaHosts.add(host); + logger.debug("The host {} is now up", host); + } + } + + @Override + public void onDown(Host host) + { + if (liveReplicaHosts.remove(host)) + { + logger.debug("The host {} is now down", host); + } + } + + + @Override + public void onRemove(Host host) + { + if (liveReplicaHosts.remove(host)) + { + logger.debug("Removed the host {}", host); + } + } + + @Override + public void onSuspected(Host host) + { + // not supported by this load balancing policy + } + + private static boolean isLocalHost(Host host) + { + InetAddress hostAddress = host.getAddress(); + return hostAddress.isLoopbackAddress() || localAddresses.contains(hostAddress); + } + + private static Set<InetAddress> getLocalInetAddresses() + { + try + { + return Sets.newHashSet(Iterators.concat( + Iterators.transform( + Iterators.forEnumeration(NetworkInterface.getNetworkInterfaces()), + new Function<NetworkInterface, Iterator<InetAddress>>() + { + @Override + public Iterator<InetAddress> apply(NetworkInterface netIface) + { + return Iterators.forEnumeration(netIface.getInetAddresses()); + } + }))); + } + catch (SocketException e) + { + logger.warn("Could not retrieve local network interfaces.", e); + return Collections.emptySet(); + } + } +}