keith-turner closed pull request #975: [WIP] Issue 722
URL: https://github.com/apache/fluo/pull/975
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
 
b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
index 75a9fa14..6f35fbc2 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
@@ -19,15 +19,12 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.ConditionalWriter.Result;
 import org.apache.accumulo.core.data.ConditionalMutation;
@@ -36,11 +33,10 @@
 import org.apache.fluo.core.util.FluoExecutors;
 import org.apache.fluo.core.util.Limit;
 
-public class AsyncConditionalWriter
-    implements AsyncFunction<Collection<ConditionalMutation>, 
Iterator<Result>> {
+public class AsyncConditionalWriter {
 
   private final ConditionalWriter cw;
-  private final ListeningExecutorService les;
+  private final ExecutorService es;
   private final Limit semaphore;
 
 
@@ -50,55 +46,38 @@ public AsyncConditionalWriter(Environment env, 
ConditionalWriter cw) {
         FluoConfigurationImpl.ASYNC_CW_THREADS_DEFAULT);
     int permits = 
env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_CW_LIMIT,
         FluoConfigurationImpl.ASYNC_CW_LIMIT_DEFAULT);
-    this.les =
-        
MoreExecutors.listeningDecorator(FluoExecutors.newFixedThreadPool(numThreads, 
"asyncCW"));
+    this.es = FluoExecutors.newFixedThreadPool(numThreads, "asyncCw");
     // the conditional writer currently has not memory limits... give it too 
much and it blows out
     // memory.. need to fix this in conditional writer
     // for now this needs to be memory based
     this.semaphore = new Limit(permits);
   }
 
