This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 0375c6aefdb HBASE-29863 Adding support for capturing files read during scan (#7823)
0375c6aefdb is described below
commit 0375c6aefdbf96a8bbe145524ede30c21c6d2b1f
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Sat Mar 28 00:26:11 2026 +0530
HBASE-29863 Adding support for capturing files read during scan (#7823)
(cherry picked from commit 3cce140a93195cfe22838a02810a5b98a16b568f)
Signed-off-by: Andrew Purtell <[email protected]>
Reviewed-by: gvprathyusha6 <[email protected]>
---
.../hbase/mapreduce/TableSnapshotInputFormat.java | 13 +-
.../mapreduce/TableSnapshotInputFormatImpl.java | 2 +-
.../mapreduce/TestTableSnapshotInputFormat.java | 97 ++++++++++
.../hbase/client/ClientSideRegionScanner.java | 14 ++
.../java/org/apache/hadoop/hbase/mob/MobCell.java | 11 ++
.../hadoop/hbase/regionserver/KeyValueHeap.java | 19 ++
.../hadoop/hbase/regionserver/KeyValueScanner.java | 7 +
.../hadoop/hbase/regionserver/MobStoreScanner.java | 18 ++
.../hbase/regionserver/NonLazyKeyValueScanner.java | 11 ++
.../hadoop/hbase/regionserver/RegionScanner.java | 8 +
.../hbase/regionserver/RegionScannerImpl.java | 17 ++
.../regionserver/ReversedMobStoreScanner.java | 18 ++
.../hadoop/hbase/regionserver/SegmentScanner.java | 11 ++
.../hbase/regionserver/StoreFileScanner.java | 16 ++
.../hadoop/hbase/regionserver/StoreScanner.java | 37 +++-
.../hbase/client/TestBlockEvictionFromClient.java | 7 +
.../hbase/client/TestClientSideRegionScanner.java | 78 ++++++++
.../coprocessor/TestCoprocessorInterface.java | 6 +
.../regionserver/DelegatingKeyValueScanner.java | 6 +
.../hadoop/hbase/regionserver/TestHMobStore.java | 102 +++++++++-
.../hadoop/hbase/regionserver/TestHRegion.java | 55 ++++++
.../hbase/regionserver/TestKeyValueHeap.java | 76 ++++++++
.../hbase/regionserver/TestStoreFileScanner.java | 132 +++++++++++++
.../hbase/regionserver/TestStoreScanner.java | 211 +++++++++++++++++++++
.../hbase/regionserver/TestSwitchToStreamRead.java | 90 +++++++++
25 files changed, 1054 insertions(+), 8 deletions(-)
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index e7a124b98f1..ad4858ed4c4 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -138,8 +139,8 @@ public class TableSnapshotInputFormat extends
InputFormat<ImmutableBytesWritable
}
}
- @InterfaceAudience.Private
- static class TableSnapshotRegionRecordReader
+ @InterfaceAudience.Public
+ public static class TableSnapshotRegionRecordReader
extends RecordReader<ImmutableBytesWritable, Result> {
private TableSnapshotInputFormatImpl.RecordReader delegate =
new TableSnapshotInputFormatImpl.RecordReader();
@@ -179,6 +180,14 @@ public class TableSnapshotInputFormat extends
InputFormat<ImmutableBytesWritable
return delegate.getProgress();
}
+ /**
+ * Returns the set of store file paths that were successfully read by the underlying region
+ * scanner. Typically populated after the reader (or scanner) is closed. Returns an empty set
+ * when no underlying scanner exists yet, i.e. before {@link #initialize} has opened it.
+ */
+ public Set<Path> getFilesRead() {
+ // The underlying region scanner is opened by initialize(); guard against earlier callers
+ // instead of failing with a NullPointerException.
+ if (delegate.getScanner() == null) {
+ return java.util.Collections.emptySet();
+ }
+ return delegate.getScanner().getFilesRead();
+ }
+
@Override
public void close() throws IOException {
delegate.close();
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index f8fa1f6294c..5e1b840aa4e 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -562,7 +562,7 @@ public class TableSnapshotInputFormatImpl {
return getBestLocations(conf, blockDistribution, 3);
}
- private static String getSnapshotName(Configuration conf) {
+ public static String getSnapshotName(Configuration conf) {
String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
if (snapshotName == null) {
throw new IllegalArgumentException("Snapshot name must be provided");
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 9909ce15af9..26dfe3ec4bb 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -29,8 +29,12 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -44,7 +48,9 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
+import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import
org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionRecordReader;
import
org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
@@ -142,6 +148,97 @@ public class TestTableSnapshotInputFormat extends
TableSnapshotInputFormatTestBa
TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
}
+ /**
+ * Verifies that {@link TableSnapshotRegionRecordReader#getFilesRead()} stays empty while the
+ * reader is open (including after some rows have been read) and, once the reader is closed,
+ * reports exactly the store files found in the restored region directory of the single split.
+ */
+ @Test
+ public void testTableSnapshotRegionRecordReaderGetFilesRead() throws
Exception {
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ String snapshotName = name.getMethodName() + "_snapshot";
+ try {
+ // Setup: create table, load data, snapshot, and configure job with restore dir
+ createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(),
getEndRow(), 1);
+
+ Configuration conf = UTIL.getConfiguration();
+ // NOTE(review): the Job(Configuration) constructor is deprecated in Hadoop;
+ // Job.getInstance(conf) is the non-deprecated equivalent.
+ Job job = new Job(conf);
+ Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+ Scan scan = new
Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+ TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, false,
+ tmpTableDir);
+
+ // Get splits (one per region) and extract delegate split for restore path and region info
+ TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+ List<InputSplit> splits = tsif.getSplits(job);
+ Assert.assertEquals(1, splits.size());
+
+ InputSplit split = splits.get(0);
+ Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+ TableSnapshotRegionSplit snapshotRegionSplit =
(TableSnapshotRegionSplit) split;
+ TableSnapshotInputFormatImpl.InputSplit implSplit =
snapshotRegionSplit.getDelegate();
+
+ // Collect expected store file paths from the restored region directory
+ Set<String> expectedFiles = new HashSet<>();
+ Path restorePath = new Path(implSplit.getRestoreDir());
+ FileSystem fs = restorePath.getFileSystem(conf);
+ Path tableDir =
+ CommonFSUtils.getTableDir(restorePath,
implSplit.getTableDescriptor().getTableName());
+ Path regionPath = new Path(tableDir,
implSplit.getRegionInfo().getEncodedName());
+ FileStatus[] familyDirs = fs.listStatus(regionPath);
+ if (familyDirs != null) {
+ for (FileStatus fam : familyDirs) {
+ if (fam.isDirectory()) {
+ FileStatus[] files = fs.listStatus(fam.getPath());
+ if (files != null) {
+ for (FileStatus f : files) {
+ if (f.isFile()) {
+ // Map each restored file name to the HFile name it references (the restored
+ // files are presumably HFileLinks), so it can be compared by name below with the
+ // paths reported by the reader.
+ String referenceFileName = f.getPath().getName();
+
expectedFiles.add(HFileLink.getReferencedHFileName(referenceFileName));
+ }
+ }
+ }
+ }
+ }
+ }
+ Assert.assertFalse("Should have at least one store file after snapshot
restore",
+ expectedFiles.isEmpty());
+
+ // Create record reader, initialize with split (opens underlying ClientSideRegionScanner)
+ TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
+
when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
+
+ RecordReader<ImmutableBytesWritable, Result> rr =
+ tsif.createRecordReader(split, taskAttemptContext);
+ Assert.assertTrue(rr instanceof TableSnapshotRegionRecordReader);
+ TableSnapshotRegionRecordReader recordReader =
(TableSnapshotRegionRecordReader) rr;
+ recordReader.initialize(split, taskAttemptContext);
+
+ // Before close: getFilesRead() must be empty
+ Set<Path> filesReadBeforeClose = recordReader.getFilesRead();
+ Assert.assertTrue("Should return empty set before closing",
filesReadBeforeClose.isEmpty());
+
+ // Read a few key-values; getFilesRead() must still be empty until close
+ int count = 0;
+ while (count < 3 && recordReader.nextKeyValue()) {
+ count++;
+ }
+
+ filesReadBeforeClose = recordReader.getFilesRead();
+ Assert.assertTrue("Should return empty set before closing even after
reading",
+ filesReadBeforeClose.isEmpty());
+
+ // Close reader so underlying scanner reports files successfully read
+ recordReader.close();
+
+ // After close: getFilesRead() must match expected store file set (compared by file name)
+ Set<String> filesReadAfterClose =
+
recordReader.getFilesRead().stream().map(Path::getName).collect(Collectors.toSet());
+
+ Assert.assertEquals("Should contain all expected file paths",
expectedFiles,
+ filesReadAfterClose);
+ } finally {
+ // Clean up the snapshot and table even if an assertion above failed.
+ UTIL.getAdmin().deleteSnapshot(snapshotName);
+ UTIL.deleteTable(tableName);
+ }
+ }
+
public static enum TestTableSnapshotCounters {
VALIDATION_ERROR
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index df99fd40338..772aa5a07ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -19,7 +19,10 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -53,6 +56,7 @@ public class ClientSideRegionScanner extends
AbstractClientScanner {
RegionScanner scanner;
List<Cell> values;
boolean hasMore = true;
+ private final Set<Path> filesRead;
public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path
rootDir,
TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics)
throws IOException {
@@ -85,6 +89,7 @@ public class ClientSideRegionScanner extends
AbstractClientScanner {
// create an internal region scanner
this.scanner = region.getScanner(scan);
+ this.filesRead = new HashSet<>();
values = new ArrayList<>();
if (scanMetrics == null) {
@@ -129,6 +134,7 @@ public class ClientSideRegionScanner extends
AbstractClientScanner {
if (this.scanner != null) {
try {
this.scanner.close();
+ this.filesRead.addAll(this.scanner.getFilesRead());
this.scanner = null;
} catch (IOException ex) {
LOG.warn("Exception while closing scanner", ex);
@@ -162,6 +168,14 @@ public class ClientSideRegionScanner extends
AbstractClientScanner {
return region;
}
+ /**
+ * Returns the set of store file paths that were successfully read by the underlying region
+ * scanner. Populated when this scanner is closed; empty before that.
+ * <p>
+ * The result is an unmodifiable view of this scanner's internal set.
+ */
+ public Set<Path> getFilesRead() {
+ return Collections.unmodifiableSet(this.filesRead);
+ }
+
@Override
public boolean renewLease() {
throw new UnsupportedOperationException();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java
index fe66535ee55..116008b7a6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.mob;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
@@ -63,6 +66,14 @@ public class MobCell implements Closeable {
return cell;
}
+ /**
+ * Returns the set of file paths successfully read by the underlying MOB store file scanner.
+ * Should be called after {@link #close()} to get the path of the MOB file that was read.
+ * Returns an empty set when this cell has no backing store file scanner.
+ */
+ public Set<Path> getFilesRead() {
+ return sfScanner != null ? sfScanner.getFilesRead() :
Collections.emptySet();
+ }
+
@Override
public void close() throws IOException {
if (this.sfScanner != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index 5fbb680edcd..9035b645225 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -19,10 +19,14 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
+import java.util.Set;
import java.util.function.IntConsumer;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
@@ -66,6 +70,8 @@ public class KeyValueHeap extends
NonReversedNonLazyKeyValueScanner
protected KVScannerComparator comparator;
+ private final Set<Path> filesRead = new HashSet<>();
+
/**
* Constructor. This KeyValueHeap will handle closing of passed in
KeyValueScanners.
*/
@@ -215,19 +221,31 @@ public class KeyValueHeap extends
NonReversedNonLazyKeyValueScanner
public void close() {
for (KeyValueScanner scanner : this.scannersForDelayedClose) {
scanner.close();
+ filesRead.addAll(scanner.getFilesRead());
}
this.scannersForDelayedClose.clear();
if (this.current != null) {
this.current.close();
+ filesRead.addAll(this.current.getFilesRead());
}
if (this.heap != null) {
// Order of closing the scanners shouldn't matter here, so simply
iterate and close them.
for (KeyValueScanner scanner : heap) {
scanner.close();
+ filesRead.addAll(scanner.getFilesRead());
}
}
}
+ /**
+ * Returns the set of store file paths successfully read by the scanners in this heap. Populated
+ * as each scanner is closed (e.g. in close() or shipped()).
+ * <p>
+ * The result is an unmodifiable view of the heap's internal set.
+ */
+ @Override
+ public Set<Path> getFilesRead() {
+ return Collections.unmodifiableSet(filesRead);
+ }
+
/**
* Seeks all scanners at or below the specified seek key. If we earlied-out
of a row, we may end
* up skipping values that were never reached yet. Rather than iterating
down, we want to give the
@@ -418,6 +436,7 @@ public class KeyValueHeap extends
NonReversedNonLazyKeyValueScanner
public void shipped() throws IOException {
for (KeyValueScanner scanner : this.scannersForDelayedClose) {
scanner.close(); // There wont be further fetch of Cells from these
scanners. Just close.
+ filesRead.addAll(scanner.getFilesRead());
}
this.scannersForDelayedClose.clear();
if (this.current != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
index bfe47772f1a..564a374498a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Set;
import java.util.function.IntConsumer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ExtendedCell;
@@ -139,6 +140,12 @@ public interface KeyValueScanner extends Shipper,
Closeable {
*/
Path getFilePath();
+ /**
+ * Returns the set of store file paths that were successfully read by this scanner. Typically
+ * populated only after the scanner is closed.
+ * <p>
+ * Callers merge the result straight into their own sets with {@code addAll}, so
+ * implementations must return an empty set, never {@code null}, when no files were read.
+ */
+ Set<Path> getFilesRead();
+
// Support for "Reversed Scanner"
/**
* Seek the scanner at or before the row of specified Cell, it firstly tries
to seek the scanner
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
index 9de37c3f40c..60b0c6b5159 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
@@ -19,9 +19,13 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobCell;
@@ -44,6 +48,7 @@ public class MobStoreScanner extends StoreScanner {
private boolean readEmptyValueOnMobCellMiss = false;
private final HMobStore mobStore;
private final List<MobCell> referencedMobCells;
+ private final Set<Path> mobFilesRead = new HashSet<>();
public MobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
final NavigableSet<byte[]> columns, long readPt) throws IOException {
@@ -95,10 +100,23 @@ public class MobStoreScanner extends StoreScanner {
private void freeAllReferencedMobCells() throws IOException {
for (MobCell cell : referencedMobCells) {
cell.close();
+ mobFilesRead.addAll(cell.getFilesRead());
}
referencedMobCells.clear();
}
+ /**
+ * Returns the set of store file paths and MOB file paths successfully read by this scanner.
+ * Combines paths from the underlying store scanner with paths from resolved MOB cells (populated
+ * when referenced mob cells are closed, e.g. in close() or shipped()).
+ * <p>
+ * Each call builds a new unmodifiable snapshot; it does not reflect files recorded later.
+ */
+ @Override
+ public Set<Path> getFilesRead() {
+ Set<Path> allFiles = new HashSet<>(super.getFilesRead());
+ allFiles.addAll(mobFilesRead);
+ return Collections.unmodifiableSet(allFiles);
+ }
+
@Override
public void shipped() throws IOException {
super.shipped();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
index f55bbcc639d..79e5cf91e33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
import java.util.function.IntConsumer;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.fs.Path;
@@ -76,6 +78,15 @@ public abstract class NonLazyKeyValueScanner implements
KeyValueScanner {
return null;
}
+ /**
+ * Returns the set of store file paths successfully read by this scanner. Default implementation
+ * returns an empty set for non-file scanners (e.g. memstore).
+ * <p>
+ * Subclasses that aggregate child scanners, such as KeyValueHeap and StoreScanner, override
+ * this to report the files their children read.
+ */
+ @Override
+ public Set<Path> getFilesRead() {
+ return Collections.emptySet();
+ }
+
@Override
public ExtendedCell getNextIndexedKey() {
return null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index cea136a9a05..bd191a75eac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -110,4 +112,10 @@ public interface RegionScanner extends InternalScanner {
* @throws IOException e
*/
boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws
IOException;
+
+ /**
+ * Returns the set of store file paths that were successfully read by this scanner. Typically
+ * populated only after the scanner is closed.
+ * <p>
+ * Declared as a default method so that existing coprocessor implementations of this interface
+ * keep compiling and linking without change; such implementations report no files read. The
+ * built-in region scanner overrides it.
+ * @return the paths read; never {@code null}
+ */
+ default Set<Path> getFilesRead() {
+ return java.util.Collections.emptySet();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index 4d1332351a8..0705b454de1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -21,11 +21,15 @@ import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
@@ -95,6 +99,8 @@ public class RegionScannerImpl implements RegionScanner,
Shipper, RpcCallback {
private RegionServerServices rsServices;
+ private final Set<Path> filesRead = new HashSet<>();
+
@Override
public RegionInfo getRegionInfo() {
return region.getRegionInfo();
@@ -759,10 +765,12 @@ public class RegionScannerImpl implements RegionScanner,
Shipper, RpcCallback {
private void closeInternal() {
if (storeHeap != null) {
storeHeap.close();
+ filesRead.addAll(storeHeap.getFilesRead());
storeHeap = null;
}
if (joinedHeap != null) {
joinedHeap.close();
+ filesRead.addAll(joinedHeap.getFilesRead());
joinedHeap = null;
}
// no need to synchronize here.
@@ -775,6 +783,15 @@ public class RegionScannerImpl implements RegionScanner,
Shipper, RpcCallback {
TraceUtil.trace(this::closeInternal, () ->
region.createRegionSpan("RegionScanner.close"));
}
+ /**
+ * Returns the set of store file paths that were successfully read by this scanner. Populated at
+ * close from the underlying store heap and joined heap (if any); empty before close.
+ * <p>
+ * The result is an unmodifiable view of this scanner's internal set.
+ */
+ @Override
+ public Set<Path> getFilesRead() {
+ return Collections.unmodifiableSet(filesRead);
+ }
+
@Override
public synchronized boolean reseek(byte[] row) throws IOException {
return TraceUtil.trace(() -> {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
index 505cd5dedce..43724d67e93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
@@ -19,9 +19,13 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobCell;
@@ -43,6 +47,7 @@ public class ReversedMobStoreScanner extends
ReversedStoreScanner {
private boolean readEmptyValueOnMobCellMiss = false;
private final HMobStore mobStore;
private final List<MobCell> referencedMobCells;
+ private final Set<Path> mobFilesRead = new HashSet<>();
ReversedMobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
NavigableSet<byte[]> columns,
long readPt) throws IOException {
@@ -94,10 +99,23 @@ public class ReversedMobStoreScanner extends
ReversedStoreScanner {
private void freeAllReferencedMobCells() throws IOException {
for (MobCell mobCell : referencedMobCells) {
mobCell.close();
+ mobFilesRead.addAll(mobCell.getFilesRead());
}
referencedMobCells.clear();
}
+ /**
+ * Returns the set of store file paths that were successfully read by this scanner. Includes paths
+ * from the underlying store scanner and from resolved MOB cell references; typically populated as
+ * scanners and referenced MOB cells are closed.
+ * <p>
+ * Each call builds a new unmodifiable snapshot; it does not reflect files recorded later.
+ */
+ @Override
+ public Set<Path> getFilesRead() {
+ Set<Path> allFiles = new HashSet<>(super.getFilesRead());
+ allFiles.addAll(mobFilesRead);
+ return Collections.unmodifiableSet(allFiles);
+ }
+
@Override
public void shipped() throws IOException {
super.shipped();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 6a053b52669..1bb1a619fde 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.Set;
import java.util.SortedSet;
import java.util.function.IntConsumer;
import org.apache.commons.lang3.NotImplementedException;
@@ -302,6 +304,15 @@ public class SegmentScanner implements KeyValueScanner {
return null;
}
+ /**
+ * Returns the set of store file paths that were successfully read by this scanner. This
+ * implementation always returns an empty set (segment scanners do not track file paths).
+ */
+ @Override
+ public Set<Path> getFilesRead() {
+ return Collections.emptySet();
+ }
+
/**
* @return the next key in the index (the key to seek to the next block) if
known, or null
* otherwise Not relevant for in-memory scanner
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 6ce1a3236c4..acd1ddd4583 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
+import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntConsumer;
import org.apache.hadoop.fs.Path;
@@ -83,6 +84,9 @@ public class StoreFileScanner implements KeyValueScanner {
// Higher values means scanner has newer data.
private final long scannerOrder;
+ // The single file path when this scanner is closed (successfully read).
+ private Path fileRead;
+
/**
* Implements a {@link KeyValueScanner} on top of the specified {@link
HFileScanner}
* @param useMVCC If true, scanner will filter out
updates with MVCC larger
@@ -313,11 +317,23 @@ public class StoreFileScanner implements KeyValueScanner {
cur = null;
this.hfs.close();
if (this.reader != null) {
+ if (this.reader.getHFileReader() != null) {
+ this.fileRead = this.reader.getHFileReader().getPath();
+ }
this.reader.readCompleted();
}
closed = true;
}
+ /**
+ * Returns the set of store file paths that were successfully read by this scanner. Contains the
+ * single store file path if this scanner successfully read it; typically set at close.
+ * <p>
+ * NOTE(review): close() records the path whenever the underlying HFile reader is present, not
+ * only when cells were actually returned, so "read" here effectively means "opened by this
+ * scanner and then closed". Confirm this is the intended semantics.
+ */
+ @Override
+ public Set<Path> getFilesRead() {
+ return fileRead != null ? Collections.singleton(fileRead) :
Collections.emptySet();
+ }
+
/** Returns false if not found or if k is after the end. */
public static boolean seekAtOrAfter(HFileScanner s, ExtendedCell k) throws
IOException {
int result = s.seekTo(k);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 2205c83a13c..993a14407ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -20,13 +20,17 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntConsumer;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
@@ -107,6 +111,9 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
// updates and the data will be corrupt.
private final List<KeyValueScanner> scannersForDelayedClose = new
ArrayList<>();
+ // Tracks file paths successfully read (scanners closed) by this store
scanner.
+ private final Set<Path> filesRead = new HashSet<>();
+
/**
* The number of KVs seen by the scanner. Includes explicitly skipped KVs,
but not KVs skipped via
* seeking to next row/column. TODO: estimate them?
@@ -270,6 +277,7 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
// key does not exist, then to the start of the next matching Row).
// Always check bloom filter to optimize the top row seek for delete
// family marker.
+
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery &&
lazySeekEnabledGlobally,
parallelSeekEnabled);
@@ -282,7 +290,7 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
// Combine all seeked scanners with a heap
resetKVHeap(scanners, comparator);
} catch (IOException e) {
- clearAndClose(scanners);
+ clearAndClose(scanners, false); // do not track files when closing due
to exception
// remove us from the HStore#changedReaderObservers here or we'll have
no chance to
// and might cause memory leak
store.deleteChangedReaderObserver(this);
@@ -466,7 +474,6 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
memOnly = false;
filesOnly = false;
}
-
List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());
// We can only exclude store files based on TTL if minVersions is set to 0.
@@ -478,6 +485,7 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
boolean isFile = kvs.isFileScanner();
if ((!isFile && filesOnly) || (isFile && memOnly)) {
kvs.close();
+ filesRead.addAll(kvs.getFilesRead());
continue;
}
@@ -485,6 +493,7 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
scanners.add(kvs);
} else {
kvs.close();
+ filesRead.addAll(kvs.getFilesRead());
}
}
return scanners;
@@ -527,6 +536,7 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
clearAndClose(flushedstoreFileScanners);
if (this.heap != null) {
this.heap.close();
+ this.filesRead.addAll(this.heap.getFilesRead());
this.currentScanners.clear();
this.heap = null; // CLOSED!
}
@@ -973,12 +983,19 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
return this.readPt;
}
- private static void clearAndClose(List<KeyValueScanner> scanners) {
+ private void clearAndClose(List<KeyValueScanner> scanners) {
+ clearAndClose(scanners, true);
+ }
+
+ private void clearAndClose(List<KeyValueScanner> scanners, boolean
trackFiles) {
if (scanners == null) {
return;
}
for (KeyValueScanner s : scanners) {
s.close();
+ if (trackFiles) {
+ this.filesRead.addAll(s.getFilesRead());
+ }
}
scanners.clear();
}
@@ -1185,7 +1202,10 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
addCurrentScanners(newCurrentScanners);
this.heap = newHeap;
resetQueryMatcher(lastTop);
- scannersToClose.forEach(KeyValueScanner::close);
+ for (KeyValueScanner scanner : scannersToClose) {
+ scanner.close();
+ this.filesRead.addAll(scanner.getFilesRead());
+ }
if (hasSwitchedToStreamRead != null) {
hasSwitchedToStreamRead.set(true);
}
@@ -1267,6 +1287,15 @@ public class StoreScanner extends
NonReversedNonLazyKeyValueScanner
return this.kvsScanned;
}
+ /**
+ * Returns the set of store file paths that were successfully read by this
scanner. Populated at
+ * close from the key-value heap and any closed child scanners.
+ */
+ @Override
+ public Set<Path> getFilesRead() {
+ return Collections.unmodifiableSet(filesRead);
+ }
+
@Override
public ExtendedCell getNextIndexedKey() {
return this.heap.getNextIndexedKey();
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
index a639dc12320..62aed0f00d2 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
@@ -26,12 +26,14 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -1510,6 +1512,11 @@ public class TestBlockEvictionFromClient {
return nextRaw;
}
+ @Override
+ public Set<Path> getFilesRead() {
+ return delegate.getFilesRead();
+ }
+
@Override
public void close() throws IOException {
delegate.close();
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
index 78041f8b972..5659c304b01 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
@@ -30,7 +30,9 @@ import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,6 +46,8 @@ import
org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -54,8 +58,10 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
@Category({ SmallTests.class, ClientTests.class })
public class TestClientSideRegionScanner {
@@ -74,6 +80,9 @@ public class TestClientSideRegionScanner {
private RegionInfo hri;
private Scan scan;
+ @Rule
+ public TestName name = new TestName();
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(1);
@@ -264,6 +273,75 @@ public class TestClientSideRegionScanner {
testScanMetricByRegion(new ScanMetrics());
}
+ @Test
+ public void testGetFilesRead() throws Exception {
+ // Create a table and add some data
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ try (Table table = TEST_UTIL.createTable(tableName, new byte[][] {
FAM_NAME })) {
+ TableDescriptor tableHtd = TEST_UTIL.getAdmin().getDescriptor(tableName);
+ RegionInfo tableHri = TEST_UTIL.getAdmin().getRegions(tableName).get(0);
+
+ // Add some data
+ for (int i = 0; i < 5; i++) {
+ byte[] row = Bytes.toBytes(i);
+ Put put = new Put(row);
+ put.addColumn(FAM_NAME, row, row);
+ table.put(put);
+ }
+
+ // Flush contents to disk so we can scan the fs
+ TEST_UTIL.getAdmin().flush(tableName);
+
+ // Create ClientSideRegionScanner with the correct table descriptor and
region info
+ Configuration copyConf = new Configuration(conf);
+ Scan tableScan = new Scan();
+ ClientSideRegionScanner clientSideRegionScanner =
+ new ClientSideRegionScanner(copyConf, fs, rootDir, tableHtd, tableHri,
tableScan, null);
+
+ // Get expected file paths from the region before closing
+ // (after closing, the region will be closed too)
+ Set<Path> expectedFilePaths = new HashSet<>();
+ HStore store = clientSideRegionScanner.getRegion().getStore(FAM_NAME);
+ for (HStoreFile storeFile : store.getStorefiles()) {
+ Path qualifiedPath = fs.makeQualified(storeFile.getPath());
+ expectedFilePaths.add(qualifiedPath);
+ }
+ int expectedFileCount = expectedFilePaths.size();
+ assertTrue("Should have at least one store file after flush",
expectedFileCount >= 1);
+
+ // Before closing, should return empty set
+ Set<Path> filesReadBeforeClose = clientSideRegionScanner.getFilesRead();
+ assertTrue("Should return empty set before closing",
filesReadBeforeClose.isEmpty());
+
+ // Scan through some results
+ Result result;
+ int count = 0;
+ while ((result = clientSideRegionScanner.next()) != null && count < 3) {
+ assertNotNull("Result should not be null", result);
+ count++;
+ }
+
+ // Still should return empty set before closing
+ filesReadBeforeClose = clientSideRegionScanner.getFilesRead();
+ assertTrue("Should return empty set before closing even after scanning",
+ filesReadBeforeClose.isEmpty());
+
+ // Close the scanner - this should collect files from the underlying
scanner
+ clientSideRegionScanner.close();
+
+ // After closing, should return files from the underlying scanner
+ Set<Path> filesReadAfterClose = clientSideRegionScanner.getFilesRead();
+ // Verify exact file count
+ assertEquals("Should have exact file count after closing",
expectedFileCount,
+ filesReadAfterClose.size());
+ // Verify exact file names match
+ assertEquals("Should contain all expected file paths", expectedFilePaths,
+ filesReadAfterClose);
+ } finally {
+ TEST_UTIL.deleteTable(tableName);
+ }
+ }
+
private static Put createPut(int rowAsInt) {
byte[] row = Bytes.toBytes(rowAsInt);
Put put = new Put(row);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index ee858ac5e68..6421174bd13 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -111,6 +112,11 @@ public class TestCoprocessorInterface {
return delegate.nextRaw(result, context);
}
+ @Override
+ public Set<Path> getFilesRead() {
+ return delegate.getFilesRead();
+ }
+
@Override
public void close() throws IOException {
delegate.close();
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
index 5ce88048531..8f3c858d75f 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.Set;
import java.util.function.IntConsumer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ExtendedCell;
@@ -96,6 +97,11 @@ public class DelegatingKeyValueScanner implements
KeyValueScanner {
return delegate.getFilePath();
}
+ @Override
+ public Set<Path> getFilesRead() {
+ return delegate.getFilesRead();
+ }
+
@Override
public boolean backwardSeek(ExtendedCell key) throws IOException {
return delegate.backwardSeek(key);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
index a221c049783..9d86d581fb3 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -25,13 +25,16 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
@@ -446,6 +449,103 @@ public class TestHMobStore {
Assert.assertEquals(Bytes.toString(value2),
Bytes.toString(CellUtil.cloneValue(resultCell3)));
}
+ @Test
+ public void testMobStoreScannerGetFilesRead() throws IOException {
+ doTestMobStoreScannerGetFilesRead(false);
+ }
+
+ @Test
+ public void testReversedMobStoreScannerGetFilesRead() throws IOException {
+ doTestMobStoreScannerGetFilesRead(true);
+ }
+
+ /**
+ * Utility method for getFilesRead tests on MOB store scanners. Uses values
above mob threshold so
+ * DefaultMobStoreFlusher creates the mob file and refs.
+ */
+ private void doTestMobStoreScannerGetFilesRead(boolean reversed) throws
IOException {
+ // Setup: conf, root dir, and MOB store init (mob threshold causes large
values to go to MOB).
+ final Configuration conf = HBaseConfiguration.create();
+ Path basedir = new Path(DIR + name.getMethodName());
+ CommonFSUtils.setRootDir(conf, basedir);
+ init(name.getMethodName(), conf, false);
+
+ // Add values above MOB threshold and flush so DefaultMobStoreFlusher
creates mob file and refs.
+ byte[] valueAboveThreshold = Bytes.toBytes("value"); // threshold in setup
is 3 bytes
+ this.store.add(new KeyValue(row, family, qf1, 1, valueAboveThreshold),
null);
+ this.store.add(new KeyValue(row, family, qf2, 1, valueAboveThreshold),
null);
+ this.store.add(new KeyValue(row2, family, qf3, 1, valueAboveThreshold),
null);
+ flush(1);
+
+ // Collect expected paths: store files (refs) plus actual MOB files under
mob family path.
+ FileSystem storeFs = store.getFileSystem();
+ Set<Path> expectedFilePaths = new HashSet<>();
+ for (HStoreFile storeFile : this.store.getStorefiles()) {
+ expectedFilePaths.add(storeFs.makeQualified(storeFile.getPath()));
+ }
+ Path mobFamilyPath =
+ MobUtils.getMobFamilyPath(conf, TableName.valueOf(table),
Bytes.toString(family));
+ if (storeFs.exists(mobFamilyPath)) {
+ FileStatus[] mobFiles = storeFs.listStatus(mobFamilyPath);
+ for (FileStatus f : mobFiles) {
+ if (!f.isDirectory()) {
+ expectedFilePaths.add(storeFs.makeQualified(f.getPath()));
+ }
+ }
+ }
+ Assert.assertTrue("Should have at least one store file and one mob file",
+ expectedFilePaths.size() >= 2);
+
+ // Build scan (optionally reversed) and target columns; get store scanner
and verify type.
+ Scan scan = new Scan();
+ if (reversed) {
+ scan.setReversed(true);
+ }
+ scan.addColumn(family, qf1);
+ scan.addColumn(family, qf2);
+ scan.addColumn(family, qf3);
+ NavigableSet<byte[]> targetCols = new
ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
+ targetCols.add(qf1);
+ targetCols.add(qf2);
+ targetCols.add(qf3);
+
+ KeyValueScanner kvScanner = store.getScanner(scan, targetCols, 0);
+ if (reversed) {
+ Assert.assertTrue("Store scanner should be ReversedMobStoreScanner",
+ kvScanner instanceof ReversedMobStoreScanner);
+ } else {
+ Assert.assertTrue("Store scanner should be MobStoreScanner",
+ kvScanner instanceof MobStoreScanner);
+ }
+
+ // Before close: getFilesRead must be empty; then drain scanner to resolve
MOB refs.
+ try {
+ Set<Path> filesReadBeforeClose = kvScanner.getFilesRead();
+ Assert.assertTrue("Should return empty set before closing",
filesReadBeforeClose.isEmpty());
+ Assert.assertEquals("Should have 0 files before closing", 0,
filesReadBeforeClose.size());
+
+ List<Cell> results = new ArrayList<>();
+ InternalScanner storeScanner = (InternalScanner) kvScanner;
+ while (storeScanner.next(results)) {
+ results.clear();
+ }
+
+ // Still before close: set must remain empty until scanner is closed.
+ filesReadBeforeClose = kvScanner.getFilesRead();
+ Assert.assertTrue("Should return empty set before closing even after
reading",
+ filesReadBeforeClose.isEmpty());
+ } finally {
+ kvScanner.close();
+ }
+
+ // After close: set must contain exactly the expected store + MOB file
paths.
+ Set<Path> filesReadAfterClose = kvScanner.getFilesRead();
+ Assert.assertEquals("Should have exact file count after closing",
expectedFilePaths.size(),
+ filesReadAfterClose.size());
+ Assert.assertEquals("Should contain all expected file paths",
expectedFilePaths,
+ filesReadAfterClose);
+ }
+
/**
* Flush the memstore
*/
@@ -499,7 +599,7 @@ public class TestHMobStore {
// Scan the values
Scan scan = new Scan(get);
- InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+ StoreScanner scanner = (StoreScanner) store.getScanner(scan,
scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
List<Cell> results = new ArrayList<>();
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 68b6c4919a2..9032792ee36 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -50,6 +50,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -3750,6 +3751,60 @@ public class TestHRegion {
}
}
+ @Test
+ public void testRegionScanner_getFilesRead() throws IOException {
+ // Setup: init region with one family; put two rows and flush to create
store files.
+ byte[] family = Bytes.toBytes("fam1");
+ byte[][] families = { family };
+ this.region = initHRegion(tableName, method, CONF, families);
+ Put put = new Put(Bytes.toBytes("row1"));
+ put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
+ region.put(put);
+ put = new Put(Bytes.toBytes("row2"));
+ put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
+ region.put(put);
+ region.flush(true);
+
+ // Collect expected store file paths from all stores (before opening the
scanner).
+ Set<Path> expectedFilePaths = new HashSet<>();
+ FileSystem fs = region.getFilesystem();
+ for (HStore store : region.getStores()) {
+ for (HStoreFile storeFile : store.getStorefiles()) {
+ expectedFilePaths.add(fs.makeQualified(storeFile.getPath()));
+ }
+ }
+ assertTrue("Should have at least one store file after flush",
expectedFilePaths.size() >= 1);
+
+ // Get region scanner; before close getFilesRead must be empty.
+ RegionScannerImpl scanner = region.getScanner(new Scan());
+
+ Set<Path> filesReadBeforeClose = scanner.getFilesRead();
+ assertTrue("Should return empty set before closing",
filesReadBeforeClose.isEmpty());
+
+ // Drain scanner (next up to two rows) to exercise store heap reads.
+ List<Cell> cells = new ArrayList<>();
+ int count = 0;
+ while (count < 2) {
+ if (!scanner.next(cells)) {
+ break;
+ }
+ cells.clear();
+ count++;
+ }
+
+ // Still before close: set must remain empty until scanner is closed.
+ filesReadBeforeClose = scanner.getFilesRead();
+ assertTrue("Should return empty set before closing even after scanning",
+ filesReadBeforeClose.isEmpty());
+ scanner.close();
+
+ // After close: set must contain exactly the expected store file paths.
+ Set<Path> filesReadAfterClose = scanner.getFilesRead();
+ assertEquals("Should have exact file count after closing",
expectedFilePaths.size(),
+ filesReadAfterClose.size());
+ assertEquals("Should contain all expected file paths", expectedFilePaths,
filesReadAfterClose);
+ }
+
@Test
public void testRegionScanner_Next() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
index fea25b424e1..2ee0645f157 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
@@ -24,7 +24,10 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.ExtendedCell;
@@ -209,6 +212,57 @@ public class TestKeyValueHeap {
assertCells(expected, Arrays.asList(scan1, scan2));
}
+ @Test
+ public void testGetFilesRead() throws IOException {
+ // Create test scanners with file paths
+ Path file1 = new Path("/test/file1");
+ Path file2 = new Path("/test/file2");
+ Path file3 = new Path("/test/file3");
+
+ FileTrackingScanner scanner1 =
+ new FileTrackingScanner(Arrays.asList(kv115, kv211, kv212), file1);
+ FileTrackingScanner scanner2 = new
FileTrackingScanner(Arrays.asList(kv111, kv112), file2);
+ FileTrackingScanner scanner3 =
+ new FileTrackingScanner(Arrays.asList(kv113, kv114, kv121, kv122,
kv213), file3);
+
+ // Add a non-file-based scanner (e.g., memstore scanner) that doesn't
return files
+ TestScanner memStoreScanner = new TestScanner(Arrays.asList(kv114));
+
+ List<KeyValueScanner> scanners =
+ new ArrayList<>(Arrays.asList(scanner1, scanner2, scanner3,
memStoreScanner));
+
+ // Create KeyValueHeap and scan through all cells
+ KeyValueHeap keyValueHeap = new KeyValueHeap(scanners,
CellComparatorImpl.COMPARATOR);
+
+ // Before closing, should return empty set even after scanning
+ // Scan through all cells first
+ while (keyValueHeap.peek() != null) {
+ keyValueHeap.next();
+ }
+
+ // Verify that before closing, files are not returned
+ Set<Path> filesReadBeforeClose = keyValueHeap.getFilesRead();
+ assertTrue("Should return empty set before closing heap",
filesReadBeforeClose.isEmpty());
+ assertEquals("Should have 0 files before closing", 0,
filesReadBeforeClose.size());
+
+ // Now close the heap
+ keyValueHeap.close();
+
+ // After closing, should return all files from file-based scanners only
+ // Non-file-based scanners (like memstore) should not contribute files
+ Set<Path> filesReadAfterClose = keyValueHeap.getFilesRead();
+ assertEquals("Should return set with 3 file paths after closing (excluding
non-file scanner)",
+ 3, filesReadAfterClose.size());
+ assertTrue("Should contain file1", filesReadAfterClose.contains(file1));
+ assertTrue("Should contain file2", filesReadAfterClose.contains(file2));
+ assertTrue("Should contain file3", filesReadAfterClose.contains(file3));
+
+ // Verify that non-file-based scanner doesn't contribute any files
+ // (memStoreScanner.getFilesRead() should return empty set)
+ Set<Path> memStoreFiles = memStoreScanner.getFilesRead();
+ assertTrue("Non-file-based scanner should return empty set",
memStoreFiles.isEmpty());
+ }
+
private static class TestScanner extends CollectionBackedScanner {
private boolean closed = false;
private long scannerOrder = 0;
@@ -269,4 +323,26 @@ public class TestKeyValueHeap {
throw new IOException("enforceSeek must not be called on a " + "non-lazy
scanner");
}
}
+
+ private static class FileTrackingScanner extends TestScanner {
+ private final Path filePath;
+ private boolean closed = false;
+
+ public FileTrackingScanner(List<ExtendedCell> list, Path filePath) {
+ super(list);
+ this.filePath = filePath;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ closed = true;
+ }
+
+ @Override
+ public Set<Path> getFilesRead() {
+ // Only return the file path after the scanner is closed
+ return closed ? Collections.singleton(filePath) : Collections.emptySet();
+ }
+ }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScanner.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScanner.java
new file mode 100644
index 00000000000..75e7beb3d72
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScanner.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Test StoreFileScanner
+ */
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestStoreFileScanner {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestStoreFileScanner.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
+ private static final String TEST_FAMILY = "cf";
+
+ @Rule
+ public TestName name = new TestName();
+
+ private Configuration conf;
+ private Path testDir;
+ private FileSystem fs;
+ private CacheConfig cacheConf;
+
+ @Before
+ public void setUp() throws IOException {
+ conf = TEST_UTIL.getConfiguration();
+ testDir = TEST_UTIL.getDataTestDir(name.getMethodName());
+ fs = testDir.getFileSystem(conf);
+ cacheConf = new CacheConfig(conf);
+ }
+
+ private void writeStoreFile(final StoreFileWriter writer) throws IOException
{
+ long now = EnvironmentEdgeManager.currentTime();
+ byte[] family = Bytes.toBytes(TEST_FAMILY);
+ byte[] qualifier = Bytes.toBytes("col");
+ for (char d = 'a'; d <= 'z'; d++) {
+ for (char e = 'a'; e <= 'z'; e++) {
+ byte[] row = new byte[] { (byte) d, (byte) e };
+ writer.append(new KeyValue(row, family, qualifier, now, row));
+ }
+ }
+ }
+
+ @Test
+ public void testGetFilesRead() throws Exception {
+ // Setup: region info, region fs, and HFile context; create store file and
write data.
+ final RegionInfo hri =
+
RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build();
+ HRegionFileSystem regionFs =
HRegionFileSystem.createRegionOnFileSystem(conf, fs,
+ new Path(testDir, hri.getTable().getNameAsString()), hri);
+ HFileContext hFileContext = new HFileContextBuilder().withBlockSize(8 *
1024).build();
+
+ StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
+
.withFilePath(regionFs.createTempName()).withFileContext(hFileContext).build();
+ writeStoreFile(writer);
+ Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
+ writer.close();
+
+ // Open HStoreFile and reader; get qualified path and create
StoreFileScanner.
+ StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false,
+ StoreContext.getBuilder()
+ .withFamilyStoreDirectoryPath(new Path(regionFs.getRegionDir(),
TEST_FAMILY))
+
.withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY))
+ .withRegionFileSystem(regionFs).build());
+ HStoreFile file = new HStoreFile(fs, hsfPath, conf, cacheConf,
BloomType.NONE, true, sft);
+ file.initReader();
+ StoreFileReader r = file.getReader();
+ assertNotNull(r);
+ Path qualifiedPath = fs.makeQualified(hsfPath);
+ StoreFileScanner scanner = r.getStoreFileScanner(false, false, false, 0,
0, false);
+
+ // Before close: getFilesRead must be empty.
+ Set<Path> filesRead = scanner.getFilesRead();
+ assertTrue("Should return empty set before closing scanner",
filesRead.isEmpty());
+
+ scanner.close();
+
+ // After close: set must contain the single qualified store file path.
+ filesRead = scanner.getFilesRead();
+ assertEquals("Should return set with one file path after closing", 1,
filesRead.size());
+ assertTrue("Should contain the qualified file path",
filesRead.contains(qualifiedPath));
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index 253fcc99bd0..fd327ebd013 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.KeyValueTestUtil.create;
import static
org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -31,9 +32,12 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
@@ -48,12 +52,21 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -67,6 +80,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1092,4 +1106,201 @@ public class TestStoreScanner {
assertFalse(memStoreScanner.closed);
}
}
+
+ @Test
+ public void testGetFilesRead() throws Exception {
+ // Setup: test util, conf, fs, cache, region fs, and HFile context.
+ HBaseTestingUtility testUtil = new HBaseTestingUtility();
+ Configuration conf = testUtil.getConfiguration();
+ Path testDir = testUtil.getDataTestDir(name.getMethodName() +
"_directory");
+ FileSystem fs = testDir.getFileSystem(conf);
+ CacheConfig cacheConf = new CacheConfig(conf);
+ final String TEST_FAMILY = "cf";
+
+ final RegionInfo hri =
+
RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build();
+ HRegionFileSystem regionFs =
HRegionFileSystem.createRegionOnFileSystem(conf, fs,
+ new Path(testDir, hri.getTable().getNameAsString()), hri);
+ HFileContext hFileContext = new HFileContextBuilder().withBlockSize(8 *
1024).build();
+
+ StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false,
+ StoreContext.getBuilder()
+ .withFamilyStoreDirectoryPath(new Path(regionFs.getRegionDir(),
TEST_FAMILY))
+
.withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY))
+ .withRegionFileSystem(regionFs).build());
+
+ long now = EnvironmentEdgeManager.currentTime();
+ List<Path> filePaths = new ArrayList<>();
+ List<HStoreFile> storeFiles = new ArrayList<>();
+
+ // File 1: rows "row01" to "row05" - in scan key range, fresh timestamp
+ StoreFileWriter writer1 = new StoreFileWriter.Builder(conf, cacheConf, fs)
+
.withFilePath(regionFs.createTempName()).withFileContext(hFileContext).build();
+ for (int i = 1; i <= 5; i++) {
+ writer1.append(new KeyValue(Bytes.toBytes(String.format("row%02d", i)),
CF,
+ Bytes.toBytes("col"), now, Bytes.toBytes("value" + i)));
+ }
+ Path path1 = regionFs.commitStoreFile(TEST_FAMILY, writer1.getPath());
+ writer1.close();
+ filePaths.add(fs.makeQualified(path1));
+ HStoreFile file1 = new HStoreFile(fs, path1, conf, cacheConf,
BloomType.NONE, true, sft);
+ file1.initReader();
+ storeFiles.add(file1);
+
+ // File 2: rows "row06" to "row10" - in scan key range, fresh timestamp
+ StoreFileWriter writer2 = new StoreFileWriter.Builder(conf, cacheConf, fs)
+
.withFilePath(regionFs.createTempName()).withFileContext(hFileContext).build();
+ for (int i = 6; i <= 10; i++) {
+ writer2.append(new KeyValue(Bytes.toBytes(String.format("row%02d", i)),
CF,
+ Bytes.toBytes("col"), now, Bytes.toBytes("value" + i)));
+ }
+ Path path2 = regionFs.commitStoreFile(TEST_FAMILY, writer2.getPath());
+ writer2.close();
+ filePaths.add(fs.makeQualified(path2));
+ HStoreFile file2 = new HStoreFile(fs, path2, conf, cacheConf,
BloomType.NONE, true, sft);
+ file2.initReader();
+ storeFiles.add(file2);
+
+ // File 3: rows "row20" to "row25" - OUT of scan key range (after stop row)
+ StoreFileWriter writer3 = new StoreFileWriter.Builder(conf, cacheConf, fs)
+
.withFilePath(regionFs.createTempName()).withFileContext(hFileContext).build();
+ for (int i = 20; i <= 25; i++) {
+ writer3.append(new KeyValue(Bytes.toBytes(String.format("row%02d", i)),
CF,
+ Bytes.toBytes("col"), now, Bytes.toBytes("value" + i)));
+ }
+ Path path3 = regionFs.commitStoreFile(TEST_FAMILY, writer3.getPath());
+ writer3.close();
+ filePaths.add(fs.makeQualified(path3));
+ HStoreFile file3 = new HStoreFile(fs, path3, conf, cacheConf,
BloomType.NONE, true, sft);
+ file3.initReader();
+ storeFiles.add(file3);
+
+ // File 4: row "row00" - OUT of key range (before start row)
+ StoreFileWriter writer4 = new StoreFileWriter.Builder(conf, cacheConf, fs)
+
.withFilePath(regionFs.createTempName()).withFileContext(hFileContext).build();
+ writer4.append(
+ new KeyValue(Bytes.toBytes("row00"), CF, Bytes.toBytes("col"), now,
Bytes.toBytes("value0")));
+ Path path4 = regionFs.commitStoreFile(TEST_FAMILY, writer4.getPath());
+ writer4.close();
+ filePaths.add(fs.makeQualified(path4));
+ HStoreFile file4 = new HStoreFile(fs, path4, conf, cacheConf,
BloomType.NONE, true, sft);
+ file4.initReader();
+ storeFiles.add(file4);
+
+ // File 5: row "row11" with expired timestamp (1 hour ago); TTL-filtered
but still tracked.
+ long expiredTime = now - (1000 * 60 * 60);
+ StoreFileWriter writer5 = new StoreFileWriter.Builder(conf, cacheConf, fs)
+
.withFilePath(regionFs.createTempName()).withFileContext(hFileContext).build();
+ writer5.append(new KeyValue(Bytes.toBytes("row11"), CF,
Bytes.toBytes("col"), expiredTime,
+ Bytes.toBytes("expired_value")));
+ Path path5 = regionFs.commitStoreFile(TEST_FAMILY, writer5.getPath());
+ writer5.close();
+ filePaths.add(fs.makeQualified(path5));
+ HStoreFile file5 = new HStoreFile(fs, path5, conf, cacheConf,
BloomType.NONE, true, sft);
+ file5.initReader();
+ storeFiles.add(file5);
+
+ // Create StoreFileScanners from all five files.
+ List<KeyValueScanner> scanners = new ArrayList<>();
+ for (HStoreFile storeFile : storeFiles) {
+ StoreFileReader reader = storeFile.getReader();
+ StoreFileScanner scanner = reader.getStoreFileScanner(false, false,
false, 0, 0, false);
+ scanners.add(scanner);
+ }
+
+ // Scan row01–row15 with 30-minute TTL so file 5's expired cell is
filtered after read.
+ Scan scan =
+ new
Scan().withStartRow(Bytes.toBytes("row01")).withStopRow(Bytes.toBytes("row15"),
false);
+ long ttl = 30 * 60 * 1000;
+ ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, ttl,
KeepDeletedCells.FALSE,
+ HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
+
+ // Create StoreScanner; drain with next(), then close.
+ StoreScanner storeScanner = new StoreScanner(scan, scanInfo, null,
scanners);
+
+ List<Cell> results = new ArrayList<>();
+ while (storeScanner.next(results)) {
+ results.clear();
+ }
+ storeScanner.close();
+
+ // After close: all 5 files must be tracked (in-range, out-of-range, and
TTL-expired).
+ Set<Path> filesRead = storeScanner.getFilesRead();
+
+ assertTrue("File 1 (in range) should be tracked",
filesRead.contains(filePaths.get(0)));
+ assertTrue("File 2 (in range) should be tracked",
filesRead.contains(filePaths.get(1)));
+ assertTrue("File 3 (out of key range) should be tracked",
filesRead.contains(filePaths.get(2)));
+ assertTrue("File 4 (before start row) should be tracked",
filesRead.contains(filePaths.get(3)));
+ assertTrue("File 5 (expired TTL, filtered after read) should be tracked",
+ filesRead.contains(filePaths.get(4)));
+ assertEquals("Should have all 5 files read", 5, filesRead.size());
+ }
+
+ /**
+ * Test that when StoreScanner initialization fails after scanners are
created, files are not
+ * tracked
+ */
+ @Test
+ public void testGetFilesReadOnInitializationFailure() throws Exception {
+ HStore mockStore = Mockito.mock(HStore.class);
+ ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE,
Long.MAX_VALUE,
+ KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0,
CellComparator.getInstance(), false);
+ Scan scan = new Scan();
+ NavigableSet<byte[]> columns = null;
+ long readPt = 100L;
+
+ // Create mock scanners that will be returned by getScanners
+ KeyValueScanner mockScanner1 = Mockito.mock(StoreFileScanner.class);
+ KeyValueScanner mockScanner2 = Mockito.mock(StoreFileScanner.class);
+ Path filePath1 = new Path("/test/file1");
+ Path filePath2 = new Path("/test/file2");
+ Mockito.when(mockScanner1.isFileScanner()).thenReturn(true);
+ Mockito.when(mockScanner2.isFileScanner()).thenReturn(true);
+ Mockito.doReturn(true).when(mockScanner1).shouldUseScanner(Mockito.any(),
Mockito.any(),
+ Mockito.anyLong());
+ Mockito.doReturn(true).when(mockScanner2).shouldUseScanner(Mockito.any(),
Mockito.any(),
+ Mockito.anyLong());
+
Mockito.when(mockScanner1.getFilesRead()).thenReturn(Collections.singleton(filePath1));
+
Mockito.when(mockScanner2.getFilesRead()).thenReturn(Collections.singleton(filePath2));
+
+ List<KeyValueScanner> mockScanners = new ArrayList<>();
+ mockScanners.add(mockScanner1);
+ mockScanners.add(mockScanner2);
+
+ // Make getScanners return the mock scanners
+ Mockito.when(mockStore.getScanners(Mockito.anyBoolean(),
Mockito.anyBoolean(),
+ Mockito.anyBoolean(), Mockito.any(), Mockito.any(),
Mockito.anyBoolean(), Mockito.any(),
+ Mockito.anyBoolean(), Mockito.anyLong(),
Mockito.anyBoolean())).thenReturn(mockScanners);
+
+ Mockito.when(mockStore.getCoprocessorHost()).thenReturn(null);
+
+ // Make seek throw IOException on one scanner to simulate failure during
seekScanners
+ Mockito.doThrow(new IOException("Test seek
failure")).when(mockScanner1).seek(Mockito.any());
+
+ // Verify that IOException is thrown during construction
+ StoreScanner storeScanner = null;
+ IOException caughtException = null;
+ try {
+ storeScanner = new StoreScanner(mockStore, scanInfo, scan, columns,
readPt);
+ } catch (IOException e) {
+ caughtException = e;
+ }
+
+ // Verify that exception was thrown
+ assertNotNull("Should have thrown IOException during initialization",
caughtException);
+
+ // Verify that store methods were called (cleanup happened in catch block)
+ Mockito.verify(mockStore,
Mockito.times(1)).addChangedReaderObserver(Mockito.any());
+ Mockito.verify(mockStore,
Mockito.times(1)).deleteChangedReaderObserver(Mockito.any());
+
+ // Verify that scanners were closed (clearAndClose was called in catch
block)
+ Mockito.verify(mockScanner1, Mockito.times(1)).close();
+ Mockito.verify(mockScanner2, Mockito.times(1)).close();
+
+ // Verify that getFilesRead was NOT called on the scanners
+ // (because trackFiles=false was passed to clearAndClose, so files weren't
tracked)
+ Mockito.verify(mockScanner1, Mockito.never()).getFilesRead();
+ Mockito.verify(mockScanner2, Mockito.never()).getFilesRead();
+ }
+
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
index 5c1ac952186..c53873214d5 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
@@ -19,15 +19,21 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
@@ -257,4 +263,88 @@ public class TestSwitchToStreamRead {
public void testFilterRow() throws IOException {
testFilter(new MatchLastRowFilterRowFilter());
}
+
+ /**
+ * Verifies that when the store scanner switches from pread to stream read
successfully, all store
+ * files that were read (including those closed during the switch) are
reported by
+ * {@link StoreScanner#getFilesRead()} after close.
+ */
+ @Test
+ public void testGetFilesReadOnTrySwitchToStreamRead() throws Exception {
+ HStore store = REGION.getStore(FAMILY);
+ FileSystem fs = REGION.getFilesystem();
+
+ // Set a very small preadMaxBytes so that trySwitchToStreamRead is
triggered during scan.
+ long originalPreadMaxBytes =
+
UTIL.getConfiguration().getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES,
2048);
+ try {
+
UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 10L);
+
+ ScanInfo scanInfo =
+ new ScanInfo(UTIL.getConfiguration(), FAMILY, 0, Integer.MAX_VALUE,
Long.MAX_VALUE,
+ org.apache.hadoop.hbase.KeepDeletedCells.FALSE,
HConstants.DEFAULT_BLOCKSIZE, 0,
+ org.apache.hadoop.hbase.CellComparator.getInstance(), false);
+ Scan scan = new Scan().setReadType(Scan.ReadType.DEFAULT);
+ long readPt =
+
REGION.getReadPoint(org.apache.hadoop.hbase.client.IsolationLevel.READ_COMMITTED);
+
+ StoreScanner storeScanner = new StoreScanner(store, scanInfo, scan,
null, readPt);
+
+ // Collect expected store file paths (qualified) for assertion after
close.
+ Set<Path> expectedFilePaths = new HashSet<>();
+ for (HStoreFile sf : store.getStorefiles()) {
+ expectedFilePaths.add(fs.makeQualified(sf.getPath()));
+ }
+ assertFalse("Should have at least one store file",
expectedFilePaths.isEmpty());
+
+ // Verify scanners start in PREAD mode before the switch.
+ for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
+ if (kvs instanceof StoreFileScanner) {
+ StoreFileScanner sfScanner = (StoreFileScanner) kvs;
+ assertSame("Scanner should start in PREAD mode", ReaderType.PREAD,
+ sfScanner.getReader().getReaderContext().getReaderType());
+ }
+ }
+
+ // Scan a few rows and call shipped() to trigger trySwitchToStreamRead.
+ List<Cell> results = new ArrayList<>();
+ ScannerContext scannerContext = ScannerContext.newBuilder().build();
+ boolean switchVerified = false;
+ while (storeScanner.next(results, scannerContext)) {
+ results.clear();
+ storeScanner.shipped();
+
+ // Check mid-scan, whether the switch happened.
+ if (!switchVerified) {
+ boolean allSwitched = true;
+ for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
+ if (kvs instanceof StoreFileScanner) {
+ StoreFileScanner sfScanner = (StoreFileScanner) kvs;
+ if (sfScanner.getReader().getReaderContext().getReaderType() ==
ReaderType.PREAD) {
+ allSwitched = false;
+ break;
+ }
+ }
+ }
+ if (allSwitched) {
+ switchVerified = true;
+ }
+ }
+ }
+ assertTrue("trySwitchToStreamRead should have been invoked and scanners
switched to stream",
+ switchVerified);
+
+ // Not closing the scanners explicitly, because those must be closed
during
+ // trySwitchToStreamRead
+
+ // After close: files that were read (including those closed during
switch) must be tracked.
+ Set<Path> filesRead = storeScanner.getFilesRead();
+ assertEquals("Should have exact file count after close",
expectedFilePaths.size(),
+ filesRead.size());
+ assertEquals("Should contain all expected store file paths",
expectedFilePaths, filesRead);
+ } finally {
+
UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES,
+ originalPreadMaxBytes);
+ }
+ }
}