Repository: cassandra Updated Branches: refs/heads/trunk 10054f308 -> 0401f5797
fix 2.2 eclipse-warnings patch by Ariel Weisberg; reviewed by Robert Stupp for CASSANDRA-9800 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8fc0311 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8fc0311 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8fc0311 Branch: refs/heads/trunk Commit: f8fc0311b65b3d82737352f3d01483c0334a6867 Parents: 61e0251 Author: Ariel Weisberg <ariel.weisb...@datastax.com> Authored: Fri Nov 27 11:40:16 2015 +0100 Committer: Robert Stupp <sn...@snazy.de> Committed: Fri Nov 27 11:40:16 2015 +0100 ---------------------------------------------------------------------- .../apache/cassandra/cache/AutoSavingCache.java | 1 + .../db/WindowsFailedSnapshotTracker.java | 41 ++--- .../db/commitlog/CommitLogReplayer.java | 3 +- .../hadoop/AbstractColumnFamilyInputFormat.java | 4 +- .../cassandra/hadoop/cql3/CqlRecordWriter.java | 160 +++++++++++-------- .../cassandra/hadoop/pig/CqlNativeStorage.java | 6 +- .../io/util/ChecksummedRandomAccessReader.java | 29 +++- .../apache/cassandra/io/util/SegmentedFile.java | 1 + .../cassandra/net/IncomingTcpConnection.java | 3 +- 9 files changed, 149 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index c08925d..2c6820e 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -318,6 +318,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate)); } + @SuppressWarnings("resource") public void saveCache() { logger.trace("Deleting old {} files.", cacheType); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java b/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java index 9e6bb47..7cc7893 100644 --- a/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java +++ b/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java @@ -52,32 +52,33 @@ public class WindowsFailedSnapshotTracker { try { - BufferedReader reader = new BufferedReader(new FileReader(TODELETEFILE)); - String snapshotDirectory; - while ((snapshotDirectory = reader.readLine()) != null) + try (BufferedReader reader = new BufferedReader(new FileReader(TODELETEFILE))) { - File f = new File(snapshotDirectory); + String snapshotDirectory; + while ((snapshotDirectory = reader.readLine()) != null) + { + File f = new File(snapshotDirectory); - // Skip folders that aren't a subset of temp or a data folder. We don't want people to accidentally - // delete something important by virtue of adding something invalid to the .toDelete file. - boolean validFolder = FileUtils.isSubDirectory(new File(System.getenv("TEMP")), f); - for (String s : DatabaseDescriptor.getAllDataFileLocations()) - validFolder |= FileUtils.isSubDirectory(new File(s), f); + // Skip folders that aren't a subset of temp or a data folder. We don't want people to accidentally + // delete something important by virtue of adding something invalid to the .toDelete file. + boolean validFolder = FileUtils.isSubDirectory(new File(System.getenv("TEMP")), f); + for (String s : DatabaseDescriptor.getAllDataFileLocations()) + validFolder |= FileUtils.isSubDirectory(new File(s), f); - if (!validFolder) - { - logger.warn("Skipping invalid directory found in .toDelete: {}. Only %TEMP% or data file subdirectories are valid.", f); - continue; - } + if (!validFolder) + { + logger.warn("Skipping invalid directory found in .toDelete: {}. Only %TEMP% or data file subdirectories are valid.", f); + continue; + } - // Could be a non-existent directory if deletion worked on previous JVM shutdown. - if (f.exists()) - { - logger.warn("Discovered obsolete snapshot. Deleting directory [{}]", snapshotDirectory); - FileUtils.deleteRecursive(new File(snapshotDirectory)); + // Could be a non-existent directory if deletion worked on previous JVM shutdown. + if (f.exists()) + { + logger.warn("Discovered obsolete snapshot. Deleting directory [{}]", snapshotDirectory); + FileUtils.deleteRecursive(new File(snapshotDirectory)); + } } } - reader.close(); // Only delete the old .toDelete file if we succeed in deleting all our known bad snapshots. Files.delete(Paths.get(TODELETEFILE)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index cb02a8c..98fb556 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -197,7 +197,7 @@ public class CommitLogReplayer } return end; } - + abstract static class ReplayFilter { public abstract Iterable<ColumnFamily> filter(Mutation mutation); @@ -273,6 +273,7 @@ public class CommitLogReplayer } } + @SuppressWarnings("resource") public void recover(File file, boolean tolerateTruncation) throws IOException { CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java index 148c08a..3c088c2 100644 --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java @@ -117,9 +117,9 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< } } - try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf)) + try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf); + Session session = cluster.connect()) { - Session session = cluster.connect(); Metadata metadata = session.getCluster().getMetadata(); for (TokenRange range : masterRangeNodes.keySet()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java index 14e24fb..84102a5 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java @@ -43,7 +43,7 @@ import org.apache.hadoop.util.Progressable; /** * The <code>CqlRecordWriter</code> maps the output <key, value> * pairs to a Cassandra table. In particular, it applies the binded variables - * in the value to the prepared statement, which it associates with the key, and in + * in the value to the prepared statement, which it associates with the key, and in * turn the responsible endpoint. * * <p> @@ -112,11 +112,11 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); this.clients = new HashMap<>(); + String keyspace = ConfigHelper.getOutputKeyspace(conf); - try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf)) + try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf); + Session client = cluster.connect(keyspace)) { - String keyspace = ConfigHelper.getOutputKeyspace(conf); - Session client = cluster.connect(keyspace); ringCache = new NativeRingCache(conf); if (client != null) { @@ -179,7 +179,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf if (clientException != null) throw clientException; } - + /** * If the key is to be associated with a valid value, a mutation is created * for it with the given table and columns. In the event the value @@ -225,6 +225,20 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf HadoopCompat.progress(context); } + private static void closeSession(Session session) + { + //Close the session to satisfy to avoid warnings for the resource not being closed + try + { + if (session != null) + session.close(); + } + catch (Throwable t) + { + logger.warn("Error closing connection", t); + } + } + /** * A client that runs in a threadpool and connects to the list of endpoints for a particular * range. Bound variables for keys in that range are sent to this client via a queue. @@ -273,94 +287,104 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf } } } - + /** * Loops collecting cql binded variable values from the queue and sending to Cassandra */ + @SuppressWarnings("resource") public void run() { Session session = null; - outer: - while (run || !queue.isEmpty()) + try { - List<ByteBuffer> bindVariables; - try + outer: + while (run || !queue.isEmpty()) { - bindVariables = queue.take(); - } - catch (InterruptedException e) - { - // re-check loop condition after interrupt - continue; - } + List<ByteBuffer> bindVariables; + try + { + bindVariables = queue.take(); + } + catch (InterruptedException e) + { + // re-check loop condition after interrupt + continue; + } - ListIterator<InetAddress> iter = endpoints.listIterator(); - while (true) - { - // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly. - if (session != null) + ListIterator<InetAddress> iter = endpoints.listIterator(); + while (true) { - try + // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly. + if (session != null) { - int i = 0; - PreparedStatement statement = preparedStatement(session); - while (bindVariables != null) + try { - BoundStatement boundStatement = new BoundStatement(statement); - for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++) + int i = 0; + PreparedStatement statement = preparedStatement(session); + while (bindVariables != null) { - boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition)); + BoundStatement boundStatement = new BoundStatement(statement); + for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++) + { + boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition)); + } + session.execute(boundStatement); + i++; + + if (i >= batchThreshold) + break; + bindVariables = queue.poll(); } - session.execute(boundStatement); - i++; - - if (i >= batchThreshold) - break; - bindVariables = queue.poll(); + break; } - break; + catch (Exception e) + { + closeInternal(); + if (!iter.hasNext()) + { + lastException = new IOException(e); + break outer; + } + } + } + + // attempt to connect to a different endpoint + try + { + InetAddress address = iter.next(); + String host = address.getHostName(); + cluster = CqlConfigHelper.getOutputCluster(host, conf); + closeSession(session); + session = cluster.connect(); } catch (Exception e) { + //If connection died due to Interrupt, just try connecting to the endpoint again. + //There are too many ways for the Thread.interrupted() state to be cleared, so + //we can't rely on that here. Until the java driver gives us a better way of knowing + //that this exception came from an InterruptedException, this is the best solution. + if (canRetryDriverConnection(e)) + { + iter.previous(); + } closeInternal(); - if (!iter.hasNext()) + + // Most exceptions mean something unexpected went wrong to that endpoint, so + // we should try again to another. Other exceptions (auth or invalid request) are fatal. + if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext()) { lastException = new IOException(e); break outer; } - } - } - - // attempt to connect to a different endpoint - try - { - InetAddress address = iter.next(); - String host = address.getHostName(); - cluster = CqlConfigHelper.getOutputCluster(host, conf); - session = cluster.connect(); - } - catch (Exception e) - { - //If connection died due to Interrupt, just try connecting to the endpoint again. - //There are too many ways for the Thread.interrupted() state to be cleared, so - //we can't rely on that here. Until the java driver gives us a better way of knowing - //that this exception came from an InterruptedException, this is the best solution. - if (canRetryDriverConnection(e)) - { - iter.previous(); - } - closeInternal(); - - // Most exceptions mean something unexpected went wrong to that endpoint, so - // we should try again to another. Other exceptions (auth or invalid request) are fatal. - if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext()) - { - lastException = new IOException(e); - break outer; } } } } + finally + { + closeSession(session); + } + // close all our connections once we are done. closeInternal(); } @@ -489,9 +513,9 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf private void refreshEndpointMap() { String keyspace = ConfigHelper.getOutputKeyspace(conf); - try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf)) + try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf); + Session session = cluster.connect(keyspace)) { - Session session = cluster.connect(keyspace); rangeMap = new HashMap<>(); metadata = session.getCluster().getMetadata(); Set<TokenRange> ranges = metadata.getTokenRanges(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java index 74058b1..8831cf2 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java @@ -690,7 +690,7 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo else throw new IOException("bulk_insert_statement is missing in input url parameter"); if (bulkTableAlias != null) - CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, column_family); + CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, column_family); CqlBulkOutputFormat.setDeleteSourceOnSuccess(conf, bulkDeleteSourceOnSuccess); if (bulkOutputLocation != null) conf.set(CqlBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation); @@ -724,9 +724,9 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, Lo // Only get the schema if we haven't already gotten it if (!properties.containsKey(signature)) { - try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf)) + try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf); + Session client = cluster.connect()) { - Session client = cluster.connect(); client.execute("USE " + keyspace); // compose the CfDef for the columfamily http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java index 442236d..9015b61 100644 --- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java @@ -48,15 +48,36 @@ public class ChecksummedRandomAccessReader extends RandomAccessReader this.file = file; } + @SuppressWarnings("resource") public static ChecksummedRandomAccessReader open(File file, File crcFile) throws IOException { try (ChannelProxy channel = new ChannelProxy(file)) { RandomAccessReader crcReader = RandomAccessReader.open(crcFile); - @SuppressWarnings("resource") - DataIntegrityMetadata.ChecksumValidator validator = - new DataIntegrityMetadata.ChecksumValidator(new Adler32(), crcReader, file.getPath()); - return new ChecksummedRandomAccessReader(file, channel, validator); + boolean closeCrcReader = true; + try + { + DataIntegrityMetadata.ChecksumValidator validator = + new DataIntegrityMetadata.ChecksumValidator(new Adler32(), crcReader, file.getPath()); + closeCrcReader = false; + boolean closeValidator = true; + try + { + ChecksummedRandomAccessReader retval = new ChecksummedRandomAccessReader(file, channel, validator); + closeValidator = false; + return retval; + } + finally + { + if (closeValidator) + validator.close(); + } + } + finally + { + if (closeCrcReader) + crcReader.close(); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/io/util/SegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java index 30707d8..553cc0d 100644 --- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java @@ -179,6 +179,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl return complete(path, -1L); } + @SuppressWarnings("resource") public SegmentedFile complete(String path, long overrideLength) { ChannelProxy channelCopy = getChannel(path); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index f6652b0..a972114 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -108,7 +108,7 @@ public class IncomingTcpConnection extends Thread implements Closeable close(); } } - + @Override public void close() { @@ -164,6 +164,7 @@ public class IncomingTcpConnection extends Thread implements Closeable } else { + @SuppressWarnings("resource") ReadableByteChannel channel = socket.getChannel(); in = new NIODataInputStream(channel != null ? channel : Channels.newChannel(socket.getInputStream()), BUFFER_SIZE); }