[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-tephra/pull/20


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90766611
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---
@@ -259,6 +262,8 @@ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException
 if (pruneUpperBoundForTime != -1) {
   Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
   return Math.min(pruneUpperBoundForTime, minPruneUpperBoundRegions);
+} else {
+  LOG.debug("Ignoring invalid prune upper bound -1 for time {}", time);
 }
   } else {
 if (LOG.isDebugEnabled()) {
--- End diff --

Sure, will update the message




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90766397
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---
@@ -259,6 +262,8 @@ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException
 if (pruneUpperBoundForTime != -1) {
   Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
   return Math.min(pruneUpperBoundForTime, minPruneUpperBoundRegions);
+} else {
+  LOG.debug("Ignoring invalid prune upper bound -1 for time {}", time);
 }
   } else {
 if (LOG.isDebugEnabled()) {
--- End diff --

Maybe the message here should also be rephrased: "Ignoring regions for time {} because not all of the regions recorded a pruneUpperBound."

This may seem picky, but remember that these messages will be seen by devops engineers who do not understand the internals. If you rephrase it this way, they will better understand what is going on.




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90766371
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---
@@ -259,6 +262,8 @@ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException
 if (pruneUpperBoundForTime != -1) {
   Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
   return Math.min(pruneUpperBoundForTime, minPruneUpperBoundRegions);
+} else {
+  LOG.debug("Ignoring invalid prune upper bound -1 for time {}", time);
--- End diff --

A better message would be: "Ignoring regions for time {} because no pruneUpperBound was found for that time and the data must be incomplete."
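For illustration, the per-time check from the diff can be sketched with both suggested messages in place. This is a simplified, hypothetical model, not the plugin's code: regions are Strings instead of byte[], java.util.logging stands in for the plugin's SLF4J `LOG.debug`, and `boundForTime` is an invented name.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical simplification of the per-time branch in computePruneUpperBound.
final class PruneUpperBoundCheck {
  private static final Logger LOG = Logger.getLogger(PruneUpperBoundCheck.class.getName());

  /** Sentinel used in the diff for "no prune upper bound recorded for this time". */
  static final long INVALID = -1L;

  static OptionalLong boundForTime(long time, Set<String> regionsAtTime,
                                   Map<String, Long> regionBounds, long pruneUpperBoundForTime) {
    // Case 1: some region at this time has not been major compacted yet.
    if (!regionBounds.keySet().containsAll(regionsAtTime)) {
      LOG.fine(() -> "Ignoring regions for time " + time
          + " because not all of the regions recorded a pruneUpperBound.");
      return OptionalLong.empty();
    }
    // Case 2: the Transaction Service did not record a bound for this time.
    if (pruneUpperBoundForTime == INVALID) {
      LOG.fine(() -> "Ignoring regions for time " + time
          + " because no pruneUpperBound was found for that time and the data must be incomplete.");
      return OptionalLong.empty();
    }
    // Otherwise: minimum of the time's bound and every region's bound.
    long result = pruneUpperBoundForTime;
    for (String region : regionsAtTime) {
      result = Math.min(result, regionBounds.get(region));
    }
    return OptionalLong.of(result);
  }
}
```

Unlike `Collections.min` in the diff, this sketch tolerates an empty region set and simply returns the time's bound.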




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90766319
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---
@@ -179,15 +180,17 @@ public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
   return;
 }
 
-// Get regions for given time, so as to not delete them
+// Get regions for the given time, so as to not delete them. The prune upper bounds for regions are recorded
+// by TransactionProcessor and the deletion is done by this class. To avoid update/delete race condition,
+// we only delete stale regions.
--- End diff --

OK, the comment makes it clear that this is because of a race condition, but what kind of race condition? Also, I believe my earlier comment still holds: as long as a region exists, we will never delete its entries here. Do we really have to keep a record of a region's existence for every single pruneInterval since the beginning of time? That can add up to a lot of records over time.
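The reviewer's point is easy to see in a small in-memory model of the exclusion rule used by `deleteRegionsWithPruneUpperBoundBefore` (quoted from DataJanitorState elsewhere in this thread). This is a sketch under simplifying assumptions: a `Map` stands in for the HBase state table and regions are Strings. A region in the exclude set, i.e. one that still exists, keeps its entry regardless of how old its bound is.

```java
import java.util.Map;
import java.util.Set;

// In-memory model of the deletion rule; not the actual DataJanitorState code.
final class RegionBoundCleanup {
  static void deleteRegionsWithPruneUpperBoundBefore(Map<String, Long> regionBounds,
                                                     long deletionPruneUpperBound,
                                                     Set<String> excludeRegions) {
    // Existing (excluded) regions are always skipped; the others are removed
    // only if their recorded bound is below the threshold.
    regionBounds.entrySet().removeIf(entry ->
        !excludeRegions.contains(entry.getKey()) && entry.getValue() < deletionPruneUpperBound);
  }
}
```

With a threshold of 50, a live region with bound 10 survives, a deleted region with bound 10 is removed, and a deleted region with bound 99 survives until the threshold passes it.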




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90766057
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.janitor.TransactionPruningPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link TransactionPruningPlugin} for HBase.
+ *
+ * This plugin determines the prune upper bound for transactional HBase tables that use
+ * coprocessor {@link TransactionProcessor}.
+ *
+ * State storage:
+ *
+ * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
+ * after every major compaction of a region. Let's call this (region, prune upper bound).
+ * In addition, the plugin also persists the following information on a run at time t
+ *
+ * (t, set of regions): Set of transactional regions at time t.
+ * Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
+ * attached to them.
+ *
+ * (t, prune upper bound): This is the smallest not in-progress transaction that
+ * will not have writes in any HBase regions that are created after time t.
+ * This value is determined by the Transaction Service based on the transaction state at time t
+ * and passed on to the plugin.
+ *
+ * Computing prune upper bound:
+ *
+ * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
+ * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
+ * Since the prune upper bound will get recorded for a region only after a major compaction,
+ * using only the latest set of regions we may not be able to find the
+ * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at the time
+ * of each run of the plugin, and use the historical region sets for time t, t - 1, etc.
+ * to determine the prune upper bound.
+ *
+ * From the regions saved at time t, t - 1, etc.,
+ * the plugin tries to find the latest (t, set of regions) where all regions have been major compacted,
+ * i.e., all regions have prune upper bound recorded in (region, prune upper bound).
+ *
+ * If such a set is found for time t1, the prune upper bound returned by the plugin is the minimum of
+ *
+ *   Prune upper bounds of regions in set (t1, set of regions)
+ *   Prune upper bound from (t1, prune upper bound)
+ *
+ * Above, when we find (t1, set of regions), there may be a region that was created after time t1,
+ * but has a data write from an invalid transaction that is smaller than the prune u

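The search described in this javadoc can be sketched in memory as follows. It is not the plugin's implementation: plain maps stand in for the state table, regions are Strings instead of byte[], and returning -1 when no complete time is found is an assumption that mirrors the `-1` check in the diff.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;

// Hypothetical model of the global prune upper bound search.
final class GlobalPruneUpperBound {
  static final long NONE = -1L; // assumed "no bound" sentinel

  static long compute(NavigableMap<Long, Set<String>> regionsByTime, // (t, set of regions)
                      Map<String, Long> regionBounds,                // (region, prune upper bound)
                      Map<Long, Long> timeBounds) {                  // (t, prune upper bound)
    // Walk the saved region sets from the newest time to the oldest.
    for (Map.Entry<Long, Set<String>> entry : regionsByTime.descendingMap().entrySet()) {
      long time = entry.getKey();
      Set<String> regions = entry.getValue();
      Long timeBound = timeBounds.get(time);
      if (timeBound == null || timeBound == NONE) {
        continue; // no usable bound recorded for this time
      }
      if (!regionBounds.keySet().containsAll(regions)) {
        continue; // some region has not been major compacted yet
      }
      // Latest complete time t1: minimum of its own bound and all region bounds.
      long result = timeBound;
      for (String region : regions) {
        result = Math.min(result, regionBounds.get(region));
      }
      return result;
    }
    return NONE;
  }
}
```

For example, if time 20 includes a region that has not been compacted, the search falls back to time 10 and returns the minimum over that time's bound and its regions' bounds.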
[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90765102
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90765092
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90759372
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +110,248 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
     }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions. This is a batch operation of method
+   * {@link #getPruneUpperBoundForRegion(byte[])}
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
+    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (regions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              resultMap.put(region, pruneUpperBoundRegion);
+            }
+          }
+        }
+      }
+      return resultMap;
+    }
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value.
+   * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
+   * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
+   * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+    throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (!excludeRegions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+                stateTable.delete(new Delete(next.getRow()));
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // ---
+  // Key: 0x2
+  // Col 't':
+  // ---
+
+  /**
+   * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
+   * transactional regions existing in the HBase instance periodically.
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      for (byte[] region : regions) {
+        Put put = new Put(makeTimeRegionKey(timeBytes, region));
+        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+        stateTable.put(put);
+      }
+    }
+  }
+
+  /**
+   * Return the set of regions saved for the time just before the given time. This method finds the greatest time
+   * that is less than the given time, a

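`saveRegionsForTime` above keys each row on `getInvertedTime(time)`, presumably so that a forward scan of the state table returns the newest time first. The sketch below models that ordering. Its body for `getInvertedTime` (`Long.MAX_VALUE - time`) is an assumption, since the method is not part of the quoted diff, and the hand-written unsigned lexicographic comparator stands in for `Bytes.BYTES_COMPARATOR`. A comparator-backed `TreeMap` is needed because `byte[]` keys only have identity-based `equals`/`hashCode`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of inverted-time row keys in the state table.
final class InvertedTimeKeys {
  // Assumption: the real getInvertedTime may differ; this is one common choice.
  static long invertedTime(long time) {
    return Long.MAX_VALUE - time;
  }

  // Big-endian 8-byte encoding, the same layout as HBase Bytes.toBytes(long).
  static byte[] toBytes(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  // Unsigned lexicographic order, the order in which HBase sorts row keys.
  static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = (a, b) -> {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  };

  // Returns the given times in the order a forward scan over their keys would see them.
  static List<Long> timesInScanOrder(long... times) {
    Map<byte[], Long> rows = new TreeMap<>(UNSIGNED_LEXICOGRAPHIC);
    for (long t : times) {
      rows.put(toBytes(invertedTime(t)), t);
    }
    return new ArrayList<>(rows.values());
  }
}
```

For non-negative timestamps the inverted value stays non-negative, so unsigned byte order agrees with numeric order and `timesInScanOrder(1000L, 3000L, 2000L)` yields the newest time first.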
[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90759621
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90759422
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +110,248 @@ public long getPruneUpperBound(byte[] regionId) 
throws IOException {
 }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions. This is a batch 
operation of method
+   * {@link #getPruneUpperBoundForRegion(byte[])}
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound 
value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map getPruneUpperBoundForRegions(SortedSet 
regions) throws IOException {
+Map resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (regions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  resultMap.put(region, pruneUpperBoundRegion);
+}
+  }
+}
+  }
+  return resultMap;
+}
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value.
+   * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
+   * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
+   * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+    throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (!excludeRegions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+                stateTable.delete(new Delete(next.getRow()));
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // ---
+  // Key: 0x2
+  // Col 't': 
+  // ---
+
+  /**
+   * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
+   * transactional regions existing in the HBase instance periodically.
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      for (byte[] region : regions) {
+        Put put = new Put(makeTimeRegionKey(timeBytes, region));
+        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+        stateTable.put(put);
+      }
+    }
+  }
+
+  /**
+   * Return the set of regions saved for the time just before the given time. This method finds the greatest time
+   * that is less than the given time, a

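The two scans quoted above filter the region rows in a specific way: the batch read keeps only requested regions that already have a recorded prune upper bound, and the delete skips excluded (still existing) regions before comparing against the bound. Below is a minimal in-memory sketch of those two rules. It is hypothetical: a TreeMap stands in for the HBase state table and its Scan, the class RegionPruneStateSketch and its helper methods are invented for illustration, and Arrays.compareUnsigned (Java 9+) stands in for Bytes.BYTES_COMPARATOR.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical in-memory model of the (region, prune upper bound) rows.
 * A TreeMap stands in for the HBase state table and its Scan.
 */
public class RegionPruneStateSketch {
  // Unsigned lexicographic byte[] order, which is how HBase orders row keys.
  public static final Comparator<byte[]> BYTES_UNSIGNED = Arrays::compareUnsigned;

  private final NavigableMap<byte[], Long> pruneUpperBounds = new TreeMap<>(BYTES_UNSIGNED);

  /** Records the prune upper bound saved for a region after a major compaction. */
  public void savePruneUpperBound(byte[] region, long pruneUpperBound) {
    pruneUpperBounds.put(region, pruneUpperBound);
  }

  /** Batch lookup: returns only the requested regions that have a recorded bound. */
  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) {
    Map<byte[], Long> resultMap = new TreeMap<>(BYTES_UNSIGNED);
    for (Map.Entry<byte[], Long> entry : pruneUpperBounds.entrySet()) {
      if (regions.contains(entry.getKey())) {
        resultMap.put(entry.getKey(), entry.getValue());
      }
    }
    return resultMap;
  }

  /** Drops rows below the bound unless the region is in the exclude set (e.g. still live). */
  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound,
                                                     SortedSet<byte[]> excludeRegions) {
    pruneUpperBounds.entrySet().removeIf(entry ->
        !excludeRegions.contains(entry.getKey())
            && entry.getValue() < deletionPruneUpperBound);
  }

  public boolean contains(byte[] region) {
    return pruneUpperBounds.containsKey(region);
  }

  public int size() {
    return pruneUpperBounds.size();
  }

  public static void main(String[] args) {
    RegionPruneStateSketch state = new RegionPruneStateSketch();
    state.savePruneUpperBound(new byte[] {1}, 10L);
    state.savePruneUpperBound(new byte[] {2}, 20L);
    state.savePruneUpperBound(new byte[] {3}, 5L);

    SortedSet<byte[]> live = new TreeSet<>(BYTES_UNSIGNED);
    live.add(new byte[] {1});
    // Region {1} is excluded and {2} is above the bound, so only {3} is removed.
    state.deleteRegionsWithPruneUpperBoundBefore(15L, live);
    System.out.println("rows left: " + state.size());
  }
}
```

Keeping excluded regions even when their bound is low follows the javadoc's stated reason for the exclude set: a region that still exists may be writing a new value, and deleting its row could race with that update.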
[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90759897
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.janitor.TransactionPruningPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link TransactionPruningPlugin} for HBase.
+ *
+ * This plugin determines the prune upper bound for transactional HBase tables that use
+ * coprocessor {@link TransactionProcessor}.
+ *
+ * State storage:
+ *
+ * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
+ * after every major compaction of a region. Let's call this (region, prune upper bound).
+ * In addition, the plugin also persists the following information on a run at time t
+ *
+ *
+ * (t, set of regions): Set of transactional regions at time t.
+ * Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
+ * attached to them.
+ *
+ *
+ * (t, prune upper bound): This is the smallest not in-progress transaction that
+ * will not have writes in any HBase regions that are created after time t.
+ * This value is determined by the Transaction Service based on the transaction state at time t
+ * and passed on to the plugin.
+ *
+ *
+ *
+ * Computing prune upper bound:
+ *
+ * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
+ * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
+ * Since the prune upper bound will get recorded for a region only after a major compaction,
+ * using only the latest set of regions we may not be able to find the
+ * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time
+ * of each run of the plugin, and use historical region set for time t, t - 1, etc.
+ * to determine the prune upper bound.
+ *
+ * From the regions saved at time t, t - 1, etc.,
+ * the plugin tries to find the latest (t, set of regions) where all regions have been major compacted,
+ * i.e., all regions have prune upper bound recorded in (region, prune upper bound).
+ *
+ * If such a set is found for time t1, the prune upper bound returned by the plugin is the minimum of
+ *
+ *   Prune upper bounds of regions in set (t1, set of regions)
+ *   Prune upper bound from (t1, prune upper bound)
+ *
+ *
+ *
+ * Above, when we find (t1, set of regions), there may be a region that was created after time t1,
+ * but has a data write from an invalid transaction that is smaller than the prune upper boun

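The selection rule in the plugin javadoc above (walk back from the latest saved region set to the first one whose regions all have a recorded prune upper bound, then take the minimum of those bounds and the bound for that time) can be sketched in plain Java. This is a hypothetical illustration, not the plugin's code: PruneUpperBoundSketch, TimeRegions and the use of String region names are assumptions, and skipping a time whose bound is -1 is inferred from the "Ignoring invalid prune upper bound -1" handling discussed in this review thread.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the selection rule described in the plugin javadoc. */
public class PruneUpperBoundSketch {
  /** One saved (t, set of regions) entry; region names are Strings here for readability. */
  public static final class TimeRegions {
    final long time;
    final Set<String> regions;

    public TimeRegions(long time, Set<String> regions) {
      this.time = time;
      this.regions = regions;
    }
  }

  /**
   * @param newestFirst  saved (t, set of regions) entries, latest time first
   * @param regionBounds (region, prune upper bound), recorded after each major compaction
   * @param timeBounds   (t, prune upper bound) from the Transaction Service; -1 means invalid
   * @return the plugin's prune upper bound, or -1 if no usable region set was found
   */
  public static long computePruneUpperBound(List<TimeRegions> newestFirst,
                                            Map<String, Long> regionBounds,
                                            Map<Long, Long> timeBounds) {
    for (TimeRegions tr : newestFirst) {
      // Assumption: an empty region set is skipped rather than treated as "no constraint".
      if (tr.regions.isEmpty() || !regionBounds.keySet().containsAll(tr.regions)) {
        continue;  // some region saved at this time has not been major compacted yet
      }
      long boundForTime = timeBounds.getOrDefault(tr.time, -1L);
      if (boundForTime == -1) {
        continue;  // invalid bound for this time: keep looking at older region sets
      }
      long minRegionBound = Long.MAX_VALUE;
      for (String region : tr.regions) {
        minRegionBound = Math.min(minRegionBound, regionBounds.get(region));
      }
      return Math.min(boundForTime, minRegionBound);
    }
    return -1;
  }
}
```

For example, if the newest set {a, b, c} contains c, which has no recorded bound, the sketch falls back to the older set {a, b} and returns the minimum of that time's bound and the bounds of a and b.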
[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90759970
  

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90759660
  

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90759321
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +110,248 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
 }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions. This is a batch operation of method
+   * {@link #getPruneUpperBoundForRegion(byte[])}
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
+    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (regions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              resultMap.put(region, pruneUpperBoundRegion);
+            }
+          }
+        }
+      }
+      return resultMap;
+    }
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value.
+   * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have
+   * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are
+   * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions.
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+    throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (!excludeRegions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+                stateTable.delete(new Delete(next.getRow()));
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // ---
+  // Key: 0x2
+  // Col 't':
+  // ---
+
+  /**
+   * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of
+   * transactional regions existing in the HBase instance periodically.
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      for (byte[] region : regions) {
+        Put put = new Put(makeTimeRegionKey(timeBytes, region));
+        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+        stateTable.put(put);
+      }
+    }
+  }
+
+  /**
+   * Return the set of regions saved for the time just before the given time. This method finds the greatest time
--- End diff --

Seems that this 

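saveRegionsForTime writes each region set under an inverted time, so a forward scan over those keys meets the newest entries first, and a lookup for "the time just before the given time" can stop at the first match. The sketch below illustrates that ordering under stated assumptions: getInvertedTime is assumed to be Long.MAX_VALUE - time (its body is not in the quoted diff), a TreeMap replaces the HBase scan, region names are Strings, and the strict "less than" follows the surviving, truncated javadoc text. InvertedTimeSketch and its methods are hypothetical.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical sketch of the time-keyed region sets. It assumes the inverted time is
 * Long.MAX_VALUE - time; the quoted diff does not show getInvertedTime itself.
 */
public class InvertedTimeSketch {
  static long invertedTime(long time) {
    return Long.MAX_VALUE - time;
  }

  // Ascending inverted keys = newest time first, like a forward scan over inverted-time row keys.
  private final NavigableMap<Long, Set<String>> regionsByInvertedTime = new TreeMap<>();

  public void saveRegionsForTime(long time, Set<String> regions) {
    regionsByInvertedTime
        .computeIfAbsent(invertedTime(time), k -> new TreeSet<>())
        .addAll(regions);
  }

  /**
   * Returns (time, regions) for the greatest saved time strictly less than {@code time},
   * or null if there is none.
   */
  public Map.Entry<Long, Set<String>> getRegionsBefore(long time) {
    // Times t' < time map to inverted keys > invertedTime(time); the first such key is the newest.
    Map.Entry<Long, Set<String>> entry = regionsByInvertedTime.higherEntry(invertedTime(time));
    if (entry == null) {
      return null;
    }
    return Map.entry(invertedTime(entry.getKey()), entry.getValue());
  }
}
```

In HBase the same effect relies on Bytes.toBytes(long) producing big-endian bytes, whose unsigned byte order matches numeric order for non-negative values such as these inverted timestamps.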
[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90759195
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ *
+ *
+ * An invalid transaction can only be removed from the invalid list after the data written
+ * by the invalid transactions has been removed from all the data stores.
+ * The term data store is used here to represent a set of tables in a database that have
+ * the same data clean up policy, like all Apache Phoenix tables in an HBase instance.
+ *
+ *
+ * Typically every data store will have a background job which cleans up the data written by invalid transactions.
+ * Prune upper bound for a data store is defined as the largest invalid transaction whose data has been
+ * cleaned up from that data store.
+ *
+ * prune-upper-bound = min(max(invalid list), min(in-progress list) - 1)
+ *
+ * where invalid list and in-progress list are from the transaction snapshot used to clean up the invalid data in the
+ * data store.
+ *
+ *
+ * There will be one such plugin per data store. The plugins will be executed as part of the Transaction Service.
+ * Each plugin will be invoked periodically to fetch the prune upper bound for its data store.
+ * Invalid transaction list can be pruned up to the minimum of prune upper bounds returned by all the plugins.
+ */
+public interface TransactionPruningPlugin {
+  /**
+   * Called once when the Transaction Service starts up.
+   *
+   * @param conf configuration for the plugin
+   */
+  void initialize(Configuration conf) throws IOException;
+
+  /**
+   * Called periodically to fetch prune upper bound for a data store. The plugin examines the state of data cleanup
+   * in the data store and determines the smallest invalid transaction whose writes no longer exist in the data
+   * store. It then returns this smallest invalid transaction as the prune upper bound for this data store.
+   *
+   * @param time start time of this prune iteration in milliseconds
+   * @param pruneUpperBoundForTime the largest invalid transaction that can be possibly removed
+   *                               from the invalid list for the given time.
+   *                               In terms of HBase, this is the smallest not in-progress transaction that will
+   *                               not have writes in any HBase regions that are created after the given time.
+   *                               The plugin will typically return a reduced upper bound based on the state of
+   *                               the invalid transaction data clean up in the data store.
--- End diff --

I still don't understand what this is. I thought this was an upper bound 
determined by the tx manager, based on its knowledge of which invalid 
transactions may still have active processes and therefore future writes?

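The formula in the interface javadoc quoted above, together with the rule that the invalid list can be pruned up to the minimum bound returned by all plugins, can be worked through with a small sketch. The class and method names are hypothetical, and the handling of empty lists and of -1 are assumptions, not something the quoted diff specifies.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical sketch of the formulas in the TransactionPruningPlugin javadoc. */
public class GlobalPruneSketch {
  /**
   * prune-upper-bound = min(max(invalid list), min(in-progress list) - 1), using the
   * snapshot that the data store's cleanup job ran against.
   * Assumptions: an empty invalid list yields -1 (nothing to prune), and an empty
   * in-progress list places no constraint.
   */
  public static long dataStorePruneUpperBound(SortedSet<Long> invalid, SortedSet<Long> inProgress) {
    if (invalid.isEmpty()) {
      return -1;
    }
    long inProgressLimit = inProgress.isEmpty() ? Long.MAX_VALUE : inProgress.first() - 1;
    return Math.min(invalid.last(), inProgressLimit);
  }

  /** The invalid list may be pruned up to the minimum bound reported by all plugins. */
  public static long globalPruneUpperBound(Collection<Long> pluginBounds) {
    // A single -1 (or no plugins at all) means nothing can be pruned yet.
    return pluginBounds.isEmpty() ? -1 : Collections.min(pluginBounds);
  }

  /** Removes every invalid transaction id that is <= the global bound. */
  public static SortedSet<Long> pruneInvalidList(SortedSet<Long> invalid, long globalBound) {
    if (globalBound < 0) {
      return new TreeSet<>(invalid);
    }
    return new TreeSet<>(invalid.tailSet(globalBound + 1));
  }
}
```

For example, with invalid = {3, 7, 12} and in-progress = {10, 15}, the data store bound is min(12, 10 - 1) = 9, so pruning up to 9 leaves only 12 in the invalid list.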



[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90759960
  

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90759375
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +110,248 @@ public long getPruneUpperBound(byte[] regionId) 
throws IOException {
 }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions. This is a batch 
operation of method
+   * {@link #getPruneUpperBoundForRegion(byte[])}
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound 
value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map getPruneUpperBoundForRegions(SortedSet 
regions) throws IOException {
+Map resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (regions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  resultMap.put(region, pruneUpperBoundRegion);
+}
+  }
+}
+  }
+  return resultMap;
+}
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose 
prune upper bound is less than a given value.
+   * After the invalid list is pruned up to deletionPruneUpperBound, we do 
not need entries for regions that have
+   * prune upper bound less than deletionPruneUpperBound. We however limit 
the deletion to only regions that are
+   * no longer in existence (due to deletion, etc.), to avoid 
update/delete race conditions.
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions 
will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long 
deletionPruneUpperBound, SortedSet excludeRegions)
+throws IOException {
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (!excludeRegions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+stateTable.delete(new Delete(next.getRow()));
+  }
+}
+  }
+}
+  }
+}
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // ---
+  // Key: 0x2
+  // Col 't': 
+  // ---
+
+  /**
+   * Persist the regions for the given time. {@link 
HBaseTransactionPruningPlugin} saves the set of
+   * transactional regions existing in the HBase instance periodically.
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      for (byte[] region : regions) {
+        Put put = new Put(makeTimeRegionKey(timeBytes, region));
+        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+        stateTable.put(put);
+      }
+    }
+  }
+
+  /**
+   * Return the set of regions saved for the time just before the given time. This method finds the greatest time
+   * that is less than the given time, a

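The scan-and-delete loop in `deleteRegionsWithPruneUpperBoundBefore` can be illustrated without a running HBase. The sketch below is a hypothetical in-memory stand-in (the class `RegionPruneStateSketch` and its helpers are not part of Tephra): a `TreeMap` plays the role of the state table's region rows, and the exclude set uses an unsigned byte-wise comparator in the spirit of HBase's `Bytes.BYTES_COMPARATOR`. The comparator matters because `byte[]` uses identity equality, so `contains` only finds a region by content when the sorted set is built with such a comparator.

```java
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for the region rows of the prune state table.
final class RegionPruneStateSketch {
  // Unsigned lexicographic order over byte[], similar in spirit to HBase's Bytes.BYTES_COMPARATOR.
  static final Comparator<byte[]> BYTES_ORDER = (a, b) -> {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = (a[i] & 0xff) - (b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return a.length - b.length;
  };

  // region -> prune upper bound recorded after the region's last major compaction
  private final TreeMap<byte[], Long> pruneUpperBounds = new TreeMap<>(BYTES_ORDER);

  void savePruneUpperBoundForRegion(byte[] region, long pruneUpperBound) {
    pruneUpperBounds.put(region, pruneUpperBound);
  }

  // Same decision as the scan loop: delete a row only if the region is not excluded
  // and its recorded prune upper bound is below deletionPruneUpperBound.
  void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions) {
    Iterator<Map.Entry<byte[], Long>> it = pruneUpperBounds.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<byte[], Long> entry = it.next();
      if (!excludeRegions.contains(entry.getKey()) && entry.getValue() < deletionPruneUpperBound) {
        it.remove();
      }
    }
  }

  boolean hasRegion(String region) {
    return pruneUpperBounds.containsKey(bytes(region));
  }

  static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  // Builds an exclude set whose contains() compares byte[] by content, not by reference.
  static SortedSet<byte[]> regionSet(String... regions) {
    SortedSet<byte[]> set = new TreeSet<>(BYTES_ORDER);
    for (String r : regions) {
      set.add(bytes(r));
    }
    return set;
  }
}
```

With bounds `r1 = 5`, `r2 = 5`, `r3 = 20`, a deletion bound of 10, and `r2` excluded, only `r1` is removed. A `HashSet<byte[]>` would not work as the exclude set, since it compares arrays by reference.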
[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90760015
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.janitor.TransactionPruningPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link TransactionPruningPlugin} for HBase.
+ *
+ * This plugin determines the prune upper bound for transactional HBase tables that use
+ * coprocessor {@link TransactionProcessor}.
+ *
+ * State storage:
+ *
+ * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions
+ * after every major compaction of a region. Let's call this (region, prune upper bound).
+ * In addition, the plugin also persists the following information on a run at time t
+ *
+ *
+ * (t, set of regions): Set of transactional regions at time t.
+ * Transactional regions are regions of the tables that have the coprocessor TransactionProcessor
+ * attached to them.
+ *
+ *
+ * (t, prune upper bound): This is the smallest not in-progress transaction that
+ * will not have writes in any HBase regions that are created after time t.
+ * This value is determined by the Transaction Service based on the transaction state at time t
+ * and passed on to the plugin.
+ *
+ *
+ *
+ * Computing prune upper bound:
+ *
+ * In a typical HBase instance, there can be a constant change in the number of regions due to region creations,
+ * splits and merges. At any given time there can always be a region on which a major compaction has not been run.
+ * Since the prune upper bound will get recorded for a region only after a major compaction,
+ * using only the latest set of regions we may not be able to find the
+ * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at the time
+ * of each run of the plugin, and use the historical region sets for time t, t - 1, etc.
+ * to determine the prune upper bound.
+ *
+ * From the regions saved at time t, t - 1, etc.,
+ * the plugin tries to find the latest (t, set of regions) where all regions have been major compacted,
+ * i.e., all regions have prune upper bound recorded in (region, prune upper bound).
+ *
+ * If such a set is found for time t1, the prune upper bound returned by the plugin is the minimum of
+ *
+ *   Prune upper bounds of regions in set (t1, set of regions)
+ *   Prune upper bound from (t1, prune upper bound)
+ *
+ *
+ *
+ * Above, when we find (t1, set of regions), there may be a region that was created after time t1,
+ * but has a data write from an invalid transaction that is smaller than the prune upper boun

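The search described in the javadoc above (walk back from the latest saved run to the newest one in which every saved region has a recorded prune upper bound, then take the minimum) can be sketched in memory. The class `PruneUpperBoundSketch`, its `String` region names, and the choice to skip runs whose time-level bound is missing or `-1` are assumptions for illustration, not the plugin's actual code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical in-memory model of the state the plugin persists.
final class PruneUpperBoundSketch {
  // (t, set of regions): regions that existed when the plugin ran at time t
  private final NavigableMap<Long, Set<String>> regionsByTime = new TreeMap<>();
  // (t, prune upper bound): value handed to the plugin by the Transaction Service at time t
  private final Map<Long, Long> pruneUpperBoundByTime = new HashMap<>();
  // (region, prune upper bound): recorded after a major compaction of the region
  private final Map<String, Long> pruneUpperBoundByRegion = new HashMap<>();

  void saveRun(long time, long pruneUpperBoundForTime, String... regions) {
    regionsByTime.put(time, new HashSet<>(Arrays.asList(regions)));
    pruneUpperBoundByTime.put(time, pruneUpperBoundForTime);
  }

  void recordMajorCompaction(String region, long pruneUpperBound) {
    pruneUpperBoundByRegion.put(region, pruneUpperBound);
  }

  // Returns -1 when no saved run has all of its regions compacted.
  long computePruneUpperBound() {
    // Newest run first: t, t - 1, ...
    for (Map.Entry<Long, Set<String>> run : regionsByTime.descendingMap().entrySet()) {
      Long timeBound = pruneUpperBoundByTime.get(run.getKey());
      if (timeBound == null || timeBound == -1L) {
        continue; // incomplete or invalid run: try an older one
      }
      long result = timeBound;
      boolean allCompacted = true;
      for (String region : run.getValue()) {
        Long regionBound = pruneUpperBoundByRegion.get(region);
        if (regionBound == null) {
          allCompacted = false; // this region has not been major compacted yet
          break;
        }
        result = Math.min(result, regionBound);
      }
      if (allCompacted) {
        return result;
      }
    }
    return -1;
  }
}
```

With runs `t = 1: {a, b}, bound 100` and `t = 2: {a, c}, bound 120`, and only `a = 105` and `b = 95` compacted, the run at `t = 2` is incomplete, so the sketch falls back to `t = 1` and returns `min(100, 105, 95) = 95`. Once `c = 110` is recorded it returns `min(120, 105, 110) = 105`.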
[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90759085
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java
 ---
@@ -0,0 +1,90 @@
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ *
+ * 
+ * An invalid transaction can only be removed from the invalid list after the data written
+ * by the invalid transactions has been removed from all the data stores.
+ * The term data store is used here to represent a set of tables in a database that have
+ * the same data cleanup policy, like all Apache Phoenix tables in an HBase instance.
+ *
+ *
+ * Typically every data store will have a background job which cleans up the data written by invalid transactions.
+ * Prune upper bound for a data store is defined as the largest invalid transaction whose data has been
+ * cleaned up from that data store.
+ *
+ * prune-upper-bound = min(max(invalid list), min(in-progress list) - 1)
+ *
+ * where invalid list and in-progress list are from the transaction snapshot used to clean up the invalid data in the
+ * data store.
+ *
+ *
+ * There will be one such plugin per data store. The plugins will be executed as part of the Transaction Service.
+ * Each plugin will be invoked periodically to fetch the prune upper bound for its data store.
+ * Invalid transaction list can be pruned up to the minimum of prune upper bounds returned by all the plugins.
+ */
+public interface TransactionPruningPlugin {
+  /**
+   * Called once when the Transaction Service starts up.
+   *
+   * @param conf configuration for the plugin
+   */
+  void initialize(Configuration conf) throws IOException;
+
+  /**
+   * Called periodically to fetch prune upper bound for a data store. The plugin examines the state of data cleanup
+   * in the data store and determines the smallest invalid transaction whose writes no longer exist in the data
--- End diff --

or a greatest lower bound for transaction ids that may not be pruned?
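To make the javadoc formula concrete, here is a small sketch of `prune-upper-bound = min(max(invalid list), min(in-progress list) - 1)` for one data store, followed by the global bound as the minimum over all plugins. Returning `-1` for an empty invalid list and treating an empty in-progress list as no constraint are assumptions of this sketch, and the class name `PruneBoundMath` is hypothetical. Because the global value is a plain minimum, a single plugin reporting `-1` suppresses pruning for that round.

```java
import java.util.Arrays;

// Hypothetical helper illustrating the formula from the TransactionPruningPlugin javadoc.
final class PruneBoundMath {
  // Per-store bound, computed from the snapshot used to clean up that store.
  static long storePruneUpperBound(long[] invalid, long[] inProgress) {
    if (invalid.length == 0) {
      return -1; // assumption: nothing to prune
    }
    long maxInvalid = Arrays.stream(invalid).max().getAsLong();
    // Assumption: no in-progress transactions means no cap from that side.
    long minInProgress = Arrays.stream(inProgress).min().orElse(Long.MAX_VALUE);
    return Math.min(maxInvalid, minInProgress - 1);
  }

  // The invalid list may only be pruned up to the smallest bound reported by any plugin.
  static long globalPruneUpperBound(long... pluginBounds) {
    return Arrays.stream(pluginBounds).min().orElse(-1L);
  }
}
```

For example, invalid `{3, 7, 12}` and in-progress `{10, 15}` give `min(12, 10 - 1) = 9`, and plugin bounds `9, 7, 20` give a global bound of `7`.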




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-01 Thread poornachandra
Github user poornachandra commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90580295
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -19,20 +19,46 @@
 
 package org.apache.tephra.hbase.coprocessor.janitor;
 
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
 
 /**
  * Persist data janitor state into an HBase table.
--- End diff --

Added javadoc
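`saveRegionsForTime` keys rows by `getInvertedTime(time)`, whose body is not shown in this diff. A common approach, assumed here, is `Long.MAX_VALUE - time` encoded big-endian (as HBase's `Bytes.toBytes(long)` does), so that a forward scan over the sorted row keys returns the newest time first. The hypothetical `InvertedTimeKeys` class below checks that ordering with plain JDK code.

```java
import java.nio.ByteBuffer;

// Hypothetical check of the "inverted time" row-key ordering.
final class InvertedTimeKeys {
  // Assumed definition; the actual getInvertedTime is not part of the quoted diff.
  static long invertedTime(long time) {
    return Long.MAX_VALUE - time;
  }

  // Big-endian 8-byte encoding (ByteBuffer's default order), the same layout as Bytes.toBytes(long).
  static byte[] key(long time) {
    return ByteBuffer.allocate(Long.BYTES).putLong(invertedTime(time)).array();
  }

  // Unsigned lexicographic comparison, the order in which HBase sorts row keys.
  static int compareKeys(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = (a[i] & 0xff) - (b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return a.length - b.length;
  }
}
```

The ordering only holds for non-negative times, which matches the early `return -1` in `fetchPruneUpperBound` for a negative `time`.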




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938020
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DefaultDataJanitorPlugin.java
 ---
@@ -0,0 +1,189 @@
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.janitor.DataJanitorPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link DataJanitorPlugin} for HBase
+ */
+@SuppressWarnings("WeakerAccess")
+public class DefaultDataJanitorPlugin implements DataJanitorPlugin {
--- End diff --

I think this is the HBaseJanitorPlugin? And if you had another store, it 
would be entirely different?




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938559
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
@@ -0,0 +1,59 @@
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ * There will be one such plugin per data store that will be invoked periodically
+ * to fetch the prune upper bound for each data store.
+ * Invalid transaction list will be pruned up to the minimum of prune upper bounds returned by all the plugins.
+ */
+public interface DataJanitorPlugin {
+  /**
+   * Called once at the beginning to initialize the plugin
+   */
+  void initialize(Configuration conf) throws IOException;
+
+  /**
+   * Called periodically to fetch prune upper bound for a data store
+   *
+   * @param time start time of this prune iteration
+   * @param pruneUpperBoundForTime upper bound for prune tx id for the given start time
+   */
+  long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException;
+
+  /**
+   * Called after pruning the invalid list.
+   * The plugin can use the pruneUpperBound passed to clean up its state
--- End diff --

Is this the same pruneUpperBound that was returned by fetchPruneUpperBound? 
What are the semantics of this? Does it mean it is guaranteed that all invalid 
transactions less than that upper bound have been removed from the invalid list?
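One possible reading of the lifecycle questioned here, sketched under assumptions: the Transaction Service collects `fetchPruneUpperBound` from every plugin, prunes the invalid list up to the minimum, and only then calls `pruneComplete` with that same minimum. The interface `PruningPluginSketch`, the classes `PruneIterationSketch` and `FixedBoundPlugin`, and the inclusive `<=` pruning are all hypothetical. The point of the sketch is that, under this reading, the value passed to `pruneComplete` is the global minimum, which is not necessarily what an individual plugin returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, trimmed-down version of the plugin contract under review.
interface PruningPluginSketch {
  long fetchPruneUpperBound(long time, long pruneUpperBoundForTime);

  void pruneComplete(long time, long pruneUpperBound);
}

final class PruneIterationSketch {
  // One iteration: ask every plugin, prune to the minimum, then notify every plugin.
  static long runOnce(long time, long pruneUpperBoundForTime,
                      List<PruningPluginSketch> plugins, TreeSet<Long> invalid) {
    long global = Long.MAX_VALUE;
    for (PruningPluginSketch plugin : plugins) {
      global = Math.min(global, plugin.fetchPruneUpperBound(time, pruneUpperBoundForTime));
    }
    if (plugins.isEmpty() || global < 0) {
      return -1; // some store cannot give a bound yet: prune nothing this round
    }
    // Assumption: the bound is inclusive, so every invalid id <= global is removed.
    invalid.headSet(global, true).clear();
    for (PruningPluginSketch plugin : plugins) {
      plugin.pruneComplete(time, global);
    }
    return global;
  }
}

// Test double that always reports the same bound and remembers what pruneComplete received.
final class FixedBoundPlugin implements PruningPluginSketch {
  private final long bound;
  long completedWith = -1;

  FixedBoundPlugin(long bound) {
    this.bound = bound;
  }

  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) {
    return bound;
  }

  public void pruneComplete(long time, long pruneUpperBound) {
    completedWith = pruneUpperBound;
  }
}
```

With plugins reporting `9` and `7` and an invalid list `{3, 7, 12}`, the iteration prunes to `7`, leaves `{12}`, and both plugins see `7` in `pruneComplete`.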




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89206674
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
@@ -0,0 +1,59 @@
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ * There will be one such plugin per data store that will be invoked periodically
+ * to fetch the prune upper bound for each data store.
+ * Invalid transaction list will be pruned up to the minimum of prune upper bounds returned by all the plugins.
+ */
+public interface DataJanitorPlugin {
+  /**
+   * Called once at the beginning to initialize the plugin
+   */
+  void initialize(Configuration conf) throws IOException;
+
+  /**
+   * Called periodically to fetch prune upper bound for a data store
+   *
+   * @param time start time of this prune iteration
+   * @param pruneUpperBoundForTime upper bound for prune tx id for the given start time
+   */
+  long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException;
+
+  /**
+   * Called after pruning the invalid list.
+   * The plugin can use the pruneUpperBound passed to clean up its state
+   *
+   * @param time start time of this prune iteration
+   * @param pruneUpperBound prune upper bound used to prune the invalid list
+   */
+  void pruneComplete(long time, long pruneUpperBound) throws IOException;
+
+  /**
+   * Called once during shutdown
+   */
+  void destroy() throws IOException;
--- End diff --

Should destroy() throw exceptions? I mean, normally it should not.
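If `destroy()` is changed not to throw, as suggested, one option is to close resources and log failures instead of propagating them. A minimal sketch, assuming the plugin holds a single `Closeable` such as its HBase `Connection`. `QuietDestroySketch` is a hypothetical name, and `java.util.logging` stands in for the SLF4J logger the plugin actually uses.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical plugin fragment whose shutdown method never throws.
final class QuietDestroySketch {
  private static final Logger LOG = Logger.getLogger(QuietDestroySketch.class.getName());
  private final Closeable connection;

  QuietDestroySketch(Closeable connection) {
    this.connection = connection;
  }

  // A failure to close is logged, not propagated, so shutdown of the caller can proceed.
  void destroy() {
    try {
      connection.close();
    } catch (IOException e) {
      LOG.log(Level.WARNING, "Failed to close connection during destroy", e);
    }
  }
}
```

The trade-off is that the caller can no longer react to a failed close; in exchange, one misbehaving plugin cannot abort the shutdown of the others.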




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89939212
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DefaultDataJanitorPlugin.java
 ---
@@ -0,0 +1,189 @@
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.janitor.DataJanitorPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link DataJanitorPlugin} for HBase
+ */
+@SuppressWarnings("WeakerAccess")
+public class DefaultDataJanitorPlugin implements DataJanitorPlugin {
+  public static final Logger LOG = LoggerFactory.getLogger(DefaultDataJanitorPlugin.class);
+
+  protected Configuration conf;
+  protected Connection connection;
+  protected DataJanitorState dataJanitorState;
+
+  @Override
+  public void initialize(Configuration conf) throws IOException {
+    this.conf = conf;
+    this.connection = ConnectionFactory.createConnection(conf);
+
+    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
+                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
+    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
+    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return connection.getTable(stateTable);
+      }
+    });
+  }
+
+  @Override
+  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException {
+    LOG.debug("Fetching prune upper bound for time {} and max prune upper bound {}", time, pruneUpperBoundForTime);
+    if (time < 0 || pruneUpperBoundForTime < 0) {
+      return -1;
+    }
+
+    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
+    if (!transactionalRegions.isEmpty()) {
+      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
+      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
+      // Save prune upper bound for time as the final step.
+      // We can then use its existence to make sure that the data for a given time is complete or not
+      LOG.debug("Saving max prune upper bound {} for time {}", pruneUpperBoundForTime, time);
+      dataJanitorState.savePruneUpperBoundForTime(time, pruneUpperBoundForTime);
+    }
+
+    return computePruneUpperBound(new TimeRegions(time, transactionalRegions));
+  }
+
+  @Override
+  public void pruneComplete(long time, long pruneUpperBound) throws IOException {
+    LOG.debug("Prune complete for time {} and prune upper bound {}", time, pruneUpperBound);
+    // Get regions for given time, so as to not delete them
+    TimeRegions regionsToExclude =

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89206461
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
@@ -0,0 +1,59 @@
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ * There will be one such plugin per data store that will be invoked periodically
+ * to fetch the prune upper bound for each data store.
+ * Invalid transaction list will be pruned up to the minimum of prune upper bounds returned by all the plugins.
+ */
+public interface DataJanitorPlugin {
--- End diff --

The naming is not clear. Can we call it a TransactionPruningPlugin? 




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938230
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
@@ -0,0 +1,59 @@
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ * There will be one such plugin per data store that will be invoked periodically
+ * to fetch the prune upper bound for each data store.
+ * Invalid transaction list will be pruned up to the minimum of prune upper bounds returned by all the plugins.
+ */
+public interface DataJanitorPlugin {
+  /**
+   * Called once at the beginning to initialize the plugin
+   */
+  void initialize(Configuration conf) throws IOException;
--- End diff --

At the beginning of what? The lifetime of a transaction manager? Or the 
beginning of a prune operation?




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938297
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
@@ -0,0 +1,59 @@
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ * There will be one such plugin per data store that will be invoked periodically
--- End diff --

It would make sense to mention here that this is a plugin for the tx manager. 
Its name is a little misleading because the DataJanitor is a coprocessor that 
runs in each region server. 




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89206584
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
@@ -0,0 +1,59 @@
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ * There will be one such plugin per data store that will be invoked periodically
--- End diff --

Is there one per each HBase instance? Or one per HBase table?
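The `TransactionPruningPlugin` javadoc quoted earlier in this digest answers this in terms of data stores: one plugin per set of tables that share a cleanup policy, and the HBase plugin considers every region of every table that has `TransactionProcessor` attached. The selection step can be sketched with plain maps standing in for HBase table descriptors. `TransactionalRegionsSketch`, `TableInfo`, and the region-name strings are hypothetical. In HBase 1.x the same information would presumably come from `Admin` and `HTableDescriptor`, as the plugin's imports suggest.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical model of selecting the regions that make up one data store.
final class TransactionalRegionsSketch {
  // Stand-in for a table descriptor: attached coprocessor class names and region names.
  static final class TableInfo {
    final Set<String> coprocessors;
    final List<String> regions;

    TableInfo(Set<String> coprocessors, List<String> regions) {
      this.coprocessors = coprocessors;
      this.regions = regions;
    }
  }

  // One data store = the regions of every table in the instance that carries the given coprocessor.
  static SortedSet<String> transactionalRegions(Map<String, TableInfo> tables, String coprocessorClass) {
    SortedSet<String> regions = new TreeSet<>();
    for (TableInfo table : tables.values()) {
      if (table.coprocessors.contains(coprocessorClass)) {
        regions.addAll(table.regions);
      }
    }
    return regions;
  }
}
```

Under this model a table without the coprocessor contributes no regions, so it never holds back the prune upper bound.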




[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938407
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
@@ -0,0 +1,59 @@
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ * There will be one such plugin per data store that will be invoked periodically
+ * to fetch the prune upper bound for each data store.
+ * Invalid transaction list will be pruned up to the minimum of prune upper bounds returned by all the plugins.
+ */
+public interface DataJanitorPlugin {
+  /**
+   * Called once at the beginning to initialize the plugin
+   */
+  void initialize(Configuration conf) throws IOException;
+
+  /**
+   * Called periodically to fetch prune upper bound for a data store
+   *
+   * @param time start time of this prune iteration
+   * @param pruneUpperBoundForTime upper bound for prune tx id for the given start time
--- End diff --

This is confusing. I thought this _returns_ the prune upper bound? Why does
it need the upper bound passed in as a parameter? I think you need to explain
better what this parameter means.

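One plausible reading of the parameter, going by the debug message in `DefaultDataJanitorPlugin` that calls it the "max prune upper bound": the caller supplies a cap for `time`, and the plugin returns the smaller of that cap and the lowest bound recorded for the regions that existed at `time`. The sketch below assumes exactly that. `StorePruneBound` is hypothetical, region ids are modeled as `String` rather than `byte[]`, and returning `-1` when any region has no recorded bound is an assumption.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

class StorePruneBound {
  /**
   * Hypothetical per-store computation: min(cap, lowest bound of the regions recorded at
   * the prune time). Returns -1 (no pruning) if the cap is invalid, no regions were
   * recorded, or any recorded region has no bound yet (a conservative choice in this sketch).
   */
  static long compute(SortedSet<String> regionsAtTime, Map<String, Long> regionBounds,
                      long pruneUpperBoundForTime) {
    if (pruneUpperBoundForTime < 0 || regionsAtTime.isEmpty()) {
      return -1;
    }
    long minRegionBound = Long.MAX_VALUE;
    for (String region : regionsAtTime) {
      Long bound = regionBounds.get(region);
      if (bound == null) {
        return -1; // this region has not reported a bound, so pruning is not safe
      }
      minRegionBound = Math.min(minRegionBound, bound);
    }
    return Math.min(pruneUpperBoundForTime, minRegionBound);
  }

  public static void main(String[] args) {
    SortedSet<String> regions = new TreeSet<>(Arrays.asList("r1", "r2"));
    Map<String, Long> bounds = new TreeMap<>();
    bounds.put("r1", 150L);
    bounds.put("r2", 90L);
    System.out.println(compute(regions, bounds, 120L)); // 90: the slowest region wins
    System.out.println(compute(regions, bounds, 80L));  // 80: the cap wins
    bounds.remove("r2");
    System.out.println(compute(regions, bounds, 120L)); // -1: r2 has no recorded bound
  }
}
```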

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938969
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DefaultDataJanitorPlugin.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.janitor.DataJanitorPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link DataJanitorPlugin} for HBase
+ */
+@SuppressWarnings("WeakerAccess")
+public class DefaultDataJanitorPlugin implements DataJanitorPlugin {
+  public static final Logger LOG = LoggerFactory.getLogger(DefaultDataJanitorPlugin.class);
+
+  protected Configuration conf;
+  protected Connection connection;
+  protected DataJanitorState dataJanitorState;
+
+  @Override
+  public void initialize(Configuration conf) throws IOException {
+    this.conf = conf;
+    this.connection = ConnectionFactory.createConnection(conf);
+
+    final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
+                                                            TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
+    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
+    this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return connection.getTable(stateTable);
+      }
+    });
+  }
+
+  @Override
+  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException {
+    LOG.debug("Fetching prune upper bound for time {} and max prune upper bound {}", time, pruneUpperBoundForTime);
+    if (time < 0 || pruneUpperBoundForTime < 0) {
+      return -1;
+    }
+
+    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
+    if (!transactionalRegions.isEmpty()) {
+      LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time);
+      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
--- End diff --

I am not sure I understand. Why does fetch() have to save anything? 
Shouldn't it just read (=fetch)?


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89207961
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
     }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
+    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (regions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              resultMap.put(region, pruneUpperBoundRegion);
+            }
+          }
+        }
+      }
+      return resultMap;
+    }
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value
--- End diff --

What is this used for? It might be useful to say that in the javadoc.

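The method body shows what it does, though not why it is called: scan every per-region entry and delete those whose recorded bound is strictly below `deletionPruneUpperBound`, skipping regions in `excludeRegions`. Below is an in-memory model of just that behavior. Cleaning up state for regions that no longer exist is only a guess at the purpose, and `PruneStateCleanup` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class PruneStateCleanup {
  /**
   * In-memory model of deleteRegionsWithPruneUpperBoundBefore: removes every region whose
   * recorded prune upper bound is strictly below the threshold, unless the region is in
   * the exclude set. Returns how many entries were removed.
   */
  static int deleteRegionsWithPruneUpperBoundBefore(Map<String, Long> pruneUpperBounds,
                                                     long deletionPruneUpperBound,
                                                     Set<String> excludeRegions) {
    int removed = 0;
    Iterator<Map.Entry<String, Long>> it = pruneUpperBounds.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> entry = it.next();
      if (!excludeRegions.contains(entry.getKey()) && entry.getValue() < deletionPruneUpperBound) {
        it.remove();
        removed++;
      }
    }
    return removed;
  }

  public static void main(String[] args) {
    Map<String, Long> bounds = new TreeMap<>();
    bounds.put("live-region", 10L);    // below threshold, but excluded
    bounds.put("dropped-region", 20L); // below threshold and not excluded: deleted
    bounds.put("fresh-region", 500L);  // above threshold: kept
    int removed = deleteRegionsWithPruneUpperBoundBefore(
        bounds, 100L, new TreeSet<>(Arrays.asList("live-region")));
    System.out.println(removed + " " + bounds.keySet()); // 1 [fresh-region, live-region]
  }
}
```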

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89377823
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
     }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
+    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (regions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              resultMap.put(region, pruneUpperBoundRegion);
+            }
+          }
+        }
+      }
+      return resultMap;
+    }
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+    throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (!excludeRegions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+                stateTable.delete(new Delete(next.getRow()));
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // Key: 0x2
+  // Col 't':
+  // ---
+
+  /**
+   * Persist the regions for a given time
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      for (byte[] region : regions) {
+        Put put = new Put(makeTimeRegionKey(timeBytes, region));
+        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+        stateTable.put(put);
+      }
+    }
+  }
+
+  /**
+   * Return all the persisted regions for a time equal to or less than the given time
+   *
--- End diff --

If I read the code correctly, this finds the greatest time that is
less than the given time and then returns all regions with that exact time, but
none that are older than that. Is that correct? The javadoc is not that clear
about it.

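Assuming the inclusive wording of the javadoc ("equal to or less than"), the behavior described above is a floor lookup over a sorted map: take the greatest recorded time at or below the given time, then return only the regions stored at exactly that time. Below is a minimal in-memory sketch using `TreeMap.floorEntry`, which is inclusive. Whether the HBase scan also includes `time` itself depends on how `getInvertedTime` and `makeTimeRegionKey` encode the start row, which the diff does not show. `RegionsByTime` is hypothetical, and region ids are modeled as `String`s.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

class RegionsByTime {
  // time (ms) -> region ids recorded at exactly that time
  private final NavigableMap<Long, SortedSet<String>> regionsByTime = new TreeMap<>();

  void saveRegionsForTime(long time, SortedSet<String> regions) {
    regionsByTime.computeIfAbsent(time, t -> new TreeSet<>()).addAll(regions);
  }

  /**
   * Greatest recorded time <= the given time, paired with only the regions recorded at
   * exactly that time (older snapshots are ignored). Null if nothing is that old.
   */
  Map.Entry<Long, SortedSet<String>> getRegionsOnOrBeforeTime(long time) {
    return regionsByTime.floorEntry(time);
  }

  public static void main(String[] args) {
    RegionsByTime state = new RegionsByTime();
    state.saveRegionsForTime(100L, new TreeSet<>(Arrays.asList("r1", "r2")));
    state.saveRegionsForTime(200L, new TreeSet<>(Arrays.asList("r2", "r3")));
    System.out.println(state.getRegionsOnOrBeforeTime(250L)); // 200=[r2, r3]
    System.out.println(state.getRegionsOnOrBeforeTime(200L)); // 200=[r2, r3]
    System.out.println(state.getRegionsOnOrBeforeTime(150L)); // 100=[r1, r2]
    System.out.println(state.getRegionsOnOrBeforeTime(50L));  // null
  }
}
```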

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89391878
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
     }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
+    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (regions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              resultMap.put(region, pruneUpperBoundRegion);
+            }
+          }
+        }
+      }
+      return resultMap;
+    }
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+    throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (!excludeRegions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+                stateTable.delete(new Delete(next.getRow()));
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // Key: 0x2
+  // Col 't':
+  // ---
+
+  /**
+   * Persist the regions for a given time
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      for (byte[] region : regions) {
+        Put put = new Put(makeTimeRegionKey(timeBytes, region));
+        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+        stateTable.put(put);
+      }
+    }
+  }
+
+  /**
+   * Return all the persisted regions for a time equal to or less than the given time
+   *
+   * @param time timestamp in milliseconds
+   * @return set of regions and time at which they were recorded
+   * @throws IOException when not able to read the data from HBase
+   */
+  @Nullable
+  public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException {
+    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+    try (Table stateTable = stateTableSupplier.get()) {
+      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, REGION_TIME_COL);
+
+      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+      long currentRe

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89391984
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89207839
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
     }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
+    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (regions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              resultMap.put(region, pruneUpperBoundRegion);
+            }
+          }
+        }
+      }
+      return resultMap;
+    }
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value
--- End diff --

Nit: missing period at the end of this sentence.


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89391681
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938666
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -19,20 +19,46 @@
 
 package org.apache.tephra.hbase.coprocessor.janitor;
 
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
 
 /**
  * Persist data janitor state into an HBase table.
--- End diff --

Is it true that this class is shared between the DataJanitor coprocessor
(for saving prune state) and the DataJanitorPlugin in the tx manager (for
reading the state)? If so, then this deserves mentioning in the javadocs.


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938828
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException {
     }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException {
+    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (regions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              resultMap.put(region, pruneUpperBoundRegion);
+            }
+          }
+        }
+      }
+      return resultMap;
+    }
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+    throws IOException {
+    try (Table stateTable = stateTableSupplier.get()) {
+      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+      try (ResultScanner scanner = stateTable.getScanner(scan)) {
+        Result next;
+        while ((next = scanner.next()) != null) {
+          byte[] region = getRegionFromKey(next.getRow());
+          if (!excludeRegions.contains(region)) {
+            byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL);
+            if (timeBytes != null) {
+              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+                stateTable.delete(new Delete(next.getRow()));
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // Key: 0x2
+  // Col 't':
+  // ---
+
+  /**
+   * Persist the regions for a given time
--- End diff --

What exactly does this persist? Simply which regions existed at that time?

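Going by the `saveRegionsForTime` body, it appears to write one row per region, keyed by the `0x2` prefix, the inverted time and the region id, with an empty value in `REGION_TIME_COL`: in effect a snapshot of which regions existed at `time`. The sketch below shows why inverting the time is useful: newer snapshots get smaller keys, so an ascending HBase scan meets them first. The inversion `Long.MAX_VALUE - time` and the plain concatenated layout are assumptions, since neither `getInvertedTime` nor `makeTimeRegionKey` appears in the diff; `TimeRegionKey` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class TimeRegionKey {
  // Prefix taken from the "Key: 0x2" comment in the diff; the rest of the layout is assumed.
  static final byte REGION_TIME_KEY_PREFIX = 0x2;

  /** Assumed inversion: a newer (larger) time yields a smaller value. */
  static long invertedTime(long time) {
    return Long.MAX_VALUE - time;
  }

  /** Hypothetical layout: prefix byte + 8-byte big-endian inverted time + region id. */
  static byte[] makeTimeRegionKey(long time, byte[] region) {
    return ByteBuffer.allocate(1 + Long.BYTES + region.length)
        .put(REGION_TIME_KEY_PREFIX)
        .putLong(invertedTime(time))
        .put(region)
        .array();
  }

  /** Unsigned lexicographic byte comparison, the order HBase sorts row keys in. */
  static int compareRowKeys(byte[] a, byte[] b) {
    int common = Math.min(a.length, b.length);
    for (int i = 0; i < common; i++) {
      int cmp = (a[i] & 0xff) - (b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    byte[] region = "region-a".getBytes(StandardCharsets.UTF_8);
    byte[] older = makeTimeRegionKey(1_000L, region);
    byte[] newer = makeTimeRegionKey(2_000L, region);
    // The newer snapshot sorts first, so an ascending scan returns it before older ones.
    System.out.println(compareRowKeys(newer, older) < 0); // true
  }
}
```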

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-21 Thread poornachandra
GitHub user poornachandra opened a pull request:

https://github.com/apache/incubator-tephra/pull/20

Compute global prune upper bound using compaction state of every region

JIRA - https://issues.apache.org/jira/browse/TEPHRA-198

This PR uses the compaction state recorded by co-processors in 
https://github.com/apache/incubator-tephra/pull/19 to compute the global prune 
upper bound.

TODO: add some more test cases to test DefaultDataJanitorPlugin.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/poornachandra/incubator-tephra feature/transaction-pruning

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-tephra/pull/20.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20


commit 0c4001d088b432bbe68a5251e89665930d710f04
Author: poorna 
Date:   2016-11-10T23:09:02Z

Compute global prune upper bound using compaction state of every region



