Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r89391984
  
    --- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
    @@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) 
throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound 
value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> 
regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose 
prune upper bound is less than a given value
    +   *
    +   * @param deletionPruneUpperBound prune upper bound below which regions 
will be deleted
    +   * @param excludeRegions set of regions that should not be deleted
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteRegionsWithPruneUpperBoundBefore(long 
deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
    +    throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (!excludeRegions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
    +                stateTable.delete(new Delete(next.getRow()));
    +              }
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  // ---------------------------------------------------
    +  // ------- Methods for regions at a given time -------
    +  // Key: 0x2<time><region-id>
    +  // Col 't': <empty byte array>
    +  // ---------------------------------------------------
    +
    +  /**
    +   * Persist the regions for a given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @param regions set of regions at the time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void saveRegionsForTime(long time, Set<byte[]> regions) throws 
IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      for (byte[] region : regions) {
    +        Put put = new Put(makeTimeRegionKey(timeBytes, region));
    +        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
    +        stateTable.put(put);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Return all the persisted regions for a time equal to or less than the 
given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @return set of regions and time at which they were recorded
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  @Nullable
    +  public TimeRegions getRegionsOnOrBeforeTime(long time) throws 
IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), 
REGION_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, REGION_TIME_COL);
    +
    +      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    +      long currentRegionTime = -1;
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          Map.Entry<Long, byte[]> timeRegion = 
getTimeRegion(next.getRow());
    +          // Stop if reached next time value
    +          if (currentRegionTime == -1) {
    +            currentRegionTime = timeRegion.getKey();
    +          } else if (timeRegion.getKey() < currentRegionTime) {
    +            break;
    +          } else if (timeRegion.getKey() > currentRegionTime) {
    +            throw new IllegalStateException(
    +              String.format("Got out of order time %d when expecting time 
lesser than %d",
    +                            timeRegion.getKey(), currentRegionTime));
    +          }
    +          regions.add(timeRegion.getValue());
    +        }
    +      }
    +      return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, 
regions);
    +    }
    +  }
    +
    +  /**
    +   * Delete all the regions that were recorded for all times equal or less 
than the given time
    +   *
    +   * @param time timestamp in milliseconds
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException 
{
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), 
REGION_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, REGION_TIME_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          stateTable.delete(new Delete(next.getRow()));
    +        }
    +      }
    +    }
    +  }
    +
    +  // ----------------------------------------------------------------
    +  // ------- Methods for max prune upper bound for given time -------
    +  // Key: 0x3<inverted time>
    +  // Col 'p': <prune upper bound>
    +  // ----------------------------------------------------------------
    +
    +  /**
    +   * Persist prune upper bound for a given time
    +   *
    +   * @param time time in milliseconds
    +   * @param pruneUpperBoundTime prune upper bound for the given time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void savePruneUpperBoundForTime(long time, long 
pruneUpperBoundTime) throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Put put = new 
Put(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
    +      put.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL, 
Bytes.toBytes(pruneUpperBoundTime));
    +      stateTable.put(put);
    +    }
    +  }
    +
    +  /**
    +   * Return prune upper bound for the given time
    +   *
    +   * @param time time in milliseconds
    +   * @return prune upper bound for the given time
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public long getPruneUpperBoundForTime(long time) throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Get get = new 
Get(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))));
    +      get.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL);
    +      byte[] result = stateTable.get(get).getValue(FAMILY, 
PRUNE_UPPER_BOUND_TIME_COL);
    +      return result == null ? -1 : Bytes.toLong(result);
    +    }
    +  }
    +
    +  /**
    +   * Delete all prune upper bounds recorded for a time less than the given 
time
    +   *
    +   * @param time time in milliseconds
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deletePruneUpperBoundsOnOrBeforeTime(long time) throws 
IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new 
Scan(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))),
    +                           PRUNE_UPPER_BOUND_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          stateTable.delete(new Delete(next.getRow()));
    +        }
    +      }
    +    }
    +  }
    +
       private byte[] makeRegionKey(byte[] regionId) {
         return Bytes.add(REGION_KEY_PREFIX, regionId);
       }
     
    +  private byte[] getRegionFromKey(byte[] regionKey) {
    +    return Bytes.copy(regionKey, 1, regionKey.length - 1);
    +  }
    +
    +  private byte[] makeTimeRegionKey(byte[] time, byte[] regionId) {
    +    return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId);
    +  }
    +
    +  private byte[] makePruneUpperBoundTimeKey(byte[] time) {
    +    return Bytes.add(PRUNE_UPPER_BOUND_TIME_KEY_PREFIX, time);
    +  }
    +
    +  private Map.Entry<Long, byte[]> getTimeRegion(byte[] key) {
    +    int offset = 1;
    --- End diff --
    
    REGION_TIME_KEY_PREFIX.length


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to