Repository: cassandra
Updated Branches:
  refs/heads/trunk 41c69ac55 -> 3ea35bda6

(Hadoop) ensure that Cluster instances are always closed

patch by Alex Liu; reviewed by Aleksey Yeschenko for CASSANDRA-10058


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/177f6070
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/177f6070
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/177f6070

Branch: refs/heads/trunk
Commit: 177f607057a9d4c4b3746cec51e8e283938a5363
Parents: ef0e447
Author: Alex Liu <alex_li...@yahoo.com>
Authored: Tue Nov 10 14:32:55 2015 +0000
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue Nov 10 14:34:00 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop/AbstractColumnFamilyInputFormat.java | 74 +++++++--------
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 96 ++++++++++----------
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  4 +-
 4 files changed, 91 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/177f6070/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5edad20..81ceb25 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.4
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
  * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
  * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
  * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/177f6070/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index e531ad1..d687183 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -24,6 +24,7 @@ import java.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Host;
 import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.ResultSet;
@@ -58,7 +59,6 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
     private String keyspace;
     private String cfName;
     private IPartitioner partitioner;
-    private Session session;
 
     protected void validateConfiguration(Configuration conf)
     {
@@ -90,36 +90,36 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
         ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
         List<InputSplit> splits = new ArrayList<>();
 
-        try
+        List<Future<List<InputSplit>>> splitfutures = new ArrayList<>();
+        KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
+        Range<Token> jobRange = null;
+        if (jobKeyRange != null)
         {
-            List<Future<List<InputSplit>>> splitfutures = new ArrayList<>();
-            KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
-            Range<Token> jobRange = null;
-            if (jobKeyRange != null)
+            if (jobKeyRange.start_key != null)
             {
-                if (jobKeyRange.start_key != null)
-                {
-                    if (!partitioner.preservesOrder())
-                        throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner");
-                    if (jobKeyRange.start_token != null)
-                        throw new IllegalArgumentException("only start_key supported");
-                    if (jobKeyRange.end_token != null)
-                        throw new IllegalArgumentException("only start_key supported");
-                    jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key),
-                                           partitioner.getToken(jobKeyRange.end_key));
-                }
-                else if (jobKeyRange.start_token != null)
-                {
-                    jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
-                                           partitioner.getTokenFactory().fromString(jobKeyRange.end_token));
-                }
-                else
-                {
-                    logger.warn("ignoring jobKeyRange specified without start_key or start_token");
-                }
+                if (!partitioner.preservesOrder())
+                    throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner");
+                if (jobKeyRange.start_token != null)
+                    throw new IllegalArgumentException("only start_key supported");
+                if (jobKeyRange.end_token != null)
+                    throw new IllegalArgumentException("only start_key supported");
+                jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key),
+                                       partitioner.getToken(jobKeyRange.end_key));
             }
+            else if (jobKeyRange.start_token != null)
+            {
+                jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
+                                       partitioner.getTokenFactory().fromString(jobKeyRange.end_token));
+            }
+            else
+            {
+                logger.warn("ignoring jobKeyRange specified without start_key or start_token");
+            }
+        }
 
-            session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+        try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf))
+        {
+            Session session = cluster.connect();
             Metadata metadata = session.getCluster().getMetadata();
 
             for (TokenRange range : masterRangeNodes.keySet())
@@ -127,7 +127,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
                 if (jobRange == null)
                 {
                     // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
-                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
+                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf, session)));
                 }
                 else
                 {
@@ -137,7 +137,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
                     for (TokenRange intersection: range.intersectWith(jobTokenRange))
                     {
                         // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
-                        splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf)));
+                        splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf, session)));
                     }
                 }
             }
@@ -182,19 +182,21 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
         private final TokenRange tokenRange;
         private final Set<Host> hosts;
         private final Configuration conf;
+        private final Session session;
 
-        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
+        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf, Session session)
         {
             this.tokenRange = tr;
             this.hosts = hosts;
             this.conf = conf;
+            this.session = session;
         }
 
         public List<InputSplit> call() throws Exception
         {
             ArrayList<InputSplit> splits = new ArrayList<>();
             Map<TokenRange, Long> subSplits;
-            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
+            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf, session);
             // turn the sub-ranges into InputSplits
             String[] endpoints = new String[hosts.size()];
@@ -225,12 +227,12 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
         }
     }
 
-    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session) throws IOException
     {
         int splitSize = ConfigHelper.getInputSplitSize(conf);
         try
         {
-            return describeSplits(keyspace, cfName, range, splitSize);
+            return describeSplits(keyspace, cfName, range, splitSize, session);
         }
         catch (Exception e)
         {
@@ -240,17 +242,17 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
 
     private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
     {
-        try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
+        try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf))
         {
             Map<TokenRange, Set<Host>> map = new HashMap<>();
-            Metadata metadata = session.getCluster().getMetadata();
+            Metadata metadata = cluster.connect().getCluster().getMetadata();
             for (TokenRange tokenRange : metadata.getTokenRanges())
                 map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
             return map;
         }
     }
 
-    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
+    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, Session session)
     {
         String query = String.format("SELECT mean_partition_size, partitions_count " +
                                      "FROM %s.%s " +


http://git-wip-us.apache.org/repos/asf/cassandra/blob/177f6070/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 6e8ffd9..14e24fb 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -113,27 +113,25 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
         batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
         this.clients = new HashMap<>();
 
-        try
+        try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
         {
             String keyspace = ConfigHelper.getOutputKeyspace(conf);
-            try (Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace))
+            Session client = cluster.connect(keyspace);
+            ringCache = new NativeRingCache(conf);
+            if (client != null)
             {
-                ringCache = new NativeRingCache(conf);
-                if (client != null)
-                {
-                    TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
-                    clusterColumns = tableMetadata.getClusteringColumns();
-                    partitionKeyColumns = tableMetadata.getPartitionKey();
-
-                    String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
-                    if (cqlQuery.toLowerCase().startsWith("insert"))
-                        throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
-                    cql = appendKeyWhereClauses(cqlQuery);
-                }
-                else
-                {
-                    throw new IllegalArgumentException("Invalid configuration specified " + conf);
-                }
+                TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
+                clusterColumns = tableMetadata.getClusteringColumns();
+                partitionKeyColumns = tableMetadata.getPartitionKey();
+
+                String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+                if (cqlQuery.toLowerCase().startsWith("insert"))
+                    throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+                cql = appendKeyWhereClauses(cqlQuery);
+            }
+            else
+            {
+                throw new IllegalArgumentException("Invalid configuration specified " + conf);
             }
         }
         catch (Exception e)
@@ -235,7 +233,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
     {
         // The list of endpoints for this range
        protected final List<InetAddress> endpoints;
-        protected Session client;
+        protected Cluster cluster = null;
 
         // A bounded queue of incoming mutations for this range
         protected final BlockingQueue<List<ByteBuffer>> queue = new ArrayBlockingQueue<List<ByteBuffer>>(queueSize);
@@ -281,6 +279,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
          */
         public void run()
         {
+            Session session = null;
             outer:
             while (run || !queue.isEmpty())
             {
@@ -299,34 +298,37 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                 while (true)
                 {
                     // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
-                    try
+                    if (session != null)
                     {
-                        int i = 0;
-                        PreparedStatement statement = preparedStatement(client);
-                        while (bindVariables != null)
+                        try
                         {
-                            BoundStatement boundStatement = new BoundStatement(statement);
-                            for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+                            int i = 0;
+                            PreparedStatement statement = preparedStatement(session);
+                            while (bindVariables != null)
                             {
-                                boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                                BoundStatement boundStatement = new BoundStatement(statement);
+                                for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+                                {
+                                    boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                                }
+                                session.execute(boundStatement);
+                                i++;
+
+                                if (i >= batchThreshold)
+                                    break;
+                                bindVariables = queue.poll();
                             }
-                            client.execute(boundStatement);
-                            i++;
-
-                            if (i >= batchThreshold)
-                                break;
-                            bindVariables = queue.poll();
+                            break;
                         }
-                        break;
-                    }
-                    catch (Exception e)
-                    {
-                        closeInternal();
-                        if (!iter.hasNext())
+                        catch (Exception e)
                         {
-                            lastException = new IOException(e);
-                            break outer;
-                        }
+                            closeInternal();
+                            if (!iter.hasNext())
+                            {
+                                lastException = new IOException(e);
+                                break outer;
+                            }
+                        }
                     }
 
                     // attempt to connect to a different endpoint
@@ -334,7 +336,8 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
                     {
                         InetAddress address = iter.next();
                         String host = address.getHostName();
-                        client = CqlConfigHelper.getOutputCluster(host, conf).connect();
+                        cluster = CqlConfigHelper.getOutputCluster(host, conf);
+                        session = cluster.connect();
                     }
                     catch (Exception e)
                     {
@@ -404,9 +407,9 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
 
         protected void closeInternal()
         {
-            if (client != null)
+            if (cluster != null)
             {
-                client.close();
+                cluster.close();
             }
         }
@@ -486,15 +489,14 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
         private void refreshEndpointMap()
         {
             String keyspace = ConfigHelper.getOutputKeyspace(conf);
-            try (Session session = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace))
+            try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
             {
+                Session session = cluster.connect(keyspace);
                 rangeMap = new HashMap<>();
                 metadata = session.getCluster().getMetadata();
                 Set<TokenRange> ranges = metadata.getTokenRanges();
                 for (TokenRange range : ranges)
-                {
                     rangeMap.put(range, metadata.getReplicas(keyspace, range));
-                }
             }
         }


http://git-wip-us.apache.org/repos/asf/cassandra/blob/177f6070/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index ba0a37d..74058b1 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -28,6 +28,7 @@ import java.net.URLDecoder;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ColumnMetadata;
 import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.Row;
@@ -723,8 +724,9 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo
         // Only get the schema if we haven't already gotten it
         if (!properties.containsKey(signature))
         {
-            try (Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect())
+            try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf))
             {
+                Session client = cluster.connect();
                 client.execute("USE " + keyspace);
 
                 // compose the CfDef for the columfamily
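
Every hunk above applies the same idiom: the DataStax driver's Cluster is AutoCloseable, and closing a Cluster also shuts down every Session it created, so holding the Cluster itself in a try-with-resources block (instead of chaining .connect() off a throwaway Cluster reference that nothing ever closes) releases the connection pool and its threads even when an exception is thrown. A minimal standalone sketch of the before/after shape follows; the class name, method names, and query are illustrative only, not part of the patch:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;

    public class ClusterLifecycleSketch
    {
        // Leaky shape this commit removes: the Cluster built inline is never
        // referenced again, so nothing can ever call close() on it.
        public static long leaky(String contactPoint)
        {
            Session session = Cluster.builder().addContactPoint(contactPoint).build().connect();
            Row row = session.execute("SELECT count(*) FROM system.local").one();
            return row.getLong(0); // Cluster, its connections, and its threads leak here
        }

        // Fixed shape: the Cluster is the try-with-resources resource; closing
        // it also closes the Session it handed out.
        public static long closed(String contactPoint)
        {
            try (Cluster cluster = Cluster.builder().addContactPoint(contactPoint).build())
            {
                Session session = cluster.connect();
                Row row = session.execute("SELECT count(*) FROM system.local").one();
                return row.getLong(0);
            } // cluster.close() runs here, even if execute() threw
        }
    }

The one place the patch cannot use try-with-resources is RangeClient in CqlRecordWriter, which keeps its Cluster in a field across run() iterations; there it stores the Cluster instead of the Session and closes it explicitly in closeInternal().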