Github user anew commented on a diff in the pull request: https://github.com/apache/incubator-tephra/pull/20#discussion_r89391984 --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java --- @@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) throws IOException { } } + /** + * Get latest prune upper bounds for given regions + * + * @param regions a set of regions + * @return a map containing region id and its latest prune upper bound value + * @throws IOException when not able to read the data from HBase + */ + public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException { + Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + try (Table stateTable = stateTableSupplier.get()) { + byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); + Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + byte[] region = getRegionFromKey(next.getRow()); + if (regions.contains(region)) { + byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); + if (timeBytes != null) { + long pruneUpperBoundRegion = Bytes.toLong(timeBytes); + resultMap.put(region, pruneUpperBoundRegion); + } + } + } + } + return resultMap; + } + } + + /** + * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value + * + * @param deletionPruneUpperBound prune upper bound below which regions will be deleted + * @param excludeRegions set of regions that should not be deleted + * @throws IOException when not able to delete data in HBase + */ + public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions) + throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + byte[] startRow = 
makeRegionKey(EMPTY_BYTE_ARRAY); + Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + byte[] region = getRegionFromKey(next.getRow()); + if (!excludeRegions.contains(region)) { + byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); + if (timeBytes != null) { + long pruneUpperBoundRegion = Bytes.toLong(timeBytes); + if (pruneUpperBoundRegion < deletionPruneUpperBound) { + stateTable.delete(new Delete(next.getRow())); + } + } + } + } + } + } + } + + // --------------------------------------------------- + // ------- Methods for regions at a given time ------- + // Key: 0x2<time><region-id> + // Col 't': <empty byte array> + // --------------------------------------------------- + + /** + * Persist the regions for a given time + * + * @param time timestamp in milliseconds + * @param regions set of regions at the time + * @throws IOException when not able to persist the data to HBase + */ + public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException { + byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); + try (Table stateTable = stateTableSupplier.get()) { + for (byte[] region : regions) { + Put put = new Put(makeTimeRegionKey(timeBytes, region)); + put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY); + stateTable.put(put); + } + } + } + + /** + * Return all the persisted regions for a time equal to or less than the given time + * + * @param time timestamp in milliseconds + * @return set of regions and time at which they were recorded + * @throws IOException when not able to read the data from HBase + */ + @Nullable + public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException { + byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); + try (Table stateTable = stateTableSupplier.get()) { + Scan scan = new 
Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, REGION_TIME_COL); + + SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + long currentRegionTime = -1; + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow()); + // Stop if reached next time value + if (currentRegionTime == -1) { + currentRegionTime = timeRegion.getKey(); + } else if (timeRegion.getKey() < currentRegionTime) { + break; + } else if (timeRegion.getKey() > currentRegionTime) { + throw new IllegalStateException( + String.format("Got out of order time %d when expecting time lesser than %d", + timeRegion.getKey(), currentRegionTime)); + } + regions.add(timeRegion.getValue()); + } + } + return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions); + } + } + + /** + * Delete all the regions that were recorded for all times equal or less than the given time + * + * @param time timestamp in milliseconds + * @throws IOException when not able to delete data in HBase + */ + public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException { + byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); + try (Table stateTable = stateTableSupplier.get()) { + Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, REGION_TIME_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + stateTable.delete(new Delete(next.getRow())); + } + } + } + } + + // ---------------------------------------------------------------- + // ------- Methods for max prune upper bound for given time ------- + // Key: 0x3<inverted time> + // Col 'p': <prune upper bound> + // ---------------------------------------------------------------- + + /** + * Persist prune upper bound 
for a given time + * + * @param time time in milliseconds + * @param pruneUpperBoundTime prune upper bound for the given time + * @throws IOException when not able to persist the data to HBase + */ + public void savePruneUpperBoundForTime(long time, long pruneUpperBoundTime) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + Put put = new Put(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time)))); + put.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL, Bytes.toBytes(pruneUpperBoundTime)); + stateTable.put(put); + } + } + + /** + * Return prune upper bound for the given time + * + * @param time time in milliseconds + * @return prune upper bound for the given time + * @throws IOException when not able to read the data from HBase + */ + public long getPruneUpperBoundForTime(long time) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + Get get = new Get(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time)))); + get.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL); + byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_TIME_COL); + return result == null ? 
-1 : Bytes.toLong(result); + } + } + + /** + * Delete all prune upper bounds recorded for a time less than the given time + * + * @param time time in milliseconds + * @throws IOException when not able to delete data in HBase + */ + public void deletePruneUpperBoundsOnOrBeforeTime(long time) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + Scan scan = new Scan(makePruneUpperBoundTimeKey(Bytes.toBytes(getInvertedTime(time))), + PRUNE_UPPER_BOUND_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_TIME_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + stateTable.delete(new Delete(next.getRow())); + } + } + } + } + private byte[] makeRegionKey(byte[] regionId) { return Bytes.add(REGION_KEY_PREFIX, regionId); } + private byte[] getRegionFromKey(byte[] regionKey) { + return Bytes.copy(regionKey, 1, regionKey.length - 1); + } + + private byte[] makeTimeRegionKey(byte[] time, byte[] regionId) { + return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId); + } + + private byte[] makePruneUpperBoundTimeKey(byte[] time) { + return Bytes.add(PRUNE_UPPER_BOUND_TIME_KEY_PREFIX, time); + } + + private Map.Entry<Long, byte[]> getTimeRegion(byte[] key) { + int offset = 1; --- End diff -- REGION_TIME_KEY_PREFIX.length
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact the infrastructure team at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---