Author: jbellis
Date: Thu Apr  1 01:35:59 2010
New Revision: 929767

URL: http://svn.apache.org/viewvc?rev=929767&view=rev
Log:
merge from 0.6

Added:
    
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java   
(with props)

Added: 
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=929767&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java 
(added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java 
Thu Apr  1 01:35:59 2010
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.cache.ICacheExpungeHook;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+class ConsistencyChecker implements Runnable
+{
+    private static Logger logger_ = 
LoggerFactory.getLogger(ConsistencyManager.class);
+    private static long scheduledTimeMillis_ = 600;
+    private static ExpiringMap<String, String> readRepairTable_ = new 
ExpiringMap<String, String>(scheduledTimeMillis_);
+
+    private final String table_;
+    private final Row row_;
+    protected final List<InetAddress> replicas_;
+    private final ReadCommand readCommand_;
+
+    public ConsistencyChecker(String table, Row row, List<InetAddress> 
replicas, ReadCommand readCommand)
+    {
+        table_ = table;
+        row_ = row;
+        replicas_ = replicas;
+        readCommand_ = readCommand;
+    }
+
+    public void run()
+       {
+        ReadCommand readCommandDigestOnly = constructReadMessage(true);
+               try
+               {
+                       Message message = 
readCommandDigestOnly.makeReadMessage();
+            if (logger_.isDebugEnabled())
+              logger_.debug("Reading consistency digest for " + 
readCommand_.key + " from " + message.getMessageId() + "@[" + 
StringUtils.join(replicas_, ", ") + "]");
+            MessagingService.instance.sendRR(message, replicas_.toArray(new 
InetAddress[replicas_.size()]), new DigestResponseHandler());
+               }
+               catch (IOException ex)
+               {
+                       throw new RuntimeException(ex);
+               }
+       }
+
+    private ReadCommand constructReadMessage(boolean isDigestQuery)
+    {
+        ReadCommand readCommand = readCommand_.copy();
+        readCommand.setDigestQuery(isDigestQuery);
+        return readCommand;
+    }
+
+    class DigestResponseHandler implements IAsyncCallback
+       {
+               Collection<Message> responses_ = new 
LinkedBlockingQueue<Message>();
+
+        // syncronized so "size() == " works
+               public synchronized void response(Message msg)
+               {
+                       responses_.add(msg);
+            if (responses_.size() != ConsistencyChecker.this.replicas_.size())
+                return;
+
+            for (Message response : responses_)
+            {
+                try
+                {
+                    byte[] body = response.getMessageBody();
+                    ByteArrayInputStream bufIn = new 
ByteArrayInputStream(body);
+                    ReadResponse result = 
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+                    byte[] digest = result.digest();
+                    if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
+                    {
+                        doReadRepair();
+                        break;
+                    }
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException("Error handling responses for " 
+ row_, e);
+                }
+            }
+        }
+
+        private void doReadRepair() throws IOException
+               {
+            replicas_.add(FBUtilities.getLocalAddress());
+            IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver(table_, replicas_.size());
+            IAsyncCallback responseHandler = new 
DataRepairHandler(replicas_.size(), readResponseResolver);
+            ReadCommand readCommand = constructReadMessage(false);
+            Message message = readCommand.makeReadMessage();
+            if (logger_.isDebugEnabled())
+              logger_.debug("Performing read repair for " + readCommand_.key + 
" to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + 
"]");
+            MessagingService.instance.sendRR(message, replicas_.toArray(new 
InetAddress[replicas_.size()]), responseHandler);
+               }
+       }
+
+       static class DataRepairHandler implements IAsyncCallback, 
ICacheExpungeHook<String, String>
+       {
+               private final Collection<Message> responses_ = new 
LinkedBlockingQueue<Message>();
+               private final IResponseResolver<Row> readResponseResolver_;
+               private final int majority_;
+               
+               DataRepairHandler(int responseCount, IResponseResolver<Row> 
readResponseResolver)
+               {
+                       readResponseResolver_ = readResponseResolver;
+                       majority_ = (responseCount / 2) + 1;  
+               }
+
+        // synchronized so the " == majority" is safe
+               public synchronized void response(Message message)
+               {
+                       if (logger_.isDebugEnabled())
+                         logger_.debug("Received responses in 
DataRepairHandler : " + message.toString());
+                       responses_.add(message);
+            if (responses_.size() == majority_)
+            {
+                String messageId = message.getMessageId();
+                readRepairTable_.put(messageId, messageId, this);
+            }
+        }
+
+               public void callMe(String key, String value)
+               {
+            try
+                       {
+                               readResponseResolver_.resolve(responses_);
+            }
+            catch (Exception ex)
+            {
+                throw new RuntimeException(ex);
+            }
+        }
+    }
+}

Propchange: 
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to