[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-tephra/pull/32


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100208194
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
 ---
@@ -74,30 +81,35 @@ public Void call() throws Exception {
 futureList.clear();
 numOps.set(NUM_OPS);
 // Start thread that release PruneUpperBoundWriters
-executor = Executors.newFixedThreadPool(3);
-for (int i = 0; i < 3; i++) {
-  futureList.add(executor.submit(new Callable() {
+executor = Executors.newFixedThreadPool(NUM_THREADS);
+for (int i = 0; i < NUM_THREADS; i++) {
+  futureList.add(executor.submit(new Runnable() {
 
 @Override
-public Void call() throws Exception {
+public void run() {
   // We need to release all NUM_OPS 'gets' that were executed to 
trigger shutdown of the single instance of
   // PruneUpperBoundWriter
-  while (numOps.decrementAndGet() >= 0) {
+  while (numOps.decrementAndGet() > 0) {
 supplier.release();
-TimeUnit.MICROSECONDS.sleep(random.nextInt(10));
+try {
+  TimeUnit.MICROSECONDS.sleep(random.nextInt(10));
+} catch (InterruptedException e) {
+  LOG.warn("Received an exception.", e);
+}
   }
-  return null;
 }
   }));
 }
 
 for (Future future : futureList) {
   future.get(1, TimeUnit.SECONDS);
 }
+// Since we got one instance in the beginning, we need to release it
+supplier.release();
--- End diff --

It would be good to assert that the writer is running and alive before we 
do the final release


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100201292
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,77 +18,85 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
--- End diff --

It would be good to add a comment saying this should not be started/stopped 
by itself. Instead use `PruneUpperBoundWriterSupplier`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100202379
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations.
+ */
+public class PruneUpperBoundWriterSupplier implements 
Supplier {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+
+  private static volatile PruneUpperBoundWriter instance;
+  private static volatile int refCount = 0;
+  private static final Object lock = new Object();
+
+  private final TableName tableName;
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  public PruneUpperBoundWriterSupplier(TableName tableName, 
DataJanitorState dataJanitorState,
+   long pruneFlushInterval) {
+this.tableName = tableName;
+this.dataJanitorState = dataJanitorState;
+this.pruneFlushInterval = pruneFlushInterval;
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+synchronized (lock) {
+  if (instance == null) {
+instance = new PruneUpperBoundWriter(tableName, dataJanitorState, 
pruneFlushInterval);
+instance.startAndWait();
+  }
+  refCount++;
+  LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " 
+ refCount);
+  return instance;
+}
+  }
+
+  public void release() {
+synchronized (lock) {
+  refCount--;
+  LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " 
+ refCount);
--- End diff --

Same here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100200476
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,77 +18,85 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
+  private final TableName tableName;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap pruneEntries;
+
+  private volatile boolean stopped;
+  private volatile Thread flushThread;
 
-  private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, 
TableName pruneStateTable, String regionNameAsString,
-   byte[] regionName, long pruneFlushInterval) 
{
-this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(TableName tableName, DataJanitorState 
dataJanitorState, long pruneFlushInterval) {
+this.tableName = tableName;
 this.dataJanitorState = dataJanitorState;
-this.regionName = regionName;
-this.regionNameAsString = regionNameAsString;
 this.pruneFlushInterval = pruneFlushInterval;
-this.pruneUpperBound = new AtomicLong();
-this.shouldFlush = new AtomicBoolean(false);
-startFlushThread();
+this.pruneEntries = new ConcurrentSkipListMap<>();
+  }
+
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+// The number of entries in this map is bound by the number of regions 
in this region server and thus it will not
+// grow indefinitely
+pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
   }
 
   public boolean isAlive() {
-return flushThread.isAlive();
+return flushThread != null && flushThread.isAlive();
   }
 
-  public void persistPruneEntry(long pruneUpperBound) {
-this.pruneUpperBound.set(pruneUpperBound);
-this.shouldFlush.set(true);
+  @Override
+  protected void startUp() throws Exception {
+LOG.info("Starting PruneUpperBoundWriter Thread.");
+startFlushThread();
   }
 
-  public void stop() {
+  @Override
+  protected void shutDown() throws Exception {
 if (flushThread != null) {
+  stopped = true;
+  LOG.info("Stopping PruneUpperBoundWriter Thread.");
   flushThread.interrupt();
+  flushThread.join(TimeUnit.SECONDS.toMillis(1));
 }
   }
 
   private void startFlushThread() {
 flushThread = new Thread("tephra-prune-upper-bound-writer") {
   @Override
   public void run() {
-while (!isInterrupted()) {
+while (!isInterrupted() && !stopped) {
--- End diff --

We can replace the check for `stopped` with `isRunning()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100193873
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,56 +18,56 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap pruneEntries;
--- End diff --

Use `new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100202356
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations.
+ */
+public class PruneUpperBoundWriterSupplier implements 
Supplier {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+
+  private static volatile PruneUpperBoundWriter instance;
+  private static volatile int refCount = 0;
+  private static final Object lock = new Object();
+
+  private final TableName tableName;
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  public PruneUpperBoundWriterSupplier(TableName tableName, 
DataJanitorState dataJanitorState,
+   long pruneFlushInterval) {
+this.tableName = tableName;
+this.dataJanitorState = dataJanitorState;
+this.pruneFlushInterval = pruneFlushInterval;
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+synchronized (lock) {
+  if (instance == null) {
+instance = new PruneUpperBoundWriter(tableName, dataJanitorState, 
pruneFlushInterval);
+instance.startAndWait();
+  }
+  refCount++;
+  LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " 
+ refCount);
--- End diff --

Would be good to enclose this log statement with `LOG.isDebugEnabled()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100202843
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest {
+  private static final int NUM_OPS = 10;
+
+  @Test
+  public void testSupplier() throws Exception {
+final PruneUpperBoundWriterSupplier supplier = new 
PruneUpperBoundWriterSupplier(null, null, 10L);
+final PruneUpperBoundWriter writer = supplier.get();
+final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
+final Random random = new Random(System.currentTimeMillis());
+
+// Start threads that will 'get' PruneUpperBoundWriters
+ExecutorService executor = Executors.newFixedThreadPool(3);
+List futureList = new ArrayList<>();
+for (int i = 0; i < 3; i++) {
+  futureList.add(executor.submit(new Callable() {
+
+@Override
+public Void call() throws Exception {
+  // Since we already got one PruneUpperBoundWriter, we need to 
get NUM_OPS - 1
+  while (numOps.decrementAndGet() > 0) {
+PruneUpperBoundWriter newWriter = supplier.get();
+Assert.assertTrue(newWriter == writer);
+int waitTime = random.nextInt(10);
+TimeUnit.MICROSECONDS.sleep(waitTime);
+  }
+  return null;
+}
+  }));
+}
+
+for (Future future : futureList) {
+  future.get(5, TimeUnit.SECONDS);
+}
+executor.shutdown();
+executor.awaitTermination(2, TimeUnit.SECONDS);
+
+futureList.clear();
+numOps.set(NUM_OPS);
+// Start thread that release PruneUpperBoundWriters
+executor = Executors.newFixedThreadPool(3);
+for (int i = 0; i < 3; i++) {
+  futureList.add(executor.submit(new Callable() {
--- End diff --

Same here about using `Runnable`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100202630
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest {
+  private static final int NUM_OPS = 10;
--- End diff --

Since this is a fast operation, let's make this a bigger number. 1000 maybe?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100202537
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
 ---
@@ -334,6 +337,46 @@ public Table get() throws IOException {
 }
   }
 
+  @Test(timeout = 6L)
+  public void testClusterShutdown() throws Exception {
--- End diff --

Is this test required now?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100202882
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest {
+  private static final int NUM_OPS = 10;
+
+  @Test
+  public void testSupplier() throws Exception {
+final PruneUpperBoundWriterSupplier supplier = new 
PruneUpperBoundWriterSupplier(null, null, 10L);
+final PruneUpperBoundWriter writer = supplier.get();
+final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
+final Random random = new Random(System.currentTimeMillis());
+
+// Start threads that will 'get' PruneUpperBoundWriters
+ExecutorService executor = Executors.newFixedThreadPool(3);
+List futureList = new ArrayList<>();
+for (int i = 0; i < 3; i++) {
--- End diff --

It would be good to have more threads too


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100201530
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,77 +18,85 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
+  private final TableName tableName;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap pruneEntries;
+
+  private volatile boolean stopped;
+  private volatile Thread flushThread;
 
-  private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, 
TableName pruneStateTable, String regionNameAsString,
-   byte[] regionName, long pruneFlushInterval) 
{
-this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(TableName tableName, DataJanitorState 
dataJanitorState, long pruneFlushInterval) {
+this.tableName = tableName;
 this.dataJanitorState = dataJanitorState;
-this.regionName = regionName;
-this.regionNameAsString = regionNameAsString;
 this.pruneFlushInterval = pruneFlushInterval;
-this.pruneUpperBound = new AtomicLong();
-this.shouldFlush = new AtomicBoolean(false);
-startFlushThread();
+this.pruneEntries = new ConcurrentSkipListMap<>();
+  }
+
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+// The number of entries in this map is bound by the number of regions 
in this region server and thus it will not
+// grow indefinitely
+pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
   }
 
   public boolean isAlive() {
-return flushThread.isAlive();
+return flushThread != null && flushThread.isAlive();
   }
 
-  public void persistPruneEntry(long pruneUpperBound) {
-this.pruneUpperBound.set(pruneUpperBound);
-this.shouldFlush.set(true);
+  @Override
+  protected void startUp() throws Exception {
+LOG.info("Starting PruneUpperBoundWriter Thread.");
+startFlushThread();
   }
 
-  public void stop() {
+  @Override
+  protected void shutDown() throws Exception {
 if (flushThread != null) {
+  stopped = true;
+  LOG.info("Stopping PruneUpperBoundWriter Thread.");
--- End diff --

It would be good to move this log message outside the if condition


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100202801
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest {
+  private static final int NUM_OPS = 10;
+
+  @Test
+  public void testSupplier() throws Exception {
+final PruneUpperBoundWriterSupplier supplier = new 
PruneUpperBoundWriterSupplier(null, null, 10L);
+final PruneUpperBoundWriter writer = supplier.get();
+final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
+final Random random = new Random(System.currentTimeMillis());
+
+// Start threads that will 'get' PruneUpperBoundWriters
+ExecutorService executor = Executors.newFixedThreadPool(3);
+List futureList = new ArrayList<>();
+for (int i = 0; i < 3; i++) {
+  futureList.add(executor.submit(new Callable() {
--- End diff --

`Callable` can be replaced with `Runnable` so that the return statement is 
not required.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100203117
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest {
+  private static final int NUM_OPS = 10;
+
+  @Test
+  public void testSupplier() throws Exception {
+final PruneUpperBoundWriterSupplier supplier = new 
PruneUpperBoundWriterSupplier(null, null, 10L);
+final PruneUpperBoundWriter writer = supplier.get();
+final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
+final Random random = new Random(System.currentTimeMillis());
+
+// Start threads that will 'get' PruneUpperBoundWriters
+ExecutorService executor = Executors.newFixedThreadPool(3);
+List futureList = new ArrayList<>();
+for (int i = 0; i < 3; i++) {
+  futureList.add(executor.submit(new Callable() {
+
+@Override
+public Void call() throws Exception {
+  // Since we already got one PruneUpperBoundWriter, we need to 
get NUM_OPS - 1
+  while (numOps.decrementAndGet() > 0) {
+PruneUpperBoundWriter newWriter = supplier.get();
+Assert.assertTrue(newWriter == writer);
+int waitTime = random.nextInt(10);
+TimeUnit.MICROSECONDS.sleep(waitTime);
+  }
+  return null;
+}
+  }));
+}
+
+for (Future future : futureList) {
+  future.get(5, TimeUnit.SECONDS);
+}
+executor.shutdown();
+executor.awaitTermination(2, TimeUnit.SECONDS);
+
+futureList.clear();
+numOps.set(NUM_OPS);
+// Start thread that release PruneUpperBoundWriters
+executor = Executors.newFixedThreadPool(3);
+for (int i = 0; i < 3; i++) {
+  futureList.add(executor.submit(new Callable() {
+
+@Override
+public Void call() throws Exception {
+  // We need to release all NUM_OPS 'gets' that were executed to 
trigger shutdown of the single instance of
+  // PruneUpperBoundWriter
+  while (numOps.decrementAndGet() >= 0) {
--- End diff --

Can we change this to leave one instance out, so that we can assert that 
that instance is still running after the n-1 instances were released back. We 
can later release it separately and then assert that it is not running.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread gokulavasan
Github user gokulavasan commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100187618
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,56 +18,56 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap pruneEntries;
+
+  private volatile boolean stopped;
+  private volatile Thread flushThread;
 
-  private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, 
TableName pruneStateTable, String regionNameAsString,
-   byte[] regionName, long pruneFlushInterval) 
{
-this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long 
pruneFlushInterval) {
 this.dataJanitorState = dataJanitorState;
-this.regionName = regionName;
-this.regionNameAsString = regionNameAsString;
 this.pruneFlushInterval = pruneFlushInterval;
-this.pruneUpperBound = new AtomicLong();
-this.shouldFlush = new AtomicBoolean(false);
-startFlushThread();
+this.pruneEntries = new ConcurrentSkipListMap<>();
+  }
+
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+// The number of entries in this map is bound by the number of regions 
in this region server and thus it will not
+// grow indefinitely
+pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
   }
 
   public boolean isAlive() {
-return flushThread.isAlive();
+return flushThread != null && flushThread.isAlive();
   }
 
-  public void persistPruneEntry(long pruneUpperBound) {
-this.pruneUpperBound.set(pruneUpperBound);
-this.shouldFlush.set(true);
+  @Override
+  protected void startUp() throws Exception {
+startFlushThread();
   }
 
-  public void stop() {
+  @Override
+  protected void shutDown() throws Exception {
 if (flushThread != null) {
--- End diff --

Added log statements.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread gokulavasan
Github user gokulavasan commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100173006
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,56 +18,56 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap pruneEntries;
+
+  private volatile boolean stopped;
+  private volatile Thread flushThread;
 
-  private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, 
TableName pruneStateTable, String regionNameAsString,
-   byte[] regionName, long pruneFlushInterval) 
{
-this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long 
pruneFlushInterval) {
 this.dataJanitorState = dataJanitorState;
-this.regionName = regionName;
-this.regionNameAsString = regionNameAsString;
 this.pruneFlushInterval = pruneFlushInterval;
-this.pruneUpperBound = new AtomicLong();
-this.shouldFlush = new AtomicBoolean(false);
-startFlushThread();
+this.pruneEntries = new ConcurrentSkipListMap<>();
+  }
+
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+// The number of entries in this map is bound by the number of regions 
in this region server and thus it will not
+// grow indefinitely
+pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
   }
 
   public boolean isAlive() {
-return flushThread.isAlive();
+return flushThread != null && flushThread.isAlive();
   }
 
-  public void persistPruneEntry(long pruneUpperBound) {
-this.pruneUpperBound.set(pruneUpperBound);
-this.shouldFlush.set(true);
+  @Override
+  protected void startUp() throws Exception {
+startFlushThread();
   }
 
-  public void stop() {
+  @Override
+  protected void shutDown() throws Exception {
 if (flushThread != null) {
+  stopped = true;
   flushThread.interrupt();
 }
--- End diff --

What would be a good timeout?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread gokulavasan
Github user gokulavasan commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100172848
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,56 +18,56 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap pruneEntries;
--- End diff --

We need the key to be Comparable. Hence we need to use ByteBuffer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100054015
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,56 +18,56 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap pruneEntries;
+
+  private volatile boolean stopped;
+  private volatile Thread flushThread;
 
-  private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, 
TableName pruneStateTable, String regionNameAsString,
-   byte[] regionName, long pruneFlushInterval) 
{
-this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long 
pruneFlushInterval) {
 this.dataJanitorState = dataJanitorState;
-this.regionName = regionName;
-this.regionNameAsString = regionNameAsString;
 this.pruneFlushInterval = pruneFlushInterval;
-this.pruneUpperBound = new AtomicLong();
-this.shouldFlush = new AtomicBoolean(false);
-startFlushThread();
+this.pruneEntries = new ConcurrentSkipListMap<>();
+  }
+
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+// The number of entries in this map is bound by the number of regions 
in this region server and thus it will not
+// grow indefinitely
+pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
   }
 
   public boolean isAlive() {
-return flushThread.isAlive();
+return flushThread != null && flushThread.isAlive();
   }
 
-  public void persistPruneEntry(long pruneUpperBound) {
-this.pruneUpperBound.set(pruneUpperBound);
-this.shouldFlush.set(true);
+  @Override
+  protected void startUp() throws Exception {
+startFlushThread();
   }
 
-  public void stop() {
+  @Override
+  protected void shutDown() throws Exception {
 if (flushThread != null) {
--- End diff --

Also would be good to add a log statement saying "stopping 
PruneUpperBoundWriter" and "stopped PruneUpperBoundWriter"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100042435
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations.
+ */
+public class PruneUpperBoundWriterSupplier implements 
Supplier {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+
+  private static volatile PruneUpperBoundWriter instance;
+  private static volatile int refCount = 0;
+  private static final Object lock = new Object();
+
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  public PruneUpperBoundWriterSupplier(DataJanitorState dataJanitorState, 
long pruneFlushInterval) {
+this.dataJanitorState = dataJanitorState;
+this.pruneFlushInterval = pruneFlushInterval;
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+synchronized (lock) {
--- End diff --

We can make an `instance == null` check outside the lock. In case instance 
is not null just increment refCount and return.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100039577
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,56 +18,56 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap pruneEntries;
+
+  private volatile boolean stopped;
+  private volatile Thread flushThread;
 
-  private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, 
TableName pruneStateTable, String regionNameAsString,
-   byte[] regionName, long pruneFlushInterval) 
{
-this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long 
pruneFlushInterval) {
 this.dataJanitorState = dataJanitorState;
-this.regionName = regionName;
-this.regionNameAsString = regionNameAsString;
 this.pruneFlushInterval = pruneFlushInterval;
-this.pruneUpperBound = new AtomicLong();
-this.shouldFlush = new AtomicBoolean(false);
-startFlushThread();
+this.pruneEntries = new ConcurrentSkipListMap<>();
+  }
+
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+// The number of entries in this map is bound by the number of regions 
in this region server and thus it will not
+// grow indefinitely
+pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
   }
 
   public boolean isAlive() {
-return flushThread.isAlive();
+return flushThread != null && flushThread.isAlive();
   }
 
-  public void persistPruneEntry(long pruneUpperBound) {
-this.pruneUpperBound.set(pruneUpperBound);
-this.shouldFlush.set(true);
+  @Override
+  protected void startUp() throws Exception {
+startFlushThread();
   }
 
-  public void stop() {
+  @Override
+  protected void shutDown() throws Exception {
 if (flushThread != null) {
+  stopped = true;
   flushThread.interrupt();
 }
--- End diff --

Also add `flushThread.join()` after the interrupt to wait for the flush 
thread to stop.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100048456
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.AbstractHBaseTableTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest extends 
AbstractHBaseTableTest {
--- End diff --

Can you add comments on what this test is trying to do? I'm not sure I 
follow the test.

Also, do we need to start an HBase instance for this test? We just need to 
make sure that prune writer gets stopped when reference count reaches zero, 
right? In that case can we pass in DataJanitorState as null to 
PruneUpperBoundWriterSupplier, and not call persistPruneEntry?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100035027
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
 ---
@@ -167,7 +167,7 @@ protected Configuration 
getConfiguration(CoprocessorEnvironment env) {
   protected Supplier 
getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
 return new TransactionStateCacheSupplier(env.getConfiguration());
   }
-
+  
--- End diff --

If you reset this newline then this file will not have any changes


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100043323
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
 ---
@@ -334,6 +337,45 @@ public Table get() throws IOException {
 }
   }
 
+  @Test(timeout = 6L)
+  public void testClusterShutdown() throws Exception {
+java.util.concurrent.ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+try {
+  // Create a new transaction snapshot
+  InMemoryTransactionStateCache.setTransactionSnapshot(
+new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+ImmutableSortedMap.of()));
+  // Run major compaction
--- End diff --

The comment can be changed to - `Try to create a situation where HBase 
shuts down when the async prune writer thread is trying to write to the prune 
table on a compaction. This will surface any deadlocks during HBase shutdown.`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100042655
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations.
+ */
+public class PruneUpperBoundWriterSupplier implements 
Supplier {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+
+  private static volatile PruneUpperBoundWriter instance;
+  private static volatile int refCount = 0;
+  private static final Object lock = new Object();
+
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  public PruneUpperBoundWriterSupplier(DataJanitorState dataJanitorState, 
long pruneFlushInterval) {
+this.dataJanitorState = dataJanitorState;
+this.pruneFlushInterval = pruneFlushInterval;
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+synchronized (lock) {
+  if (instance == null) {
+instance = new PruneUpperBoundWriter(dataJanitorState, 
pruneFlushInterval);
+instance.startAndWait();
+  }
+  refCount++;
+  return instance;
+}
+  }
+
+  public void release() {
+synchronized (lock) {
+  refCount--;
--- End diff --

Same here, decrement refCount outside lock and return if refCount is 
greater than zero. Only lock when stopping.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100039885
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -76,18 +76,21 @@ private void startFlushThread() {
 flushThread = new Thread("tephra-prune-upper-bound-writer") {
   @Override
   public void run() {
-while (!isInterrupted()) {
+while (!isInterrupted() && !stopped) {
   long now = System.currentTimeMillis();
   if (now > (lastChecked + pruneFlushInterval)) {
-if (shouldFlush.compareAndSet(true, false)) {
+if (!pruneEntries.isEmpty()) {
--- End diff --

This check is no longer required since we check for `while 
(pruneEntries.firstEntry() != null)` later


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100040861
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -76,18 +76,21 @@ private void startFlushThread() {
 flushThread = new Thread("tephra-prune-upper-bound-writer") {
   @Override
   public void run() {
-while (!isInterrupted()) {
+while (!isInterrupted() && !stopped) {
   long now = System.currentTimeMillis();
   if (now > (lastChecked + pruneFlushInterval)) {
-if (shouldFlush.compareAndSet(true, false)) {
+if (!pruneEntries.isEmpty()) {
   // should flush data
   try {
-dataJanitorState.savePruneUpperBoundForRegion(regionName, 
pruneUpperBound.get());
+while (pruneEntries.firstEntry() != null) {
+  Map.Entry firstEntry = 
pruneEntries.firstEntry();
+  
dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey().array(), 
firstEntry.getValue());
+  // We can now remove the entry only if the key and value 
match with what we wrote since it is
+  // possible that a new pruneUpperBound for the same key 
has been added
+  pruneEntries.remove(firstEntry.getKey(), 
firstEntry.getValue());
+}
   } catch (IOException ex) {
-LOG.warn("Cannot record prune upper bound for region " + 
regionNameAsString + " in the table " +
-   
pruneStateTable.getNameWithNamespaceInclAsString() + " after compacting 
region.", ex);
-// Retry again
-shouldFlush.set(true);
+LOG.warn("Cannot record prune upper bound for a region.", 
ex);
--- End diff --

It would be good to add prune state table name to the log message, so that 
it can help in figuring out mis-configurations if any


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100042824
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
 ---
@@ -360,8 +404,18 @@ public TransactionStateCache get() {
 @Override
 public void postCompact(ObserverContext 
e, Store store, StoreFile resultFile,
 CompactionRequest request) throws IOException {
-  super.postCompact(e, store, resultFile, request);
-  lastMajorCompactionTime.set(System.currentTimeMillis());
+  synchronized (compactionLock) {
+super.postCompact(e, store, resultFile, request);
+lastMajorCompactionTime.set(System.currentTimeMillis());
+  }
+}
+
+@Override
+public void stop(CoprocessorEnvironment e) throws IOException {
+  synchronized (stopLock) {
+System.out.println("*** Stopping " + 
this.getClass().getName());
--- End diff --

Remove the debug println


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100028436
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
 ---
@@ -70,7 +71,11 @@ public void setConf(Configuration conf) {
 
   @Override
   protected void startUp() throws Exception {
-refreshState();
+try {
+  refreshState();
+} catch (IOException ioe) {
+  LOG.info("Error refreshing transaction state cache: " + 
ioe.getMessage());
--- End diff --

Would be a good idea to log the stack trace


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-08 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r100040267
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,56 +18,56 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final ConcurrentSkipListMap pruneEntries;
--- End diff --

It would be simpler to use `ConcurrentSkipListMap`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-07 Thread gokulavasan
Github user gokulavasan commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r99905573
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,55 +18,48 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final Queue pruneEntries;
 
   private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, 
TableName pruneStateTable, String regionNameAsString,
-   byte[] regionName, long pruneFlushInterval) 
{
-this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long 
pruneFlushInterval) {
 this.dataJanitorState = dataJanitorState;
-this.regionName = regionName;
-this.regionNameAsString = regionNameAsString;
 this.pruneFlushInterval = pruneFlushInterval;
-this.pruneUpperBound = new AtomicLong();
-this.shouldFlush = new AtomicBoolean(false);
+this.pruneEntries = new ConcurrentLinkedQueue<>();
 startFlushThread();
   }
 
-  public boolean isAlive() {
-return flushThread.isAlive();
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+pruneEntries.add(new PruneInfo(regionName, pruneUpperBound));
--- End diff --

@poornachandra Should that max size be configurable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-07 Thread gokulavasan
Github user gokulavasan commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r99903840
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations.
+ */
+public class PruneUpperBoundWriterSupplier implements 
Supplier {
+
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  protected static volatile PruneUpperBoundWriter instance;
+  protected static Object lock = new Object();
+
+  public PruneUpperBoundWriterSupplier(DataJanitorState dataJanitorState, 
long pruneFlushInterval) {
+this.dataJanitorState = dataJanitorState;
+this.pruneFlushInterval = pruneFlushInterval;
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+if (instance == null) {
+  synchronized (lock) {
+if (instance == null) {
+  instance = new PruneUpperBoundWriter(dataJanitorState, 
pruneFlushInterval);
+  instance.start();
--- End diff --

@poornachandra Do you have any thoughts on how to address this? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-07 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r99879068
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations.
+ */
+public class PruneUpperBoundWriterSupplier implements 
Supplier {
+
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  protected static volatile PruneUpperBoundWriter instance;
--- End diff --

I don't think this needs to be protected


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-07 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r99880416
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,55 +18,48 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final Queue pruneEntries;
 
   private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, 
TableName pruneStateTable, String regionNameAsString,
-   byte[] regionName, long pruneFlushInterval) 
{
-this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long 
pruneFlushInterval) {
 this.dataJanitorState = dataJanitorState;
-this.regionName = regionName;
-this.regionNameAsString = regionNameAsString;
 this.pruneFlushInterval = pruneFlushInterval;
-this.pruneUpperBound = new AtomicLong();
-this.shouldFlush = new AtomicBoolean(false);
+this.pruneEntries = new ConcurrentLinkedQueue<>();
 startFlushThread();
   }
 
-  public boolean isAlive() {
-return flushThread.isAlive();
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+pruneEntries.add(new PruneInfo(regionName, pruneUpperBound));
--- End diff --

I think it would be good to limit the number of entries in the 
`pruneEntries` queue. If we are over the limit then we can drop earlier 
elements.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-07 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r99879750
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -18,55 +18,48 @@
 
 package org.apache.tephra.hbase.txprune;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Thread that will write the the prune upper bound
  */
-public class PruneUpperBoundWriter {
-  private static final Log LOG = 
LogFactory.getLog(TransactionProcessor.class);
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = 
LogFactory.getLog(PruneUpperBoundWriter.class);
 
-  private final TableName pruneStateTable;
   private final DataJanitorState dataJanitorState;
-  private final byte[] regionName;
-  private final String regionNameAsString;
   private final long pruneFlushInterval;
-  private final AtomicLong pruneUpperBound;
-  private final AtomicBoolean shouldFlush;
+  private final Queue pruneEntries;
 
   private Thread flushThread;
   private long lastChecked;
 
-  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, 
TableName pruneStateTable, String regionNameAsString,
-   byte[] regionName, long pruneFlushInterval) 
{
-this.pruneStateTable = pruneStateTable;
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long 
pruneFlushInterval) {
 this.dataJanitorState = dataJanitorState;
-this.regionName = regionName;
-this.regionNameAsString = regionNameAsString;
 this.pruneFlushInterval = pruneFlushInterval;
-this.pruneUpperBound = new AtomicLong();
-this.shouldFlush = new AtomicBoolean(false);
+this.pruneEntries = new ConcurrentLinkedQueue<>();
 startFlushThread();
--- End diff --

Since the flush thread is started in the `startUp` method, we don't need to 
start it in the constructor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-07 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r99879439
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations.
+ */
+public class PruneUpperBoundWriterSupplier implements 
Supplier {
+
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  protected static volatile PruneUpperBoundWriter instance;
+  protected static Object lock = new Object();
--- End diff --

Same here about protected. Also it is better to make the `lock` object 
final.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-07 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r99882082
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+
+import com.google.common.base.Supplier;
+
+/**
+ * Supplies instances of {@link PruneUpperBoundWriter} implementations.
+ */
+public class PruneUpperBoundWriterSupplier implements 
Supplier {
+
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+
+  protected static volatile PruneUpperBoundWriter instance;
+  protected static Object lock = new Object();
+
+  public PruneUpperBoundWriterSupplier(DataJanitorState dataJanitorState, 
long pruneFlushInterval) {
+this.dataJanitorState = dataJanitorState;
+this.pruneFlushInterval = pruneFlushInterval;
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+if (instance == null) {
+  synchronized (lock) {
+if (instance == null) {
+  instance = new PruneUpperBoundWriter(dataJanitorState, 
pruneFlushInterval);
+  instance.start();
--- End diff --

We have the same problem as `TransactionStateCache` here too - we don't 
have a way to stop this thread after all instances of `TransactionProcessor` 
co-processors have been shutdown 
(https://issues.apache.org/jira/browse/TEPHRA-152)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-07 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/32#discussion_r99881005
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 ---
@@ -79,15 +72,17 @@ public void run() {
 while (!isInterrupted()) {
   long now = System.currentTimeMillis();
   if (now > (lastChecked + pruneFlushInterval)) {
-if (shouldFlush.compareAndSet(true, false)) {
+if (!pruneEntries.isEmpty()) {
   // should flush data
   try {
-dataJanitorState.savePruneUpperBoundForRegion(regionName, 
pruneUpperBound.get());
+Iterator iterator = pruneEntries.iterator();
--- End diff --

Since we are doing concurrent operations here it would be good to use 
`peek()` and `poll()`, rather than a iterator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

2017-02-07 Thread gokulavasan
GitHub user gokulavasan opened a pull request:

https://github.com/apache/incubator-tephra/pull/32

(TEPHRA-215) (TEPHRA-218) Use single thread across all regions in a r…

…egion server to persist Prune Upper Bound info. Also don't refresh cache 
during startup of TransactionStateCache to avoid the possibility of Service 
stopping if tx.snapshot dir is not found during startup

JIRA : 
https://issues.apache.org/jira/browse/TEPHRA-215
https://issues.apache.org/jira/browse/TEPHRA-218

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gokulavasan/incubator-tephra 
feature/tephra-215

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-tephra/pull/32.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #32


commit 542e0f3512c22387c3fe349baeb1744b64a4ca06
Author: Gokul Gunasekaran 
Date:   2017-02-07T08:17:14Z

(TEPHRA-215) (TEPHRA-218) Use single thread across all regions in a region 
server to persist Prune Upper Bound info. Also don't refresh cache during 
startup of TransactionStateCache to avoid the possibility of Service stopping 
if tx.snapshot dir is not found during startup




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---