Updated Branches: refs/heads/trunk 2bc79a074 -> e2506f1d0
IOException related cleanups Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2506f1d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2506f1d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2506f1d Branch: refs/heads/trunk Commit: e2506f1d0f14d8719e5f4fde8020b0c5a31383fd Parents: 2bc79a0 Author: Sylvain Lebresne <[email protected]> Authored: Thu May 2 10:54:42 2013 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Thu May 2 10:54:42 2013 +0200 ---------------------------------------------------------------------- .../org/apache/cassandra/db/CounterColumn.java | 4 +- .../org/apache/cassandra/db/CounterMutation.java | 2 +- .../cassandra/db/CounterMutationVerbHandler.java | 4 - .../apache/cassandra/db/HintedHandOffManager.java | 2 +- .../org/apache/cassandra/service/StorageProxy.java | 79 ++++++--------- .../org/apache/cassandra/utils/FBUtilities.java | 26 +++-- 6 files changed, 53 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/db/CounterColumn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java index f94672d..15da1df 100644 --- a/src/java/org/apache/cassandra/db/CounterColumn.java +++ b/src/java/org/apache/cassandra/db/CounterColumn.java @@ -344,7 +344,7 @@ public class CounterColumn extends Column return new CounterColumn(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete); } - private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException, IOException + private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException { RowMutation rm = new RowMutation(cf.metadata().ksName, key.key, cf); @@ -354,7 +354,7 @@ public class CounterColumn extends Column StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer() { public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) - throws IOException, OverloadedException + throws OverloadedException { // We should only send to the remote replica, not the local one Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(local)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/db/CounterMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index d60c5eb..65ca22a 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -108,7 +108,7 @@ public class CounterMutation implements IMutation commands.add(new SliceByNamesReadCommand(table, key, columnFamily.metadata().cfName, new NamesQueryFilter(s))); } - public MessageOut<CounterMutation> makeMutationMessage() throws IOException + public MessageOut<CounterMutation> makeMutationMessage() { return new MessageOut<CounterMutation>(MessagingService.Verb.COUNTER_MUTATION, this, serializer); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index 38ee66f..3286c9a 100644 --- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@ -63,9 +63,5 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation> // The coordinator will timeout on it's own so ignore logger.debug("counter error", e); } - catch (IOException e) - { - logger.error("Error in counter mutation", e); - } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 6f2ecb4..0939abb 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -116,7 +116,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean * Returns a mutation representing a Hint to be sent to <code>targetId</code> * as soon as it becomes available again. */ - public static RowMutation hintFor(RowMutation mutation, UUID targetId) throws IOException + public static RowMutation hintFor(RowMutation mutation, UUID targetId) { UUID hintId = UUIDGen.getTimeUUID(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index e187841..8d2dc60 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.service; -import java.io.DataOutputStream; import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.InetAddress; @@ -54,7 +53,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.io.util.FastByteArrayOutputStream; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.TokenMetadata; @@ -115,7 +114,7 @@ public class StorageProxy implements StorageProxyMBean AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) - throws IOException, OverloadedException + throws OverloadedException { assert mutation instanceof RowMutation; sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level); @@ -135,7 +134,6 @@ public class StorageProxy implements StorageProxyMBean AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) - throws IOException { if (logger.isTraceEnabled()) logger.trace("insert writing local & replicate " + mutation.toString(true)); @@ -152,7 +150,6 @@ public class StorageProxy implements StorageProxyMBean AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) - throws IOException { if (logger.isTraceEnabled()) logger.trace("insert writing local & replicate " + mutation.toString(true)); @@ -395,12 +392,10 @@ public class StorageProxy implements StorageProxyMBean long startTime = System.nanoTime(); List<AbstractWriteResponseHandler> responseHandlers = new ArrayList<AbstractWriteResponseHandler>(mutations.size()); - IMutation mostRecentMutation = null; try { for (IMutation mutation : mutations) { - mostRecentMutation = mutation; if (mutation instanceof CounterMutation) { responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter)); @@ -446,11 +441,6 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("Overloaded"); throw e; } - catch (IOException e) - { - assert mostRecentMutation != null; - throw new RuntimeException("error writing key " + ByteBufferUtil.bytesToHex(mostRecentMutation.key()), e); - } finally { writeMetrics.addNano(System.nanoTime() - startTime); @@ -596,7 +586,7 @@ public class StorageProxy implements StorageProxyMBean WritePerformer performer, Runnable callback, WriteType writeType) - throws UnavailableException, OverloadedException, IOException + throws UnavailableException, OverloadedException { String table = mutation.getTable(); AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy(); @@ -763,7 +753,7 @@ public class StorageProxy implements StorageProxyMBean HintRunnable runnable = new HintRunnable(target) { - public void runMayThrow() throws IOException + public void runMayThrow() { logger.debug("Adding hint for {}", target); @@ -784,7 +774,7 @@ public class StorageProxy implements StorageProxyMBean return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable); } - public static void writeHintForMutation(RowMutation mutation, InetAddress target) throws IOException + public static void writeHintForMutation(RowMutation mutation, InetAddress target) { UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target); assert hostId != null : "Missing host ID for " + target.getHostAddress(); @@ -816,18 +806,6 @@ public class StorageProxy implements StorageProxyMBean private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler) { - try - { - sendMessagesToOneDCInternal(message, targets, localDC, handler); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - private static void sendMessagesToOneDCInternal(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler) throws IOException - { Iterator<InetAddress> iter = targets.iterator(); InetAddress target = iter.next(); @@ -847,21 +825,28 @@ public class StorageProxy implements StorageProxyMBean } // Add all the other destinations of the same message as a FORWARD_HEADER entry - FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(bos); - out.writeInt(targets.size() - 1); - while (iter.hasNext()) - { - InetAddress destination = iter.next(); - CompactEndpointSerializationHelper.serialize(destination, out); - int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout()); - out.writeInt(id); - logger.trace("Adding FWD message to {}@{}", id, destination); + DataOutputBuffer out = new DataOutputBuffer(); + try + { + out.writeInt(targets.size() - 1); + while (iter.hasNext()) + { + InetAddress destination = iter.next(); + CompactEndpointSerializationHelper.serialize(destination, out); + int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout()); + out.writeInt(id); + logger.trace("Adding FWD message to {}@{}", id, destination); + } + message = message.withParameter(RowMutation.FORWARD_TO, out.getData()); + // send the combined message + forward headers + int id = MessagingService.instance().sendRR(message, target, handler); + logger.trace("Sending message to {}@{}", id, target); + } + catch (IOException e) + { + // DataOutputBuffer is in-memory, doesn't throw IOException + throw new AssertionError(e); } - message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray()); - // send the combined message + forward headers - int id = MessagingService.instance().sendRR(message, target, handler); - logger.trace("Sending message to {}@{}", id, target); } private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler) @@ -871,7 +856,7 @@ public class StorageProxy implements StorageProxyMBean Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION) { - public void runMayThrow() throws IOException + public void runMayThrow() { rm.apply(); responseHandler.response(null); @@ -894,7 +879,7 @@ public class StorageProxy implements StorageProxyMBean * quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather * the write latencies at the coordinator node to make gathering point similar to the case of standard writes. */ - public static AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException, IOException + public static AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException { InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key(), localDataCenter, cm.consistency()); @@ -963,7 +948,7 @@ public class StorageProxy implements StorageProxyMBean // Must be called on a replica of the mutation. This replica becomes the // leader of this mutation. public static AbstractWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) - throws UnavailableException, IOException, OverloadedException + throws UnavailableException, OverloadedException { return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER); } @@ -971,7 +956,7 @@ public class StorageProxy implements StorageProxyMBean // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while // applyCounterMutationOnLeader assumes it is on the MUTATION stage already) public static AbstractWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) - throws UnavailableException, IOException, OverloadedException + throws UnavailableException, OverloadedException { return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER); } @@ -984,7 +969,7 @@ public class StorageProxy implements StorageProxyMBean { return new LocalMutationRunnable() { - public void runMayThrow() throws IOException + public void runMayThrow() { assert mutation instanceof CounterMutation; final CounterMutation cm = (CounterMutation) mutation; @@ -1738,7 +1723,7 @@ public class StorageProxy implements StorageProxyMBean public interface WritePerformer { - public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, OverloadedException; + public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws OverloadedException; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2506f1d/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index ec8241c..e606e06 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -607,14 +607,22 @@ public class FBUtilities public void close() {} } - public static <T> byte[] serialize(T object, IVersionedSerializer<T> serializer, int version) throws IOException - { - int size = (int) serializer.serializedSize(object, version); - DataOutputBuffer buffer = new DataOutputBuffer(size); - serializer.serialize(object, buffer, version); - assert buffer.getLength() == size && buffer.getData().length == size - : String.format("Final buffer length %s to accommodate data size of %s (predicted %s) for %s", - buffer.getData().length, buffer.getLength(), size, object); - return buffer.getData(); + public static <T> byte[] serialize(T object, IVersionedSerializer<T> serializer, int version) + { + try + { + int size = (int) serializer.serializedSize(object, version); + DataOutputBuffer buffer = new DataOutputBuffer(size); + serializer.serialize(object, buffer, version); + assert buffer.getLength() == size && buffer.getData().length == size + : String.format("Final buffer length %s to accommodate data size of %s (predicted %s) for %s", + buffer.getData().length, buffer.getLength(), size, object); + return buffer.getData(); + } + catch (IOException e) + { + // We're doing in-memory serialization... + throw new AssertionError(e); + } } }
