Author: jbellis Date: Thu Apr 1 01:35:59 2010 New Revision: 929767 URL: http://svn.apache.org/viewvc?rev=929767&view=rev Log: merge from 0.6
Added: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (with props) Added: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=929767&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Apr 1 01:35:59 2010 @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.lang.StringUtils; + +import org.apache.cassandra.cache.ICacheExpungeHook; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.Row; +import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.ExpiringMap; +import org.apache.cassandra.utils.FBUtilities; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +class ConsistencyChecker implements Runnable +{ + private static Logger logger_ = LoggerFactory.getLogger(ConsistencyManager.class); + private static long scheduledTimeMillis_ = 600; + private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String, String>(scheduledTimeMillis_); + + private final String table_; + private final Row row_; + protected final List<InetAddress> replicas_; + private final ReadCommand readCommand_; + + public ConsistencyChecker(String table, Row row, List<InetAddress> replicas, ReadCommand readCommand) + { + table_ = table; + row_ = row; + replicas_ = replicas; + readCommand_ = readCommand; + } + + public void run() + { + ReadCommand readCommandDigestOnly = constructReadMessage(true); + try + { + Message message = readCommandDigestOnly.makeReadMessage(); + if (logger_.isDebugEnabled()) + logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]"); + MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), new DigestResponseHandler()); + } + catch (IOException ex) + { + throw new RuntimeException(ex); + } + } + + private ReadCommand constructReadMessage(boolean isDigestQuery) + { + ReadCommand readCommand = readCommand_.copy(); + readCommand.setDigestQuery(isDigestQuery); + return readCommand; + } + + class DigestResponseHandler implements IAsyncCallback + { + Collection<Message> responses_ = new LinkedBlockingQueue<Message>(); + + // syncronized so "size() == " works + public synchronized void response(Message msg) + { + responses_.add(msg); + if (responses_.size() != ConsistencyChecker.this.replicas_.size()) + return; + + for (Message response : responses_) + { + try + { + byte[] body = response.getMessageBody(); + ByteArrayInputStream bufIn = new ByteArrayInputStream(body); + ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn)); + byte[] digest = result.digest(); + if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest)) + { + doReadRepair(); + break; + } + } + catch (Exception e) + { + throw new RuntimeException("Error handling responses for " + row_, e); + } + } + } + + private void doReadRepair() throws IOException + { + replicas_.add(FBUtilities.getLocalAddress()); + IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_, replicas_.size()); + IAsyncCallback responseHandler = new DataRepairHandler(replicas_.size(), readResponseResolver); + ReadCommand readCommand = constructReadMessage(false); + Message message = readCommand.makeReadMessage(); + if (logger_.isDebugEnabled()) + logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]"); + MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), responseHandler); + } + } + + static class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String> + { + private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>(); + private final IResponseResolver<Row> readResponseResolver_; + private final int majority_; + + DataRepairHandler(int responseCount, IResponseResolver<Row> readResponseResolver) + { + readResponseResolver_ = readResponseResolver; + majority_ = (responseCount / 2) + 1; + } + + // synchronized so the " == majority" is safe + public synchronized void response(Message message) + { + if (logger_.isDebugEnabled()) + logger_.debug("Received responses in DataRepairHandler : " + message.toString()); + responses_.add(message); + if (responses_.size() == majority_) + { + String messageId = message.getMessageId(); + readRepairTable_.put(messageId, messageId, this); + } + } + + public void callMe(String key, String value) + { + try + { + readResponseResolver_.resolve(responses_); + } + catch (Exception ex) + { + throw new RuntimeException(ex); + } + } + } +} Propchange: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java ------------------------------------------------------------------------------ svn:eol-style = native