Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7724e6b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7724e6b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7724e6b Branch: refs/heads/cassandra-3.0 Commit: c7724e6b356ed0f3cee1236db52ec2ee425f2495 Parents: 7b430ee f8fc031 Author: Robert Stupp <sn...@snazy.de> Authored: Fri Nov 27 11:41:21 2015 +0100 Committer: Robert Stupp <sn...@snazy.de> Committed: Fri Nov 27 11:41:21 2015 +0100 ---------------------------------------------------------------------- .../cassandra/hadoop/cql3/CqlRecordWriter.java | 160 +++++++++++-------- 1 file changed, 92 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7724e6b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java index 23beba3,84102a5..96815ef --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java @@@ -108,14 -109,14 +108,14 @@@ class CqlRecordWriter extends RecordWri CqlRecordWriter(Configuration conf) { this.conf = conf; - this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); - batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); + this.queueSize = conf.getInt(CqlOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); + batchThreshold = conf.getLong(CqlOutputFormat.BATCH_THRESHOLD, 32); this.clients = new HashMap<>(); + String keyspace = ConfigHelper.getOutputKeyspace(conf); - try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf)) + try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf); + Session client = cluster.connect(keyspace)) { - String keyspace = ConfigHelper.getOutputKeyspace(conf); - Session client = cluster.connect(keyspace); ringCache = new NativeRingCache(conf); if (client != null) { @@@ -279,45 -295,67 +294,68 @@@ public void run() { Session session = null; - outer: - while (run || !queue.isEmpty()) ++ + try { - List<ByteBuffer> bindVariables; - try + outer: + while (run || !queue.isEmpty()) { - bindVariables = queue.take(); - } - catch (InterruptedException e) - { - // re-check loop condition after interrupt - continue; - } + List<ByteBuffer> bindVariables; + try + { + bindVariables = queue.take(); + } + catch (InterruptedException e) + { + // re-check loop condition after interrupt + continue; + } - ListIterator<InetAddress> iter = endpoints.listIterator(); - while (true) - { - // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly. - if (session != null) + ListIterator<InetAddress> iter = endpoints.listIterator(); + while (true) { - try + // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly. + if (session != null) { - int i = 0; - PreparedStatement statement = preparedStatement(session); - while (bindVariables != null) + try { - BoundStatement boundStatement = new BoundStatement(statement); - for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++) + int i = 0; + PreparedStatement statement = preparedStatement(session); + while (bindVariables != null) { - boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition)); + BoundStatement boundStatement = new BoundStatement(statement); + for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++) + { + boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition)); + } + session.execute(boundStatement); + i++; + + if (i >= batchThreshold) + break; + bindVariables = queue.poll(); } - session.execute(boundStatement); - i++; - - if (i >= batchThreshold) - break; - bindVariables = queue.poll(); + break; } - break; + catch (Exception e) + { + closeInternal(); + if (!iter.hasNext()) + { + lastException = new IOException(e); + break outer; + } + } + } + + // attempt to connect to a different endpoint + try + { + InetAddress address = iter.next(); + String host = address.getHostName(); + cluster = CqlConfigHelper.getOutputCluster(host, conf); + closeSession(session); + session = cluster.connect(); } catch (Exception e) { @@@ -329,37 -378,13 +378,12 @@@ } } } - - // attempt to connect to a different endpoint - try - { - InetAddress address = iter.next(); - String host = address.getHostName(); - cluster = CqlConfigHelper.getOutputCluster(host, conf); - session = cluster.connect(); - } - catch (Exception e) - { - //If connection died due to Interrupt, just try connecting to the endpoint again. - //There are too many ways for the Thread.interrupted() state to be cleared, so - //we can't rely on that here. Until the java driver gives us a better way of knowing - //that this exception came from an InterruptedException, this is the best solution. - if (canRetryDriverConnection(e)) - { - iter.previous(); - } - closeInternal(); - - // Most exceptions mean something unexpected went wrong to that endpoint, so - // we should try again to another. Other exceptions (auth or invalid request) are fatal. - if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext()) - { - lastException = new IOException(e); - break outer; - } - } } } + finally + { + closeSession(session); + } - // close all our connections once we are done. closeInternal(); }