Author: jbellis Date: Tue Jun 28 12:28:00 2011 New Revision: 1140565 URL: http://svn.apache.org/viewvc?rev=1140565&view=rev Log: fix race that could result in the Hadoop writer failing to throw an exception for an encountered error patch by Mck SembWever and jbellis for CASSANDRA-2755
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1140565&r1=1140564&r2=1140565&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Jun 28 12:28:00 2011 @@ -23,6 +23,8 @@ * Expose number of threads blocked on submitting memtable to flush (CASSANDRA-2817) * Fix potential NPE during read repair (CASSANDRA-2823) + * fix race that could result in Hadoop writer failing to throw an + exception encountered after close() (CASSANDRA-2755) 0.7.6 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1140565&r1=1140564&r2=1140565&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Tue Jun 28 12:28:00 2011 @@ -32,15 +32,14 @@ import java.util.concurrent.TimeUnit; import org.apache.cassandra.client.RingCache; import org.apache.cassandra.dht.Range; import org.apache.cassandra.thrift.*; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.thrift.TException; import org.apache.thrift.transport.TSocket; 
-import org.apache.cassandra.utils.ByteBufferUtil; /** * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value> @@ -219,27 +218,33 @@ implements org.apache.hadoop.mapred.Reco @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { - close((org.apache.hadoop.mapred.Reporter)null); + close(); } /** Fills the deprecated RecordWriter interface for streaming. */ @Deprecated public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException { + close(); + } + + private void close() throws IOException + { + // close all the clients before throwing anything + IOException clientException = null; for (RangeClient client : clients.values()) - client.stopNicely(); - try { - for (RangeClient client : clients.values()) + try { - client.join(); client.close(); } + catch (IOException e) + { + clientException = e; + } } - catch (InterruptedException e) - { - throw new AssertionError(e); - } + if (clientException != null) + throw clientException; } /** @@ -255,6 +260,9 @@ implements org.apache.hadoop.mapred.Reco private final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<Pair<ByteBuffer,Mutation>>(queueSize); private volatile boolean run = true; + // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing + // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls, + // when the client is closed. private volatile IOException lastException; private Cassandra.Client thriftClient; @@ -291,15 +299,25 @@ implements org.apache.hadoop.mapred.Reco } } - public void stopNicely() throws IOException + public void close() throws IOException { - if (lastException != null) - throw lastException; + // stop the run loop. this will result in closeInternal being called by the time join() finishes. 
run = false; interrupt(); + try + { + this.join(); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + + if (lastException != null) + throw lastException; } - public void close() + private void closeInternal() { if (thriftSocket != null) { @@ -356,7 +374,7 @@ implements org.apache.hadoop.mapred.Reco } catch (Exception e) { - close(); + closeInternal(); if (!iter.hasNext()) { lastException = new IOException(e); @@ -373,7 +391,7 @@ implements org.apache.hadoop.mapred.Reco } catch (Exception e) { - close(); + closeInternal(); // TException means something unexpected went wrong to that endpoint, so // we should try again to another. Other exceptions (auth or invalid request) are fatal. if ((!(e instanceof TException)) || !iter.hasNext())