-  private class IterTask implements Callable<Iterator<Result>> {
-
-    private Iterator<Result> input;
-    private int permitsAcquired;
-
-    public IterTask(Iterator<Result> iter, int permitsAcquired) {
-      this.input = iter;
-      this.permitsAcquired = permitsAcquired;
+  public CompletableFuture<Iterator<Result>> 
apply(Collection<ConditionalMutation> input) {
+    if (input.size() == 0) {
+      return 
CompletableFuture.completedFuture(Collections.<Result>emptyList().iterator());
     }
 
-    @Override
-    public Iterator<Result> call() throws Exception {
+    semaphore.acquire(input.size());
+    Iterator<Result> iter = cw.write(input.iterator());
+    return CompletableFuture.supplyAsync(() -> {
       try {
         Builder<Result> imlb = ImmutableList.builder();
-        while (input.hasNext()) {
-          Result result = input.next();
+        while (iter.hasNext()) {
+          Result result = iter.next();
           imlb.add(result);
         }
         return imlb.build().iterator();
       } finally {
-        semaphore.release(permitsAcquired);
+        semaphore.release(input.size());
       }
-    }
-
-  }
-
-  @Override
-  public ListenableFuture<Iterator<Result>> 
apply(Collection<ConditionalMutation> input) {
-    if (input.size() == 0) {
-      return 
Futures.immediateFuture(Collections.<Result>emptyList().iterator());
-    }
-
-    semaphore.acquire(input.size());
-    Iterator<Result> iter = cw.write(input.iterator());
-    return les.submit(new IterTask(iter, input.size()));
+    }, es);
   }
 
   public void close() {
-    les.shutdownNow();
+    es.shutdownNow();
     try {
-      les.awaitTermination(5, TimeUnit.SECONDS);
+      es.awaitTermination(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
index 2b53cbfd..d87e9a71 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
@@ -21,12 +21,11 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.data.Mutation;
@@ -42,13 +41,15 @@
 
   private AtomicLong asyncBatchesAdded = new AtomicLong(0);
   private long asyncBatchesProcessed = 0;
+  // added to avoid findbugs false positive
+  private static final Supplier<Void> NULLS = () -> null;
 
   private static class MutationBatch {
 
     private Collection<Mutation> mutations;
     private CountDownLatch cdl;
     private boolean isAsync = false;
-    private ListenableFutureTask<Void> lf;
+    private CompletableFuture<Void> cf;
 
     public MutationBatch(Collection<Mutation> mutations, boolean isAsync) {
       this.mutations = mutations;
@@ -58,9 +59,9 @@ public MutationBatch(Collection<Mutation> mutations, boolean 
isAsync) {
       }
     }
 
-    public MutationBatch(Collection<Mutation> mutations, 
ListenableFutureTask<Void> lf) {
+    public MutationBatch(Collection<Mutation> mutations, 
CompletableFuture<Void> cf) {
       this.mutations = mutations;
-      this.lf = lf;
+      this.cf = cf;
       this.cdl = null;
       this.isAsync = false;
     }
@@ -70,8 +71,8 @@ public void countDown() {
         cdl.countDown();
       }
 
-      if (lf != null) {
-        lf.run();
+      if (cf != null) {
+        cf.complete(NULLS.get());
       }
     }
   }
@@ -170,27 +171,22 @@ void writeMutations(Collection<Mutation> ml) {
     }
   }
 
-  private static final Runnable DO_NOTHING = new Runnable() {
-    @Override
-    public void run() {}
-  };
-
-  ListenableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
+  CompletableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
     if (ml.size() == 0) {
-      return Futures.immediateFuture(null);
+      return CompletableFuture.completedFuture(NULLS.get());
     }
 
-    ListenableFutureTask<Void> lf = ListenableFutureTask.create(DO_NOTHING, 
null);
+    CompletableFuture<Void> cf = new CompletableFuture<>();
     try {
-      MutationBatch mb = new MutationBatch(ml, lf);
+      MutationBatch mb = new MutationBatch(ml, cf);
       mutQueue.put(mb);
-      return lf;
+      return cf;
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  ListenableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
+  CompletableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
     return writeMutationsAsyncFuture(Collections.singleton(m));
   }
 
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 0abdafed..94e4107b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -26,15 +26,13 @@
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.ConditionalWriter;
@@ -818,33 +816,12 @@ public long getStartTimestamp() {
     return startTs;
   }
 
-  // async experiment
-
-  private abstract static class CommitCallback<V> implements FutureCallback<V> 
{
-
-    private CommitData cd;
-
-    CommitCallback(CommitData cd) {
-      this.cd = cd;
-    }
-
-    @Override
-    public void onSuccess(V result) {
-      try {
-        onSuccess(cd, result);
-      } catch (Exception e) {
-        cd.commitObserver.failed(e);
-      }
-    }
-
-    protected abstract void onSuccess(CommitData cd, V result) throws 
Exception;
-
-
-    @Override
-    public void onFailure(Throwable t) {
-      cd.commitObserver.failed(t);
-    }
-
+  /**
+   * Funcitonal interface to provide next step of asynchronous commit on 
successful completion of
+   * the previous one
+   */
+  private static interface OnSuccessInterface<V> {
+    public void onSuccess(V result) throws Exception;
   }
 
   private abstract static class SynchronousCommitTask implements Runnable {
@@ -895,6 +872,24 @@ public int getSize() {
     return size;
   }
 
+  private <V> void addCallback(CompletableFuture<V> cfuture, CommitData cd,
+      OnSuccessInterface<V> onSuccessInterface) {
+    cfuture.handleAsync((result, exception) -> {
+      if (exception != null) {
+        cd.commitObserver.failed(exception);
+        return null;
+      } else {
+        try {
+          onSuccessInterface.onSuccess(result);
+          return null;
+        } catch (Exception e) {
+          cd.commitObserver.failed(e);
+          return null;
+        }
+      }
+    }, env.getSharedResources().getAsyncCommitExecutor());
+  }
+
   @Override
   public synchronized void commitAsync(AsyncCommitObserver commitCallback) {
 
@@ -972,13 +967,8 @@ private void beginCommitAsync(CommitData cd, 
AsyncCommitObserver commitCallback,
     final ConditionalMutation pcm =
         prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, 
isTriggerRow(cd.prow));
 
-    ListenableFuture<Iterator<Result>> future = 
cd.acw.apply(Collections.singletonList(pcm));
-    Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Iterator<Result> result) throws 
Exception {
-        postLockPrimary(cd, pcm, Iterators.getOnlyElement(result));
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    CompletableFuture<Iterator<Result>> cfuture = 
cd.acw.apply(Collections.singletonList(pcm));
+    addCallback(cfuture, cd, result -> postLockPrimary(cd, pcm, 
Iterators.getOnlyElement(result)));
   }
 
   private void postLockPrimary(final CommitData cd, final ConditionalMutation 
pcm, Result result)
@@ -1059,13 +1049,8 @@ private void lockOtherColumns(CommitData cd) {
 
     cd.acceptedRows = new HashSet<>();
 
-    ListenableFuture<Iterator<Result>> future = cd.bacw.apply(mutations);
-    Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Iterator<Result> results) throws 
Exception {
-        postLockOther(cd, results);
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    CompletableFuture<Iterator<Result>> cfuture = cd.bacw.apply(mutations);
+    addCallback(cfuture, cd, results -> postLockOther(cd, results));
   }
 
   private void postLockOther(final CommitData cd, Iterator<Result> results) 
throws Exception {
@@ -1092,13 +1077,8 @@ protected void runCommitStep(CommitData cd) throws 
Exception {
     } else if (stopAfterPreCommit) {
       cd.commitObserver.committed();
     } else {
-      ListenableFuture<Stamp> future = 
env.getSharedResources().getOracleClient().getStampAsync();
-      Futures.addCallback(future, new CommitCallback<Stamp>(cd) {
-        @Override
-        protected void onSuccess(CommitData cd, Stamp stamp) throws Exception {
-          beginSecondCommitPhase(cd, stamp);
-        }
-      }, env.getSharedResources().getAsyncCommitExecutor());
+      CompletableFuture<Stamp> cfuture = 
env.getSharedResources().getOracleClient().getStampAsync();
+      addCallback(cfuture, cd, stamp -> beginSecondCommitPhase(cd, stamp));
     }
   }
 
@@ -1124,14 +1104,9 @@ private void rollbackOtherLocks(CommitData cd) throws 
Exception {
       mutations.add(m);
     }
 
-    ListenableFuture<Void> future =
+    CompletableFuture<Void> cfuture =
         
env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
-    Futures.addCallback(future, new CommitCallback<Void>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Void v) throws Exception {
-        rollbackPrimaryLock(cd);
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    addCallback(cfuture, cd, result -> rollbackPrimaryLock(cd));
   }
 
   private void rollbackPrimaryLock(CommitData cd) throws Exception {
@@ -1143,14 +1118,10 @@ private void rollbackPrimaryLock(CommitData cd) throws 
Exception {
         DelLockValue.encodeRollback(startTs, true, true));
     m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
 
-    ListenableFuture<Void> future =
+    CompletableFuture<Void> cfuture =
         env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(m);
-    Futures.addCallback(future, new CommitCallback<Void>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Void v) throws Exception {
-        cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    addCallback(cfuture, cd,
+        result -> 
cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
   }
 
   private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp) throws 
Exception {
@@ -1213,14 +1184,9 @@ private void writeNotificationsAsync(CommitData cd, 
final long commitTs) {
       }
     }
 
-    ListenableFuture<Void> future =
+    CompletableFuture<Void> cfuture =
         
env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values());
-    Futures.addCallback(future, new CommitCallback<Void>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Void v) throws Exception {
-        commmitPrimary(cd, commitTs);
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    addCallback(cfuture, cd, result -> commmitPrimary(cd, commitTs));
   }
 
   private void commmitPrimary(CommitData cd, final long commitTs) {
@@ -1237,15 +1203,10 @@ private void commmitPrimary(CommitData cd, final long 
commitTs) {
     ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), 
isDelete(cd.pval),
         isReadLock(cd.pval), startTs, commitTs, observedColumns, 
delLockMutation);
 
-    ListenableFuture<Iterator<Result>> future =
+    CompletableFuture<Iterator<Result>> cfuture =
         cd.acw.apply(Collections.singletonList(delLockMutation));
-    Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Iterator<Result> result) throws 
Exception {
-        handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation,
-            Iterators.getOnlyElement(result));
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    addCallback(cfuture, cd, result -> handleUnkownStatsAfterPrimary(cd, 
commitTs, delLockMutation,
+        Iterators.getOnlyElement(result)));
   }
 
   private void handleUnkownStatsAfterPrimary(CommitData cd, final long 
commitTs,
@@ -1321,16 +1282,9 @@ private void deleteLocks(CommitData cd, final long 
commitTs) {
       mutations.add(m);
     }
 
-
-    ListenableFuture<Void> future =
+    CompletableFuture<Void> cfuture =
         
env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
-    Futures.addCallback(future, new CommitCallback<Void>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Void v) throws Exception {
-        finishCommit(cd, commitTs);
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
-
+    addCallback(cfuture, cd, result -> finishCommit(cd, commitTs));
   }
 
   @VisibleForTesting
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java 
b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
index 012c4a84..9c112397 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
@@ -19,6 +19,7 @@
 import java.util.ArrayList;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,8 +28,6 @@
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Timer;
 import com.codahale.metrics.Timer.Context;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -70,15 +69,10 @@
 
   private Participant currentLeader;
 
-  private static final class TimeRequest implements Callable<Stamp> {
+  private static final class TimeRequest {
     CountDownLatch cdl = new CountDownLatch(1);
     AtomicReference<Stamp> stampRef = new AtomicReference<>();
-    ListenableFutureTask<Stamp> lf = null;
-
-    @Override
-    public Stamp call() throws Exception {
-      return stampRef.get();
-    }
+    CompletableFuture<Stamp> cf = null;
   }
 
   private class TimestampRetriever extends LeaderSelectorListenerAdapter
@@ -211,11 +205,12 @@ private void doWork() {
 
           for (int i = 0; i < request.size(); i++) {
             TimeRequest tr = request.get(i);
-            tr.stampRef.set(new Stamp(txStampsStart + i, gcStamp));
-            if (tr.lf == null) {
+            Stamp stampRes = new Stamp(txStampsStart + i, gcStamp);
+            tr.stampRef.set(stampRes);
+            if (tr.cf == null) {
               tr.cdl.countDown();
             } else {
-              tr.lf.run();
+              tr.cf.complete(stampRes);
             }
           }
         } catch (InterruptedException e) {
@@ -386,18 +381,18 @@ public Stamp getStamp() {
     return tr.stampRef.get();
   }
 
-  public ListenableFuture<Stamp> getStampAsync() {
+  public CompletableFuture<Stamp> getStampAsync() {
     checkClosed();
 
     TimeRequest tr = new TimeRequest();
-    ListenableFutureTask<Stamp> lf = ListenableFutureTask.create(tr);
-    tr.lf = lf;
+    CompletableFuture<Stamp> cf = new CompletableFuture<>();
+    tr.cf = cf;
     try {
       queue.put(tr);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
-    return lf;
+    return cf;
   }
 
   /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to