Author: jbellis
Date: Wed Apr 28 16:30:20 2010
New Revision: 939004

URL: http://svn.apache.org/viewvc?rev=939004&view=rev
Log:
split CommitLogExecutorService into BatchCommitLogExecutorService and 
PeriodicCommitLogExecutorService.  Since Periodic mode doesn't need to block 
for each mutation, the executor can avoid generating extra garbage from 
unnecessary FutureTask wrappers.  patch by jbellis; reviewed by gdusbabek for 
CASSANDRA-1014

Added:
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
   (contents, props changed)
      - copied, changed from r933475, 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorServiceMBean.java
   (contents, props changed)
      - copied, changed from r933475, 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorServiceMBean.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
      - copied, changed from r933475, 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorServiceMBean.java
Removed:
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorServiceMBean.java
Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=939004&r1=939003&r2=939004&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Wed Apr 28 16:30:20 2010
@@ -1,5 +1,7 @@
 0.6.2
  * fix contrib/word_count build. (CASSANDRA-992)
+ * split CommitLogExecutorService into BatchCommitLogExecutorService and 
+   PeriodicCommitLogExecutorService (CASSANDRA-1014)
 
 
 0.6.1

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java?rev=939004&r1=939003&r2=939004&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java 
(original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java 
Wed Apr 28 16:30:20 2010
@@ -397,18 +397,7 @@ public class Table 
         {
             if (writeCommitLog)
             {
-                Future<CommitLogSegment.CommitLogContext> future = 
CommitLog.instance().add(mutation, serializedMutation);
-                if (waitForCommitLog)
-                {
-                    try
-                    {
-                        future.get();
-                    }
-                    catch (Exception e)
-                    {
-                        throw new RuntimeException(e);
-                    }
-                }
+                CommitLog.instance().add(mutation, serializedMutation);
             }
         
             for (ColumnFamily columnFamily : mutation.getColumnFamilies())

Added: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java?rev=939004&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
 (added)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
 Wed Apr 28 16:30:20 2010
@@ -0,0 +1,69 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+public abstract class AbstractCommitLogExecutorService extends 
AbstractExecutorService implements ICommitLogExecutorService
+{
+    protected volatile long completedTaskCount = 0;
+
+    protected static void registerMBean(Object o)
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(o, new 
ObjectName("org.apache.cassandra.db:type=Commitlog"));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Get the current number of running tasks
+     */
+    public int getActiveCount()
+    {
+        return 1;
+    }
+
+    /**
+     * Get the number of completed tasks
+     */
+    public long getCompletedTasks()
+    {
+        return completedTaskCount;
+    }
+
+    // cassandra is crash-only so there's no need to implement the shutdown 
methods
+
+    public boolean isShutdown()
+    {
+        return false;
+    }
+
+    public boolean isTerminated()
+    {
+        return false;
+    }
+
+    public void shutdown()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public List<Runnable> shutdownNow()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException
+    {
+        throw new UnsupportedOperationException();
+    }
+}

Copied: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
 (from r933475, 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java)
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java?p2=cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java&p1=cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java&r1=933475&r2=939004&rev=939004&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
 Wed Apr 28 16:30:20 2010
@@ -22,97 +22,45 @@ package org.apache.cassandra.db.commitlo
 
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.*;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-class CommitLogExecutorService extends AbstractExecutorService implements 
CommitLogExecutorServiceMBean
+class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService 
implements ICommitLogExecutorService, BatchCommitLogExecutorServiceMBean
 {
     private final BlockingQueue<CheaterFutureTask> queue;
 
-    private volatile long completedTaskCount = 0;
-
-    public CommitLogExecutorService()
+    public BatchCommitLogExecutorService()
     {
-        this(DatabaseDescriptor.getCommitLogSync() == 
DatabaseDescriptor.CommitLogSync.batch
-             ? DatabaseDescriptor.getConcurrentWriters()
-             : 1024 * Runtime.getRuntime().availableProcessors());
+        this(DatabaseDescriptor.getConcurrentWriters());
     }
 
-    public CommitLogExecutorService(int queueSize)
+    public BatchCommitLogExecutorService(int queueSize)
     {
         queue = new LinkedBlockingQueue<CheaterFutureTask>(queueSize);
         Runnable runnable = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception
             {
-                if (DatabaseDescriptor.getCommitLogSync() == 
DatabaseDescriptor.CommitLogSync.batch)
-                {
-                    while (true)
-                    {
-                        processWithSyncBatch();
-                        completedTaskCount++;
-                    }
-                }
-                else
+                while (true)
                 {
-                    while (true)
-                    {
-                        process();
-                        completedTaskCount++;
-                    }
+                    processWithSyncBatch();
+                    completedTaskCount++;
                 }
             }
         };
         new Thread(runnable, "COMMIT-LOG-WRITER").start();
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new 
ObjectName("org.apache.cassandra.db:type=Commitlog"));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        registerMBean(this);
     }
 
-
-    /**
-     * Get the current number of running tasks
-     */
-    public int getActiveCount()
-    {
-        return 1;
-    }
-
-    /**
-     * Get the number of completed tasks
-     */
-    public long getCompletedTasks()
-    {
-        return completedTaskCount;
-    }
-
-    /**
-     * Get the number of tasks waiting to be executed
-     */
     public long getPendingTasks()
     {
         return queue.size();
     }
 
-    private void process() throws InterruptedException
-    {
-        queue.take().run();
-    }
-
     private final ArrayList<CheaterFutureTask> incompleteTasks = new 
ArrayList<CheaterFutureTask>();
     private final ArrayList taskValues = new ArrayList(); // TODO not sure how 
to generify this
     private void processWithSyncBatch() throws Exception
@@ -184,30 +132,20 @@ class CommitLogExecutorService extends A
         }
     }
 
