This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4074c5e Fixed HUDI-116 : Handle duplicate record keys across partitions
4074c5e is described below
commit 4074c5eb234f643ed0d79efff090138b50ad99ea
Author: Vinoth Chandar <[email protected]>
AuthorDate: Tue May 21 18:59:10 2019 -0700
Fixed HUDI-116 : Handle duplicate record keys across partitions
- Join based on HoodieKey and not RecordKey during tagging
- Unit tests changed to run with duplicate keys
- Special-casing GlobalBloom to still join by record key
---
.../uber/hoodie/index/bloom/HoodieBloomIndex.java | 74 +++++++++++-----------
.../index/bloom/HoodieBloomIndexCheckFunction.java | 4 +-
.../hoodie/index/bloom/HoodieGlobalBloomIndex.java | 23 ++++++-
.../uber/hoodie/index/bloom/KeyLookupResult.java | 8 ++-
.../hoodie/index/bloom/TestHoodieBloomIndex.java | 63 ++++--------------
5 files changed, 82 insertions(+), 90 deletions(-)
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
index 8dc15c1..d8b08f7 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
@@ -44,7 +44,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -86,25 +85,25 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
.mapToPair(record -> new Tuple2<>(record.getPartitionPath(),
record.getRecordKey()));
// Lookup indexes for all the partition/recordkey pair
- JavaPairRDD<String, String> rowKeyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
+ JavaPairRDD<HoodieKey, String> keyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
// Cache the result, for subsequent stages.
if (config.getBloomIndexUseCaching()) {
- rowKeyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
+ keyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
}
if (logger.isDebugEnabled()) {
- long totalTaggedRecords = rowKeyFilenamePairRDD.count();
+ long totalTaggedRecords = keyFilenamePairRDD.count();
logger.debug("Number of update records (ones tagged with a fileID): " +
totalTaggedRecords);
}
// Step 4: Tag the incoming records, as inserts or updates, by joining
with existing record keys
// Cost: 4 sec.
- JavaRDD<HoodieRecord<T>> taggedRecordRDD =
tagLocationBacktoRecords(rowKeyFilenamePairRDD,
+ JavaRDD<HoodieRecord<T>> taggedRecordRDD =
tagLocationBacktoRecords(keyFilenamePairRDD,
recordRDD);
if (config.getBloomIndexUseCaching()) {
recordRDD.unpersist(); // unpersist the input Record RDD
- rowKeyFilenamePairRDD.unpersist();
+ keyFilenamePairRDD.unpersist();
}
return taggedRecordRDD;
@@ -116,23 +115,21 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
.mapToPair(key -> new Tuple2<>(key.getPartitionPath(),
key.getRecordKey()));
// Lookup indexes for all the partition/recordkey pair
- JavaPairRDD<String, String> rowKeyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
-
- JavaPairRDD<String, HoodieKey> rowKeyHoodieKeyPairRDD = hoodieKeys
- .mapToPair(key -> new Tuple2<>(key.getRecordKey(), key));
+ JavaPairRDD<HoodieKey, String> keyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
+ JavaPairRDD<HoodieKey, String> keyHoodieKeyPairRDD =
hoodieKeys.mapToPair(key -> new Tuple2<>(key, null));
- return
rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).mapToPair(keyPathTuple
-> {
+ return
keyHoodieKeyPairRDD.leftOuterJoin(keyFilenamePairRDD).mapToPair(keyPathTuple ->
{
Optional<String> recordLocationPath;
if (keyPathTuple._2._2.isPresent()) {
String fileName = keyPathTuple._2._2.get();
- String partitionPath = keyPathTuple._2._1.getPartitionPath();
+ String partitionPath = keyPathTuple._1.getPartitionPath();
recordLocationPath = Optional
.of(new Path(new Path(hoodieTable.getMetaClient().getBasePath(),
partitionPath), fileName)
.toUri().getPath());
} else {
recordLocationPath = Optional.absent();
}
- return new Tuple2<>(keyPathTuple._2._1, recordLocationPath);
+ return new Tuple2<>(keyPathTuple._1, recordLocationPath);
});
}
@@ -140,7 +137,7 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
* Lookup the location for each record key and return the
pair<record_key,location> for all record keys already
* present and drop the record keys if not present
*/
- private JavaPairRDD<String, String> lookupIndex(
+ private JavaPairRDD<HoodieKey, String> lookupIndex(
JavaPairRDD<String, String> partitionRecordKeyPairRDD, final
JavaSparkContext
jsc, final HoodieTable hoodieTable) {
// Obtain records per partition, in the incoming records
@@ -327,7 +324,7 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
* parallelism for tagging location
*/
@VisibleForTesting
- JavaPairRDD<String, String> findMatchingFilesForRecordKeys(
+ JavaPairRDD<HoodieKey, String> findMatchingFilesForRecordKeys(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int
shuffleParallelism, HoodieTableMetaClient metaClient,
Map<String, Long> fileGroupToComparisons) {
@@ -354,38 +351,41 @@ public class HoodieBloomIndex<T extends
HoodieRecordPayload> extends HoodieIndex
.flatMap(List::iterator)
.filter(lr -> lr.getMatchingRecordKeys().size() > 0)
.flatMapToPair(lookupResult ->
lookupResult.getMatchingRecordKeys().stream()
- .map(recordKey -> new Tuple2<>(recordKey,
lookupResult.getFileName()))
+ .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey,
lookupResult.getPartitionPath()),
+ lookupResult.getFileName()))
.collect(Collectors.toList())
.iterator());
}
+ /**
+ * Returns the incoming record tagged with the file location it was matched to, if any.
+ *
+ * @param inputRecord the incoming record; this method does not call any setter on it
+ * @param location file name the record's key was found in, or absent when there was no match
+ * @return {@code inputRecord} itself when {@code location} is absent; otherwise a new copy of
+ *     it whose current location is built from the file name's commit time and file id (the
+ *     location is left unset if the file name is null or empty)
+ */
+ HoodieRecord<T> getTaggedRecord(HoodieRecord<T> inputRecord,
org.apache.spark.api.java.Optional<String> location) {
+ HoodieRecord<T> record = inputRecord;
+ if (location.isPresent()) {
+ // When a key matches more than one file, the join feeding this method emits one entry
+ // per matching file, all sharing the same in-memory HoodieRecord. Setting
+ // currentLocation twice on the same instance fails the second time, so tag a fresh
+ // copy of the record instead.
+ record = new HoodieRecord<>(inputRecord);
+ String filename = location.get();
+ if (filename != null && !filename.isEmpty()) {
+ record.setCurrentLocation(new
HoodieRecordLocation(FSUtils.getCommitTime(filename),
+ FSUtils.getFileId(filename)));
+ }
+ }
+ return record;
+ }
+
/**
- * Tag the <rowKey, filename> back to the original HoodieRecord RDD.
+ * Tag the <HoodieKey, filename> pairs back to the original HoodieRecord RDD.
+ *
+ * <p>Records are matched on the full HoodieKey (record key plus partition path), so the same
+ * record key under two different partitions is tagged independently. Records with no match
+ * are returned unchanged. The method is protected; a subclass may override it to join on a
+ * different key.
*/
- private JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
- JavaPairRDD<String, String> rowKeyFilenamePairRDD,
JavaRDD<HoodieRecord<T>> recordRDD) {
- JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD = recordRDD
- .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
+ protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
+ JavaPairRDD<HoodieKey, String> keyFilenamePairRDD,
JavaRDD<HoodieRecord<T>> recordRDD) {
+ JavaPairRDD<HoodieKey, HoodieRecord<T>> keyRecordPairRDD = recordRDD
+ .mapToPair(record -> new Tuple2<>(record.getKey(), record));
- // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
- // so we do left outer join.
+ // recordRDD may hold keys that have no entry in keyFilenamePairRDD (keys not found in any
+ // file), so a left outer join is used to keep those records, untagged.
- return
rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(v1 -> {
- HoodieRecord<T> record = v1._1();
- if (v1._2().isPresent()) {
- // When you have a record in multiple files in the same partition,
then rowKeyRecordPairRDD
- // will have 2 entries with the same exact in memory copy of the
HoodieRecord and the 2
- // separate filenames that the record is found in. This will result in
setting
- // currentLocation 2 times and it will fail the second time. So
creating a new in memory
- // copy of the hoodie record.
- record = new HoodieRecord<>(v1._1());
- String filename = v1._2().get();
- if (filename != null && !filename.isEmpty()) {
- record.setCurrentLocation(new
HoodieRecordLocation(FSUtils.getCommitTime(filename),
- FSUtils.getFileId(filename)));
- }
- }
- return record;
- });
+ return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values().map(v1
-> getTaggedRecord(v1._1, v1._2));
}
@Override
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
index 4595658..818e66b 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
@@ -177,7 +177,7 @@ public class HoodieBloomIndexCheckFunction implements
checkAndAddCandidates(recordKey);
} else {
// do the actual checking of file & break out
- ret.add(new KeyLookupResult(currentFile,
checkAgainstCurrentFile()));
+ ret.add(new KeyLookupResult(currentFile, currentPartitionPath,
checkAgainstCurrentFile()));
initState(fileName, partitionPath);
checkAndAddCandidates(recordKey);
break;
@@ -186,7 +186,7 @@ public class HoodieBloomIndexCheckFunction implements
// handle case, where we ran out of input, close pending work, update
return val
if (!inputItr.hasNext()) {
- ret.add(new KeyLookupResult(currentFile, checkAgainstCurrentFile()));
+ ret.add(new KeyLookupResult(currentFile, currentPartitionPath,
checkAgainstCurrentFile()));
}
} catch (Throwable e) {
if (e instanceof HoodieException) {
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
index 7bf85cb..bb09b0e 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
@@ -20,6 +20,7 @@ package com.uber.hoodie.index.bloom;
import com.google.common.annotations.VisibleForTesting;
import com.uber.hoodie.common.model.HoodieKey;
+import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
@@ -32,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
-
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -99,4 +99,25 @@ public class HoodieGlobalBloomIndex<T extends
HoodieRecordPayload> extends Hoodi
.collect(Collectors.toList());
}).flatMap(List::iterator);
}
+
+
+ /**
+ * Tags incoming records by joining on the record key only, ignoring the partition path.
+ *
+ * <p>The base implementation joins on the full HoodieKey. Here the partition path of each
+ * looked-up HoodieKey is dropped before the join, so a record key is matched regardless of
+ * which partition the matching file belongs to.
+ *
+ * <p>NOTE(review): getTaggedRecord only sets the current location, so a record matched to a
+ * file in a different partition keeps its incoming partition path — confirm callers expect
+ * this. If one record key matches files in several partitions, the left outer join emits one
+ * tagged copy of the record per match — confirm that cannot happen or is handled downstream.
+ * The base class's fetchRecordLocation joins on the full HoodieKey; unless it is also
+ * overridden in this class, lookups through it do not get record-key-only matching — verify.
+ *
+ * @param keyFilenamePairRDD (HoodieKey, file name) pairs for keys found by the index lookup
+ * @param recordRDD incoming records to tag
+ * @return the incoming records, tagged wherever their record key was found
+ */
+ @Override
+ protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
+ JavaPairRDD<HoodieKey, String> keyFilenamePairRDD,
JavaRDD<HoodieRecord<T>> recordRDD) {
+ JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD = recordRDD
+ .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
+
+ // recordRDD may hold record keys with no entry in keyFilenamePairRDD, so a left outer
+ // join is used to keep those records (they are returned untagged).
+ return rowKeyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD.mapToPair(p ->
new Tuple2<>(p._1.getRecordKey(), p._2)))
+ .values().map(value -> getTaggedRecord(value._1, value._2));
+ }
+
+ /**
+ * Returns {@code true}: this index tags records by record key alone, independent of the
+ * incoming partition path.
+ */
+ @Override
+ public boolean isGlobal() {
+ return true;
+ }
}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyLookupResult.java
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyLookupResult.java
index e713c8b..6fc2703 100644
---
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyLookupResult.java
+++
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyLookupResult.java
@@ -27,9 +27,11 @@ public class KeyLookupResult {
private final String fileName;
private final List<String> matchingRecordKeys;
+ private final String partitionPath;
- public KeyLookupResult(String fileName, List<String> matchingRecordKeys) {
+ public KeyLookupResult(String fileName, String partitionPath, List<String>
matchingRecordKeys) {
this.fileName = fileName;
+ this.partitionPath = partitionPath;
this.matchingRecordKeys = matchingRecordKeys;
}
@@ -40,4 +42,8 @@ public class KeyLookupResult {
public List<String> getMatchingRecordKeys() {
return matchingRecordKeys;
}
+
+ /**
+ * Returns the partition path of the file this result was computed against; it is paired with
+ * each matching record key to form a HoodieKey.
+ */
+ public String getPartitionPath() {
+ return partitionPath;
+ }
}
diff --git
a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
index 277c9a3..6be5329 100644
---
a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
+++
b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
@@ -128,43 +128,6 @@ public class TestHoodieBloomIndex {
}
@Test
- public void testLoadUUIDsInMemory() throws IOException {
- // Create one RDD of hoodie record
- String recordStr1 =
"{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
- + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
- String recordStr2 =
"{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
- + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
- String recordStr3 =
"{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
- + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
- String recordStr4 =
"{\"_row_key\":\"4eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
- + "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
-
- TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
- HoodieRecord record1 = new HoodieRecord(new
HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()),
- rowChange1);
- TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2);
- HoodieRecord record2 = new HoodieRecord(new
HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()),
- rowChange2);
- TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3);
- HoodieRecord record3 = new HoodieRecord(new
HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()),
- rowChange3);
- TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4);
- HoodieRecord record4 = new HoodieRecord(new
HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()),
- rowChange4);
-
- JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1,
record2, record3, record4));
-
- // Load to memory
- Map<String, Iterable<String>> map = recordRDD.mapToPair(
- record -> new Tuple2<>(record.getPartitionPath(),
record.getRecordKey())).groupByKey().collectAsMap();
- assertEquals(map.size(), 2);
- List<String> list1 = Lists.newArrayList(map.get("2016/01/31"));
- List<String> list2 = Lists.newArrayList(map.get("2015/01/31"));
- assertEquals(list1.size(), 3);
- assertEquals(list2.size(), 1);
- }
-
- @Test
public void testLoadInvolvedFiles() throws IOException {
HoodieWriteConfig config = makeConfig();
HoodieBloomIndex index = new HoodieBloomIndex(config);
@@ -354,14 +317,14 @@ public class TestHoodieBloomIndex {
String rowKey1 = UUID.randomUUID().toString();
String rowKey2 = UUID.randomUUID().toString();
String rowKey3 = UUID.randomUUID().toString();
- String rowKey4 = UUID.randomUUID().toString();
String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\","
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\","
+ "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\","
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
- String recordStr4 = "{\"_row_key\":\"" + rowKey4 + "\","
+ // place same row key under a different partition.
+ String recordStr4 = "{\"_row_key\":\"" + rowKey1 + "\","
+ "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
HoodieRecord record1 = new HoodieRecord(new
HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()),
@@ -408,13 +371,15 @@ public class TestHoodieBloomIndex {
// Check results
for (HoodieRecord record : taggedRecordRDD.collect()) {
if (record.getRecordKey().equals(rowKey1)) {
-
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename1)));
+ if (record.getPartitionPath().equals("2015/01/31")) {
+
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
+ } else {
+
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename1)));
+ }
} else if (record.getRecordKey().equals(rowKey2)) {
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2)));
} else if (record.getRecordKey().equals(rowKey3)) {
assertTrue(!record.isCurrentLocationKnown());
- } else if (record.getRecordKey().equals(rowKey4)) {
-
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
}
}
}
@@ -429,7 +394,8 @@ public class TestHoodieBloomIndex {
+ "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
String recordStr3 =
"{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
- String recordStr4 =
"{\"_row_key\":\"4eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+ // record key same as recordStr2
+ String recordStr4 =
"{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
+ "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
HoodieKey key1 = new HoodieKey(rowChange1.getRowKey(),
rowChange1.getPartitionPath());
@@ -439,7 +405,6 @@ public class TestHoodieBloomIndex {
HoodieRecord record2 = new HoodieRecord(key2, rowChange2);
TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3);
HoodieKey key3 = new HoodieKey(rowChange3.getRowKey(),
rowChange3.getPartitionPath());
- HoodieRecord record3 = new HoodieRecord(key3, rowChange3);
TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4);
HoodieKey key4 = new HoodieKey(rowChange4.getRowKey(),
rowChange4.getPartitionPath());
HoodieRecord record4 = new HoodieRecord(key4, rowChange4);
@@ -481,13 +446,13 @@ public class TestHoodieBloomIndex {
} else if
(record._1.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) {
assertTrue(record._2.isPresent());
Path path2 = new Path(record._2.get());
- assertEquals(FSUtils.getFileId(filename2),
FSUtils.getFileId(path2.getName()));
+ if (record._1.getPartitionPath().equals("2015/01/31")) {
+ assertEquals(FSUtils.getFileId(filename3),
FSUtils.getFileId(path2.getName()));
+ } else {
+ assertEquals(FSUtils.getFileId(filename2),
FSUtils.getFileId(path2.getName()));
+ }
} else if
(record._1.getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) {
assertTrue(!record._2.isPresent());
- } else if
(record._1.getRecordKey().equals("4eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) {
- assertTrue(record._2.isPresent());
- Path path3 = new Path(record._2.get());
- assertEquals(FSUtils.getFileId(filename3),
FSUtils.getFileId(path3.getName()));
}
}
}