Author: johan Date: Thu May 20 08:20:35 2010 New Revision: 946564 URL: http://svn.apache.org/viewvc?rev=946564&view=rev Log: Close thrift connections properly in the Hadoop record reader. Patch by johan, review by jbellis. CASSANDRA-1081
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=946564&r1=946563&r2=946564&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Thu May 20 08:20:35 2010 @@ -56,7 +56,11 @@ public class ColumnFamilyRecordReader ex private String cfName; private String keyspace; - public void close() {} + public void close() + { + if (iter != null) + iter.close(); + } public String getCurrentKey() { @@ -102,6 +106,7 @@ public class ColumnFamilyRecordReader ex private int totalRead = 0; private int i = 0; private AbstractType comparator = DatabaseDescriptor.getComparator(keyspace, cfName); + private TSocket socket; private void maybeInit() { @@ -111,7 +116,11 @@ public class ColumnFamilyRecordReader ex if (rows != null) return; - TSocket socket = new TSocket(getLocation(), + + // close previous connection if one is open + close(); + + socket = new TSocket(getLocation(), DatabaseDescriptor.getThriftPort()); TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false); Cassandra.Client client = new Cassandra.Client(binaryProtocol); @@ -226,6 +235,14 @@ public class ColumnFamilyRecordReader ex } return new Pair<String, SortedMap<byte[], IColumn>>(ks.key, map); } + + public void close() + { + if (socket != null && socket.isOpen()) + { + socket.close(); + } + } } private IColumn unthriftify(ColumnOrSuperColumn cosc)