-    public boolean isShutdown()
+    public void add(CommitLog.LogRecordAdder adder)
     {
-        return false;
-    }
-
-    public boolean isTerminated()
-    {
-        return false;
-    }
-
-    // cassandra is crash-only so there's no need to implement the shutdown 
methods
-    public void shutdown()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public List<Runnable> shutdownNow()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException
-    {
-        throw new UnsupportedOperationException();
+        try
+        {
+            submit((Callable)adder).get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     private static class CheaterFutureTask<V> extends FutureTask<V>

Propchange: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorServiceMBean.java
 (from r933475, 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorServiceMBean.java)
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorServiceMBean.java?p2=cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorServiceMBean.java&p1=cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorServiceMBean.java&r1=933475&r2=939004&rev=939004&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorServiceMBean.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorServiceMBean.java
 Wed Apr 28 16:30:20 2010
@@ -20,6 +20,6 @@ package org.apache.cassandra.db.commitlo
 
 import org.apache.cassandra.concurrent.IExecutorMBean;
 
-public interface CommitLogExecutorServiceMBean extends IExecutorMBean
+public interface BatchCommitLogExecutorServiceMBean extends IExecutorMBean
 {
 }
\ No newline at end of file

Propchange: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorServiceMBean.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=939004&r1=939003&r2=939004&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
 Wed Apr 28 16:30:20 2010
@@ -97,7 +97,7 @@ public class CommitLog
         return segments.size();
     }
 
-    private final ExecutorService executor = new CommitLogExecutorService();
+    private final ICommitLogExecutorService executor;
 
     /**
      * param @ table - name of table for which we are maintaining
@@ -114,11 +114,13 @@ public class CommitLog
         
         if (DatabaseDescriptor.getCommitLogSync() == 
DatabaseDescriptor.CommitLogSync.periodic)
         {
-            final Runnable syncer = new WrappedRunnable()
+            executor = new PeriodicCommitLogExecutorService();
+            final Callable syncer = new Callable()
             {
-                public void runMayThrow() throws IOException
+                public Object call() throws Exception
                 {
                     sync();
+                    return null;
                 }
             };
 
@@ -145,6 +147,10 @@ public class CommitLog
                 }
             }, "PERIODIC-COMMIT-LOG-SYNCER").start();
         }
+        else
+        {
+            executor = new BatchCommitLogExecutorService();
+        }
     }
 
     public static void recover() throws IOException
@@ -322,10 +328,9 @@ public class CommitLog
      * of any problems. This way we can assume that the subsequent commit log
      * entry will override the garbage left over by the previous write.
     */
-    public Future<CommitLogSegment.CommitLogContext> add(RowMutation 
rowMutation, Object serializedRow) throws IOException
+    public void add(RowMutation rowMutation, Object serializedRow) throws 
IOException
     {
-        Callable<CommitLogSegment.CommitLogContext> task = new 
LogRecordAdder(rowMutation, serializedRow);
-        return executor.submit(task);
+        executor.add(new LogRecordAdder(rowMutation, serializedRow));
     }
 
     /*
@@ -450,7 +455,9 @@ public class CommitLog
         }
     }
 
-    class LogRecordAdder implements Callable<CommitLogSegment.CommitLogContext>
+    // TODO this should be a Runnable since it doesn't actually return 
anything, but it's difficult to do that
+    // without breaking the fragile CheaterFutureTask in BatchCLES.
+    class LogRecordAdder implements Callable, Runnable
     {
         final RowMutation rowMutation;
         final Object serializedRow;
@@ -461,18 +468,28 @@ public class CommitLog
             this.serializedRow = serializedRow;
         }
 
-        public CommitLogSegment.CommitLogContext call() throws Exception
+        public void run()
         {
-            CommitLogSegment.CommitLogContext context = 
currentSegment().write(rowMutation, serializedRow);
-
-            // roll log if necessary
-            if (currentSegment().length() >= SEGMENT_SIZE)
+            try
             {
-                sync();
-                segments.add(new 
CommitLogSegment(currentSegment().getHeader().getColumnFamilyCount()));
+                currentSegment().write(rowMutation, serializedRow);
+                // roll log if necessary
+                if (currentSegment().length() >= SEGMENT_SIZE)
+                {
+                    sync();
+                    segments.add(new 
CommitLogSegment(currentSegment().getHeader().getColumnFamilyCount()));
+                }
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
             }
+        }
 
-            return context;
+        public Object call() throws Exception
+        {
+            run();
+            return null;
         }
     }
 }

Added: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java?rev=939004&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
 (added)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
 Wed Apr 28 16:30:20 2010
@@ -0,0 +1,20 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.concurrent.IExecutorMBean;
+
+/**
+ * Like ExecutorService, but customized for batch and periodic commitlog 
execution.
+ */
+public interface ICommitLogExecutorService extends IExecutorMBean
+{
+    public <T> Future<T> submit(Callable<T> task);
+
+    /**
+     * submits the adder for execution and blocks for it to be synced, if 
necessary
+     */
+    public void add(CommitLog.LogRecordAdder adder);
+
+}

Copied: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
 (from r933475, 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java)
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java?p2=cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java&p1=cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java&r1=933475&r2=939004&rev=939004&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
 Wed Apr 28 16:30:20 2010
@@ -22,213 +22,81 @@ package org.apache.cassandra.db.commitlo
 
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.*;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-class CommitLogExecutorService extends AbstractExecutorService implements 
CommitLogExecutorServiceMBean
+class PeriodicCommitLogExecutorService implements ICommitLogExecutorService, 
PeriodicCommitLogExecutorServiceMBean
 {
-    private final BlockingQueue<CheaterFutureTask> queue;
+    private final BlockingQueue<Runnable> queue;
+    protected volatile long completedTaskCount = 0;
 
-    private volatile long completedTaskCount = 0;
-
-    public CommitLogExecutorService()
+    public PeriodicCommitLogExecutorService()
     {
-        this(DatabaseDescriptor.getCommitLogSync() == 
DatabaseDescriptor.CommitLogSync.batch
-             ? DatabaseDescriptor.getConcurrentWriters()
-             : 1024 * Runtime.getRuntime().availableProcessors());
+        this(1024 * Runtime.getRuntime().availableProcessors());
     }
 
-    public CommitLogExecutorService(int queueSize)
+    public PeriodicCommitLogExecutorService(int queueSize)
     {
-        queue = new LinkedBlockingQueue<CheaterFutureTask>(queueSize);
+        queue = new LinkedBlockingQueue<Runnable>(queueSize);
         Runnable runnable = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception
             {
-                if (DatabaseDescriptor.getCommitLogSync() == 
DatabaseDescriptor.CommitLogSync.batch)
-                {
-                    while (true)
-                    {
-                        processWithSyncBatch();
-                        completedTaskCount++;
-                    }
-                }
-                else
+                while (true)
                 {
-                    while (true)
-                    {
-                        process();
-                        completedTaskCount++;
-                    }
+                    queue.take().run();
+                    completedTaskCount++;
                 }
             }
         };
         new Thread(runnable, "COMMIT-LOG-WRITER").start();
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new 
ObjectName("org.apache.cassandra.db:type=Commitlog"));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-
-    /**
-     * Get the current number of running tasks
-     */
-    public int getActiveCount()
-    {
-        return 1;
-    }
-
-    /**
-     * Get the number of completed tasks
-     */
-    public long getCompletedTasks()
-    {
-        return completedTaskCount;
+        AbstractCommitLogExecutorService.registerMBean(this);
     }
 
-    /**
-     * Get the number of tasks waiting to be executed
-     */
-    public long getPendingTasks()
+    public void add(CommitLog.LogRecordAdder adder)
     {
-        return queue.size();
-    }
-
-    private void process() throws InterruptedException
-    {
-        queue.take().run();
-    }
-
-    private final ArrayList<CheaterFutureTask> incompleteTasks = new 
ArrayList<CheaterFutureTask>();
-    private final ArrayList taskValues = new ArrayList(); // TODO not sure how 
to generify this
-    private void processWithSyncBatch() throws Exception
-    {
-        CheaterFutureTask firstTask = queue.take();
-        if (!(firstTask.getRawCallable() instanceof CommitLog.LogRecordAdder))
-        {
-            firstTask.run();
-            return;
-        }
-
-        // attempt to do a bunch of LogRecordAdder ops before syncing
-        // (this is a little clunky since there is no blocking peek method,
-        //  so we have to break it into firstTask / extra tasks)
-        incompleteTasks.clear();
-        taskValues.clear();
-        long end = System.nanoTime() + (long)(1000000 * 
DatabaseDescriptor.getCommitLogSyncBatchWindow());
-
-        // it doesn't seem worth bothering future-izing the exception
-        // since if a commitlog op throws, we're probably screwed anyway
-        incompleteTasks.add(firstTask);
-        taskValues.add(firstTask.getRawCallable().call());
-        while (!queue.isEmpty()
-               && queue.peek().getRawCallable() instanceof 
CommitLog.LogRecordAdder
-               && System.nanoTime() < end)
-        {
-            CheaterFutureTask task = queue.remove();
-            incompleteTasks.add(task);
-            taskValues.add(task.getRawCallable().call());
-        }
-
-        // now sync and set the tasks' values (which allows thread calling 
get() to proceed)
         try
         {
-            CommitLog.instance().sync();
+            queue.put(adder);
         }
-        catch (IOException e)
+        catch (InterruptedException e)
         {
             throw new RuntimeException(e);
         }
-        for (int i = 0; i < incompleteTasks.size(); i++)
-        {
-            incompleteTasks.get(i).set(taskValues.get(i));
-        }
     }
 
