nsivabalan commented on code in PR #5208:
URL: https://github.com/apache/hudi/pull/5208#discussion_r841319780
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java:
##########
@@ -26,33 +26,41 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.util.Option;
public interface HoodieFileReader<R extends IndexedRecord> extends AutoCloseable {
- public String[] readMinMaxRecordKeys();
+ String[] readMinMaxRecordKeys();
- public BloomFilter readBloomFilter();
+ BloomFilter readBloomFilter();
- public Set<String> filterRowKeys(Set<String> candidateRowKeys);
+ Set<String> filterRowKeys(Set<String> candidateRowKeys);
default Map<String, R> getRecordsByKeys(List<String> rowKeys) throws IOException {
throw new UnsupportedOperationException();
}
- public Iterator<R> getRecordIterator(Schema readerSchema) throws IOException;
+ default Map<String, R> getRecordsByKeyPrefixes(List<String> keyPrefixes, HFileScanner hFileScanner, Schema readerSchema, Option<Schema.Field> keyFieldSchema) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ Map<String, R> getRecordsByKeyPrefixes(List<String> keyPrefixes) throws IOException;
+
+ Iterator<R> getRecordIterator(Schema readerSchema) throws IOException;
default Iterator<R> getRecordIterator() throws IOException {
return getRecordIterator(getSchema());
}
- default Option<R> getRecordByKey(String key, Schema readerSchema) throws IOException {
+ default Option<R> getRecordByKey(String key, Schema readerSchema, HFileScanner hFileScanner, Option<Schema.Field> keyFieldSchema) throws IOException {
Review Comment:
Had to change these APIs so that each caller uses its own HFileScanner. We removed the class-level HFileScanner instance so that concurrent readers don't overstep each other, which in turn let us remove the synchronized block within the actual read methods.
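For context, here is a rough caller-side sketch of how the new prefix-lookup signature is meant to be used; the variable names, the `"key"` field name, and the call site are illustrative assumptions, not code from this PR:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.storage.HoodieFileReader;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

public class PrefixLookupSketch {
  // Sketch only: the caller creates and owns the scanner it passes in, so the
  // reader no longer needs a shared scanner field guarded by synchronized.
  static Map<String, IndexedRecord> prefixLookup(HoodieFileReader<IndexedRecord> fileReader,
                                                 HFile.Reader hfile,
                                                 Schema readerSchema) throws IOException {
    HFileScanner scanner = hfile.getScanner(false, false); // per-caller scanner
    try {
      scanner.seekTo(); // seek to the beginning of the file, as the reader does above
      Option<Schema.Field> keyFieldSchema =
          Option.ofNullable(readerSchema.getField("key")); // assumed key field name
      return fileReader.getRecordsByKeyPrefixes(
          Collections.singletonList("key0"), scanner, readerSchema, keyFieldSchema);
    } finally {
      scanner.close(); // the caller owns the scanner's lifecycle
    }
  }
}
```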
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java:
##########
@@ -329,39 +410,85 @@ public R next() {
throw new HoodieIOException("unable to read next record from parquet
file ", io);
}
}
+
+ @Override
+ public void close() {
+ hFileScanner.close();
+ }
};
}
- private boolean isKeyAvailable(String key) throws IOException {
+ private boolean isKeyAvailable(String key, HFileScanner keyScanner) throws IOException {
final KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
+ return keyScanner.seekTo(kv) == 0;
+ }
+
+ @Override
+ public Map<String, R> getRecordsByKeyPrefixes(List<String> keyPrefixes) throws IOException {
+ Schema readerSchema = getSchema();
+ Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
synchronized (this) {
- if (keyScanner == null) {
- keyScanner = reader.getScanner(false, false);
- }
- if (keyScanner.seekTo(kv) == 0) {
- return true;
+ HFileScanner hFileScanner = reader.getScanner(false, false);
+ hFileScanner.seekTo(); // seek to beginning of file.
+ return getRecordsByKeyPrefixes(keyPrefixes, hFileScanner, readerSchema, keyFieldSchema);
+ }
+ }
+
+ @Override
+ public Map<String, R> getRecordsByKeyPrefixes(List<String> keyPrefixes, HFileScanner hFileScanner, Schema readerSchema, Option<Schema.Field> keyFieldSchema) throws IOException {
+ // NOTE: It's always beneficial to sort the keys being sought by the HFile reader
+ // to avoid seeking back and forth
+ Collections.sort(keyPrefixes);
+ List<Pair<byte[], byte[]>> keyRecordsBytes = new ArrayList<>(keyPrefixes.size());
+ for (String keyPrefix : keyPrefixes) {
+ KeyValue kv = new KeyValue(keyPrefix.getBytes(), null, null, null);
+ int val = hFileScanner.seekTo(kv);
+ // What seekTo() does:
+ // e.g. entries in the file: [key01, key02, key03, key04, ..., key20]
+ // when keyPrefix is "key", seekTo will return -1 and place the cursor just before key01; getCell() will return the key01 entry
+ // when keyPrefix is "key03", seekTo will return 0 and place the cursor at key03; getCell() will return the key03 entry
+ // when keyPrefix is "key1", seekTo will return 1 and place the cursor just before key10 (i.e. at key09); call next() and then getCell() to see the key10 entry
+ // when keyPrefix is "key99", seekTo will return 1 and place the cursor at the last entry, i.e. key20; getCell() will return the key20 entry
+
+ if (val == 1) { // move to next entry if return value is 1
+ if (!hFileScanner.next()) {
+ // we have reached the end of file. we can skip proceeding further
+ break;
+ }
}
+ do {
+ Cell c = hFileScanner.getCell();
+ byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength());
+ String key = new String(keyBytes);
+ // Check whether we're still reading records corresponding to the key-prefix
+ if (!key.startsWith(keyPrefix)) {
+ break;
+ }
+
+ // Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards
+ byte[] valueBytes = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
+ keyRecordsBytes.add(Pair.newPair(keyBytes, valueBytes));
+ } while (hFileScanner.next());
}
- return false;
+
+ // Use a TreeMap so that entries are sorted in the map being returned.
+ Map<String, R> values = new TreeMap<String, R>();
+ for (Pair<byte[], byte[]> kv : keyRecordsBytes) {
+ R record = deserialize(kv.getFirst(), kv.getSecond(), readerSchema, readerSchema, keyFieldSchema);
+ values.put(new String(kv.getFirst()), record);
+ }
+ return values;
}
@Override
- public Option getRecordByKey(String key, Schema readerSchema) throws IOException {
+ public Option<R> getRecordByKey(String key, Schema readerSchema, HFileScanner hFileScanner, Option<Schema.Field> keyFieldSchema) throws IOException {
byte[] value = null;
- final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
- ValidationUtils.checkState(keyFieldSchema != null);
KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
- synchronized (this) {
- if (keyScanner == null) {
Review Comment:
Removed the class-level HFileScanner instance so that concurrent readers can each use their own HFileScanner, and hence no synchronization is required.
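As a rough illustration of the concurrency this enables (the thread setup, names, and the `"key"` field lookup below are assumptions for the sketch, not actual Hudi call sites): each reader thread builds its own scanner from the shared `HFile.Reader`, so one thread's seek cannot move another thread's cursor.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.storage.HoodieFileReader;

import java.io.IOException;

public class ConcurrentLookupSketch {
  // Sketch only: every lookup thread gets its own private HFileScanner, so no
  // synchronized block is needed around the point read.
  static Thread lookupInOwnThread(HoodieFileReader<IndexedRecord> fileReader,
                                  HFile.Reader hfile,
                                  Schema readerSchema,
                                  String key) {
    Thread t = new Thread(() -> {
      HFileScanner scanner = hfile.getScanner(false, false); // per-thread scanner
      try {
        Option<Schema.Field> keyFieldSchema =
            Option.ofNullable(readerSchema.getField("key")); // assumed key field name
        Option<IndexedRecord> record =
            fileReader.getRecordByKey(key, readerSchema, scanner, keyFieldSchema);
        // ... consume 'record' ...
      } catch (IOException e) {
        throw new RuntimeException(e);
      } finally {
        scanner.close();
      }
    });
    t.start();
    return t;
  }
}
```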
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -239,6 +322,43 @@ private void initIfNeeded() {
return result;
}
+ private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecordsForKeyPrefixes(HoodieFileReader baseFileReader,
Review Comment:
@alexeykudinkin: I tried to unify this and the other method, but there are quite a few places where I had to add if/else branching, so I have left it as is. If you don't mind, can you take a stab at unifying them? I have addressed all the other feedback we discussed.
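For what it's worth, the kind of branching I kept running into looks roughly like the purely hypothetical sketch below (the method, the flag, and the null-base-file check are made up for illustration and are not the actual Hudi code); every step needs a mode check, which is why the two methods were kept separate:

```java
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.io.storage.HoodieFileReader;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class UnifiedLookupSketch {
  // Hypothetical sketch only: a single entry point has to branch on the lookup
  // mode for the base-file read (and similarly for the log-record merge, not shown).
  static <R extends IndexedRecord> Map<String, R> readBaseRecords(HoodieFileReader<R> baseFileReader,
                                                                  List<String> keysOrPrefixes,
                                                                  boolean byKeyPrefixes) throws IOException {
    if (baseFileReader == null) {
      // no base file in this file slice (assumed branch point)
      return Collections.emptyMap();
    }
    return byKeyPrefixes
        ? baseFileReader.getRecordsByKeyPrefixes(keysOrPrefixes)
        : baseFileReader.getRecordsByKeys(keysOrPrefixes);
  }
}
```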
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]