This is an automated email from the ASF dual-hosted git repository.
pankajkumar pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 788c063c50e HBASE-25839 Bulk Import fails with java.io.IOException: Type mismatch in value from map (#6602)
788c063c50e is described below
commit 788c063c50eddcacd0ae7267e9dc6eb41b809a41
Author: Sreenivasulu <[email protected]>
AuthorDate: Wed Mar 12 23:58:07 2025 +0530
HBASE-25839 Bulk Import fails with java.io.IOException: Type mismatch in value from map (#6602)
Signed-off-by: Pankaj Kumar <[email protected]>
(cherry picked from commit 7c21e713432916fbb45d1a2180d6cbbedc072ca3)
---
.../org/apache/hadoop/hbase/mapreduce/Import.java | 10 ++++--
.../hadoop/hbase/mapreduce/TestImportExport.java | 41 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index ee09a7dc397..4e15f8e117d 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -132,8 +132,11 @@ public class Import extends Configured implements Tool {
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(PrivateCellUtil.estimatedSerializedSizeOfKey(kv));
- out.writeInt(0);
+ int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv);
+ int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value.
+ out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
+ out.writeInt(keyLen);
+ out.writeInt(valueLen);
PrivateCellUtil.writeFlatKey(kv, out);
}
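Note on the hunk above: the fixed write() now emits the standard KeyValue wire
layout, a leading total length, then the key length, the value length
(KEYVALUE_INFRASTRUCTURE_SIZE covers those two ints), and the flat key bytes.
Assuming the matching readFields() deserializes in the style of
KeyValue.create(DataInput), which reads one total-length int and then exactly
that many bytes, the old output (key length, then 0, then key bytes) left the
stream misaligned. A minimal, self-contained sketch of the agreed layout
(hypothetical class and method names, plain java.io):

    import java.io.*;

    public class KeyValueLayoutSketch {
      // Serialize a key-only cell the way the fixed write() does.
      static byte[] writeKeyOnly(byte[] flatKey) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        int keyLen = flatKey.length;
        int valueLen = 0; // empty value, as in the patch
        // Total length first; KEYVALUE_INFRASTRUCTURE_SIZE is the two length ints.
        out.writeInt(keyLen + valueLen + 2 * Integer.BYTES);
        out.writeInt(keyLen);
        out.writeInt(valueLen);
        out.write(flatKey);
        return bos.toByteArray();
      }

      public static void main(String[] args) throws IOException {
        byte[] serialized = writeKeyOnly("row".getBytes("UTF-8"));
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
        int total = in.readInt();   // what a KeyValue.create(in)-style reader consumes first
        byte[] kv = new byte[total];
        in.readFully(kv);           // exactly one KeyValue; the stream stays aligned
        System.out.println("read " + total + " bytes, remaining=" + in.available());
      }
    }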
@@ -212,7 +215,8 @@ public class Import extends Configured implements Tool {
continue;
}
Cell ret = convertKv(kv, cfRenameMap);
- context.write(new CellWritableComparable(ret), ret);
+ context.write(new CellWritableComparable(ret),
+ new MapReduceExtendedCell((ExtendedCell) ret));
}
}
} catch (InterruptedException e) {
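This second hunk is what clears the exception in the subject line: the
MapReduce framework requires every map output value to be exactly the
configured map output value class. With bulk output and large results enabled,
the Import job configures MapReduceExtendedCell as that class, so the
unwrapped Cell emitted before this patch failed the framework's check. A
paraphrased sketch of that guard (the real check lives in Hadoop's MapTask
output collector and may differ by version):

    import java.io.IOException;

    public class TypeMismatchSketch {
      // Simplified stand-in for the collector's value-class guard.
      static void collect(Class<?> valClass, Object value) throws IOException {
        if (value.getClass() != valClass) {
          throw new IOException("Type mismatch in value from map: expected "
            + valClass.getName() + ", received " + value.getClass().getName());
        }
      }

      public static void main(String[] args) throws IOException {
        // A value of the wrong concrete class throws, mirroring the bug.
        collect(Integer.class, "not an Integer");
      }
    }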
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
index 3830a23b7ec..d4ccac90143 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -507,6 +507,47 @@ public class TestImportExport {
importTable.close();
}
+ /**
+ * Create a simple table, run an Export Job on it, Import with bulk output and enable largeResult
+ */
+ @Test
+ public void testBulkImportAndLargeResult() throws Throwable {
+ // Create simple table to export
+ TableDescriptor desc = TableDescriptorBuilder
+ .newBuilder(TableName.valueOf(name.getMethodName()))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
+ .build();
+ UTIL.getAdmin().createTable(desc);
+ Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
+
+ Put p1 = new Put(ROW1);
+ p1.addColumn(FAMILYA, QUAL, now, QUAL);
+
+ // Having another row would actually test the filter.
+ Put p2 = new Put(ROW2);
+ p2.addColumn(FAMILYA, QUAL, now, QUAL);
+
+ exportTable.put(Arrays.asList(p1, p2));
+
+ // Export the simple table
+ String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
+ assertTrue(runExport(args));
+
+ // Import to a new table
+ final String IMPORT_TABLE = name.getMethodName() + "import";
+ desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
+ .build();
+ UTIL.getAdmin().createTable(desc);
+
+ String O_OUTPUT_DIR =
+ new Path(OUTPUT_DIR + 1).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
+
+ args = new String[] { "-D" + Import.BULK_OUTPUT_CONF_KEY + "=" + O_OUTPUT_DIR,
+ "-D" + Import.HAS_LARGE_RESULT + "=" + true, IMPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
+ assertTrue(runImport(args));
+ }
+
/**
* Count the number of keyvalues in the specified table with the given filter
* @param table the table to scan
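For reference, the new test drives the fixed path end to end: export two rows,
then import with both bulk output and the large-result flag set. Assuming the
current constant values (Import.BULK_OUTPUT_CONF_KEY = "import.bulk.output",
Import.HAS_LARGE_RESULT = "import.bulk.hasLargeResult"), the equivalent
command-line invocation would look like:

    hbase org.apache.hadoop.hbase.mapreduce.Import \
      -Dimport.bulk.output=/path/to/hfiles \
      -Dimport.bulk.hasLargeResult=true \
      tableName /path/to/exported/data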