-
-    @Override
-    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
-    {
-        return newTaskFor(Executors.callable(runnable, value));
-    }
-
-    @Override
-    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
-    {
-        return new CheaterFutureTask(callable);
-    }
-
-    public void execute(Runnable command)
+    public <T> Future<T> submit(Callable<T> task)
     {
+        FutureTask<T> ft = new FutureTask<T>(task);
         try
         {
-            queue.put((CheaterFutureTask)command);
+            queue.put(ft);
         }
         catch (InterruptedException e)
         {
             throw new RuntimeException(e);
         }
+        return ft;
     }
 
-    public boolean isShutdown()
-    {
-        return false;
-    }
-
-    public boolean isTerminated()
-    {
-        return false;
-    }
-
-    // cassandra is crash-only so there's no need to implement the shutdown 
methods
-    public void shutdown()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public List<Runnable> shutdownNow()
+    public long getPendingTasks()
     {
-        throw new UnsupportedOperationException();
+        return queue.size();
     }
 
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException
+    public int getActiveCount()
     {
-        throw new UnsupportedOperationException();
+        return 1;
     }
 
-    private static class CheaterFutureTask<V> extends FutureTask<V>
+    public long getCompletedTasks()
     {
-        private final Callable rawCallable;
-
-        public CheaterFutureTask(Callable<V> callable)
-        {
-            super(callable);
-            rawCallable = callable;
-        }
-
-        public Callable getRawCallable()
-        {
-            return rawCallable;
-        }
-
-        @Override
-        public void set(V v)
-        {
-            super.set(v);
-        }
+        return completedTaskCount;
     }
-}
+}
\ No newline at end of file

Added: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorServiceMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorServiceMBean.java?rev=939004&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorServiceMBean.java
 (added)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorServiceMBean.java
 Wed Apr 28 16:30:20 2010
@@ -0,0 +1,25 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.commitlog;
+
+import org.apache.cassandra.concurrent.IExecutorMBean;
+
+public interface PeriodicCommitLogExecutorServiceMBean extends IExecutorMBean
+{
+}
\ No newline at end of file


Reply via email to