This is an automated email from the ASF dual-hosted git repository.

rmattingly pushed a commit to branch HBASE-28440-branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit aaf7ece9ed331463ab82f32d40288ab62f395108
Author: Hernan Romer <[email protected]>
AuthorDate: Wed Sep 24 14:53:32 2025 -0400

    HBASE-28440 Add support for using mapreduce sort in HFileOutputFormat2 (#7295)

    Co-authored-by: Hernan Gelaf-Romer <[email protected]>
    Signed-off-by: Ray Mattingly <[email protected]>
---
 .../backup/impl/IncrementalTableBackupClient.java  |  12 ++
 .../mapreduce/MapReduceHFileSplitterJob.java       |  36 ++++-
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java |  32 +++-
 .../org/apache/hadoop/hbase/mapreduce/Import.java  |   4 +
 .../hbase/mapreduce/KeyOnlyCellComparable.java     |  94 ++++++++++++
 .../hbase/mapreduce/PreSortedCellsReducer.java     |  46 ++++++
 .../apache/hadoop/hbase/mapreduce/WALPlayer.java   |  37 ++++-
 .../hbase/mapreduce/TestCellBasedWALPlayer2.java   |   3 +-
 .../hadoop/hbase/mapreduce/TestWALPlayer.java      | 167 +++++++++++++------
 9 files changed, 373 insertions(+), 58 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index d51f1f47151..ae32a8dbeb5 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -335,6 +335,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
   }
 
   protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
+    boolean diskBasedSortingOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
     try {
       LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
       // set overall backup phase: incremental_copy
@@ -349,6 +350,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
         LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
       }
       conf.set(JOB_NAME_CONF_KEY, jobname);
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
 
       BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
       int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
@@ -361,6 +363,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
         + " finished.");
     } finally {
       deleteBulkLoadDirectory();
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+        diskBasedSortingOriginalValue);
     }
   }
 
@@ -415,6 +419,9 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
     conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
     conf.set(JOB_NAME_CONF_KEY, jobname);
+
+    boolean diskBasedSortingEnabledOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
     String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
 
     try {
@@ -430,6 +437,11 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     } catch (Exception ee) {
       throw new IOException("Can not convert from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
+    } finally {
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+        diskBasedSortingEnabledOriginalValue);
+      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
+      conf.unset(JOB_NAME_CONF_KEY);
     }
   }
 
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
index 28db0c605f7..4f3a7f925c6 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -33,11 +34,14 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
 import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
+import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -71,18 +75,28 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
   /**
    * A mapper that just writes out cells. This one can be used together with {@link CellSortReducer}
    */
-  static class HFileCellMapper extends Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {
+  static class HFileCellMapper extends Mapper<NullWritable, Cell, WritableComparable<?>, Cell> {
+
+    private boolean diskBasedSortingEnabled = false;
 
     @Override
     public void map(NullWritable key, Cell value, Context context)
      throws IOException, InterruptedException {
-      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)),
-        new MapReduceExtendedCell(value));
+      ExtendedCell extendedCell = (ExtendedCell) value;
+      context.write(wrap(extendedCell), new MapReduceExtendedCell(extendedCell));
     }
 
     @Override
     public void setup(Context context) throws IOException {
-      // do nothing
+      diskBasedSortingEnabled =
+        HFileOutputFormat2.diskBasedSortingEnabled(context.getConfiguration());
+    }
+
+    private WritableComparable<?> wrap(ExtendedCell cell) {
+      if (diskBasedSortingEnabled) {
+        return new KeyOnlyCellComparable(cell);
+      }
+      return new ImmutableBytesWritable(CellUtil.cloneRow(cell));
     }
   }
 
@@ -106,13 +120,23 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
       true);
     job.setJarByClass(MapReduceHFileSplitterJob.class);
     job.setInputFormatClass(HFileInputFormat.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    if (diskBasedSortingEnabled) {
+      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+    } else {
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    }
     if (hfileOutPath != null) {
       LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
       TableName tableName = TableName.valueOf(tabName);
       job.setMapperClass(HFileCellMapper.class);
-      job.setReducerClass(CellSortReducer.class);
+      if (diskBasedSortingEnabled) {
+        job.setReducerClass(PreSortedCellsReducer.class);
+      } else {
+        job.setReducerClass(CellSortReducer.class);
+      }
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputValueClass(MapReduceExtendedCell.class);
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index cb2c6260171..2906238edc7 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -83,6 +84,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -194,6 +196,11 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
     "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
   static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;
 
+  @InterfaceAudience.Private
+  public static final String DISK_BASED_SORTING_ENABLED_KEY =
+    "hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled";
+  private static final boolean DISK_BASED_SORTING_ENABLED_DEFAULT = false;
+
   public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
   public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
     REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
@@ -579,12 +586,19 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
 
     // Write the actual file
     FileSystem fs = partitionsPath.getFileSystem(conf);
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
-      ImmutableBytesWritable.class, NullWritable.class);
+    boolean diskBasedSortingEnabled = diskBasedSortingEnabled(conf);
+    Class<? extends Writable> keyClass =
+      diskBasedSortingEnabled ? KeyOnlyCellComparable.class : ImmutableBytesWritable.class;
+    SequenceFile.Writer writer =
+      SequenceFile.createWriter(fs, conf, partitionsPath, keyClass, NullWritable.class);
 
     try {
       for (ImmutableBytesWritable startKey : sorted) {
-        writer.append(startKey, NullWritable.get());
+        Writable writable = diskBasedSortingEnabled
+          ? new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(startKey.get()))
+          : startKey;
+
+        writer.append(writable, NullWritable.get());
       }
     } finally {
       writer.close();
@@ -631,6 +645,10 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
     configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
   }
 
+  public static boolean diskBasedSortingEnabled(Configuration conf) {
+    return conf.getBoolean(DISK_BASED_SORTING_ENABLED_KEY, DISK_BASED_SORTING_ENABLED_DEFAULT);
+  }
+
   static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
     Class<? extends OutputFormat<?, ?>> cls) throws IOException {
     Configuration conf = job.getConfiguration();
@@ -652,7 +670,13 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
     // Based on the configured map output class, set the correct reducer to properly
     // sort the incoming values.
     // TODO it would be nice to pick one or the other of these formats.
-    if (
+    boolean diskBasedSorting = diskBasedSortingEnabled(conf);
+
+    if (diskBasedSorting) {
+      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+      job.setReducerClass(PreSortedCellsReducer.class);
+    } else if (
       KeyValue.class.equals(job.getMapOutputValueClass())
         || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
     ) {
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 4adcfbfcd3f..03abcf15975 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -200,6 +200,10 @@ public class Import extends Configured implements Tool {
       this.kv = kv;
     }
 
+    public Cell getCell() {
+      return kv;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
       int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv);
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java
new file mode 100644
index 00000000000..d9b28f8a689
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class KeyOnlyCellComparable implements WritableComparable<KeyOnlyCellComparable> {
+
+  static {
+    WritableComparator.define(KeyOnlyCellComparable.class, new KeyOnlyCellComparator());
+  }
+
+  private ExtendedCell cell = null;
+
+  public KeyOnlyCellComparable() {
+  }
+
+  public KeyOnlyCellComparable(ExtendedCell cell) {
+    this.cell = cell;
+  }
+
+  public ExtendedCell getCell() {
+    return cell;
+  }
+
+  @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
+  public int compareTo(KeyOnlyCellComparable o) {
+    return CellComparator.getInstance().compare(cell, o.cell);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
+    int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value.
+    out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
+    out.writeInt(keyLen);
+    out.writeInt(valueLen);
+    PrivateCellUtil.writeFlatKey(cell, out);
+    out.writeLong(cell.getSequenceId());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    cell = KeyValue.create(in);
+    long seqId = in.readLong();
+    cell.setSequenceId(seqId);
+  }
+
+  public static class KeyOnlyCellComparator extends WritableComparator {
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try (DataInputStream d1 = new DataInputStream(new ByteArrayInputStream(b1, s1, l1));
+        DataInputStream d2 = new DataInputStream(new ByteArrayInputStream(b2, s2, l2))) {
+        KeyOnlyCellComparable kv1 = new KeyOnlyCellComparable();
+        kv1.readFields(d1);
+        KeyOnlyCellComparable kv2 = new KeyOnlyCellComparable();
+        kv2.readFields(d2);
+        return compare(kv1, kv2);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java
new file mode 100644
index 00000000000..81871ffb59c
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class PreSortedCellsReducer
+  extends Reducer<KeyOnlyCellComparable, Cell, ImmutableBytesWritable, Cell> {
+
+  @Override
+  protected void reduce(KeyOnlyCellComparable key, Iterable<Cell> values, Context context)
+    throws IOException, InterruptedException {
+
+    int index = 0;
+    for (Cell cell : values) {
+      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(key.getCell())),
+        new MapReduceExtendedCell(cell));
+
+      if (++index % 100 == 0) {
+        context.setStatus("Wrote " + index + " cells");
+      }
+    }
+  }
+}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index e06300848f6..cc5820c99b5 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -138,9 +140,10 @@ public class WALPlayer extends Configured implements Tool {
   /**
    * A mapper that just writes out Cells. This one can be used together with {@link CellSortReducer}
    */
-  static class WALCellMapper extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> {
+  static class WALCellMapper extends Mapper<WALKey, WALEdit, WritableComparable<?>, Cell> {
     private Set<String> tableSet = new HashSet<>();
     private boolean multiTableSupport = false;
+    private boolean diskBasedSortingEnabled = false;
 
     @Override
     public void map(WALKey key, WALEdit value, Context context) throws IOException {
@@ -161,7 +164,8 @@ public class WALPlayer extends Configured implements Tool {
           byte[] outKey = multiTableSupport
             ? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
             : CellUtil.cloneRow(cell);
-          context.write(new ImmutableBytesWritable(outKey), new MapReduceExtendedCell(cell));
+          ExtendedCell extendedCell = (ExtendedCell) cell;
+          context.write(wrapKey(outKey, extendedCell), new MapReduceExtendedCell(extendedCell));
         }
       }
     } catch (InterruptedException e) {
@@ -174,8 +178,23 @@ public class WALPlayer extends Configured implements Tool {
       Configuration conf = context.getConfiguration();
       String[] tables = conf.getStrings(TABLES_KEY);
       this.multiTableSupport = conf.getBoolean(MULTI_TABLES_SUPPORT, false);
+      this.diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
       Collections.addAll(tableSet, tables);
     }
+
+    private WritableComparable<?> wrapKey(byte[] key, ExtendedCell cell) {
+      if (this.diskBasedSortingEnabled) {
+        // Important to build a new cell with the updated key to maintain multi-table support
+        KeyValue kv = new KeyValue(key, 0, key.length, cell.getFamilyArray(),
+          cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(),
+          cell.getQualifierOffset(), cell.getQualifierLength(), cell.getTimestamp(),
+          KeyValue.Type.codeToType(cell.getTypeByte()), null, 0, 0);
+        kv.setSequenceId(cell.getSequenceId());
+        return new KeyOnlyCellComparable(kv);
+      } else {
+        return new ImmutableBytesWritable(key);
+      }
+    }
   }
 
   /**
@@ -353,7 +372,13 @@ public class WALPlayer extends Configured implements Tool {
     job.setJarByClass(WALPlayer.class);
     job.setInputFormatClass(WALInputFormat.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    if (diskBasedSortingEnabled) {
+      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+    } else {
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    }
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
     if (hfileOutPath != null) {
@@ -372,7 +397,11 @@ public class WALPlayer extends Configured implements Tool {
       List<TableName> tableNames = getTableNameList(tables);
 
       job.setMapperClass(WALCellMapper.class);
-      job.setReducerClass(CellSortReducer.class);
+      if (diskBasedSortingEnabled) {
+        job.setReducerClass(PreSortedCellsReducer.class);
+      } else {
+        job.setReducerClass(CellSortReducer.class);
+      }
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputValueClass(MapReduceExtendedCell.class);
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedWALPlayer2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedWALPlayer2.java
index 283acbabf6e..d6c4b623ad4 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedWALPlayer2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellBasedWALPlayer2.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.util.ToolRunner;
@@ -172,7 +173,7 @@ public class TestCellBasedWALPlayer2 {
     WALKey key = mock(WALKey.class);
     when(key.getTableName()).thenReturn(TableName.valueOf("table"));
     @SuppressWarnings("unchecked")
-    Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell>.Context context = mock(Context.class);
+    Mapper<WALKey, WALEdit, WritableComparable<?>, Cell>.Context context = mock(Context.class);
     when(context.getConfiguration()).thenReturn(configuration);
 
     WALEdit value = mock(WALEdit.class);
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index c6e51eee40f..d2d9cc831da 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -114,6 +114,50 @@ public class TestWALPlayer {
     logFs.delete(walRootDir, true);
   }
 
+  @Test
+  public void testDiskBasedSortingEnabled() throws Exception {
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
+    final byte[] FAMILY = Bytes.toBytes("family");
+    final byte[] COLUMN1 = Bytes.toBytes("c1");
+    final byte[] COLUMN2 = Bytes.toBytes("c2");
+    final byte[] ROW = Bytes.toBytes("row");
+    Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+    Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
+
+    // put a row into the first table
+    Put p = new Put(ROW);
+    p.addColumn(FAMILY, COLUMN1, COLUMN1);
+    p.addColumn(FAMILY, COLUMN2, COLUMN2);
+    t1.put(p);
+    // delete one column
+    Delete d = new Delete(ROW);
+    d.addColumns(FAMILY, COLUMN1);
+    t1.delete(d);
+
+    // replay the WAL, map table 1 to table 2
+    WAL log = cluster.getRegionServer(0).getWAL(null);
+    log.rollWriter();
+    String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
+      HConstants.HREGION_LOGDIR_NAME).toString();
+
+    Configuration configuration = TEST_UTIL.getConfiguration();
+    configuration.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
+    WALPlayer player = new WALPlayer(configuration);
+    String optionName = "_test_.name";
+    configuration.set(optionName, "1000");
+    player.setupTime(configuration, optionName);
+    assertEquals(1000, configuration.getLong(optionName, 0));
+    assertEquals(0, ToolRunner.run(configuration, player,
+      new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() }));
+
+    // verify the WAL was player into table 2
+    Get g = new Get(ROW);
+    Result r = t2.get(g);
+    assertEquals(1, r.size());
+    assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
+  }
+
   /**
    * Test that WALPlayer can replay recovered.edits files.
    */
@@ -123,19 +167,22 @@ public class TestWALPlayer {
     TEST_UTIL.createTable(tn, TestRecoveredEdits.RECOVEREDEDITS_COLUMNFAMILY);
     // Copy testing recovered.edits file that is over under hbase-server test resources
     // up into a dir in our little hdfs cluster here.
-    String hbaseServerTestResourcesEdits =
-      System.getProperty("test.build.classes") + "/../../../hbase-server/src/test/resources/"
-        + TestRecoveredEdits.RECOVEREDEDITS_PATH.getName();
-    assertTrue(new File(hbaseServerTestResourcesEdits).exists());
-    FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
-    // Target dir.
-    Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory());
-    assertTrue(dfs.mkdirs(targetDir));
-    dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir);
-    assertEquals(0,
-      ToolRunner.run(new WALPlayer(this.conf), new String[] { targetDir.toString() }));
-    // I don't know how many edits are in this file for this table... so just check more than 1.
-    assertTrue(TEST_UTIL.countRows(tn) > 0);
+    runWithDiskBasedSortingDisabledAndEnabled(() -> {
+      String hbaseServerTestResourcesEdits =
+        System.getProperty("test.build.classes") + "/../../../hbase-server/src/test/resources/"
+          + TestRecoveredEdits.RECOVEREDEDITS_PATH.getName();
+      assertTrue(new File(hbaseServerTestResourcesEdits).exists());
+      FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+      // Target dir.
+      Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory());
+      assertTrue(dfs.mkdirs(targetDir));
+      dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir);
+      assertEquals(0,
+        ToolRunner.run(new WALPlayer(this.conf), new String[] { targetDir.toString() }));
+      // I don't know how many edits are in this file for this table... so just check more than 1.
+      assertTrue(TEST_UTIL.countRows(tn) > 0);
+      dfs.delete(targetDir, true);
+    });
   }
 
   /**
@@ -150,7 +197,7 @@ public class TestWALPlayer {
     final byte[] column1 = Bytes.toBytes("c1");
     final byte[] column2 = Bytes.toBytes("c2");
     final byte[] row = Bytes.toBytes("row");
-    Table table = TEST_UTIL.createTable(tableName, family);
+    final Table table = TEST_UTIL.createTable(tableName, family);
     long now = EnvironmentEdgeManager.currentTime();
 
     // put a row into the first table
@@ -187,29 +234,38 @@ public class TestWALPlayer {
     configuration.set(WALPlayer.BULK_OUTPUT_CONF_KEY, outPath);
     configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
 
-    WALPlayer player = new WALPlayer(configuration);
-    assertEquals(0, ToolRunner.run(configuration, player,
-      new String[] { walInputDir, tableName.getNameAsString() }));
+    final byte[] finalLastVal = lastVal;
+
+    runWithDiskBasedSortingDisabledAndEnabled(() -> {
+      WALPlayer player = new WALPlayer(configuration);
+      assertEquals(0, ToolRunner.run(configuration, player,
+        new String[] { walInputDir, tableName.getNameAsString() }));
 
-    Get g = new Get(row);
-    Result result = table.get(g);
-    byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
-    assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
+      Get g = new Get(row);
+      Result result = table.get(g);
+      byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
+      assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(finalLastVal)));
 
-    table = TEST_UTIL.truncateTable(tableName);
-    g = new Get(row);
-    result = table.get(g);
-    assertThat(result.listCells(), nullValue());
+      TEST_UTIL.truncateTable(tableName);
+      g = new Get(row);
+      result = table.get(g);
+      assertThat(result.listCells(), nullValue());
 
-    BulkLoadHFiles.create(configuration).bulkLoad(tableName,
-      new Path(outPath, tableName.getNameAsString()));
+      BulkLoadHFiles.create(configuration).bulkLoad(tableName,
+        new Path(outPath, tableName.getNameAsString()));
 
-    g = new Get(row);
-    result = table.get(g);
-    value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
+      g = new Get(row);
+      result = table.get(g);
+      value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
 
-    assertThat(result.listCells(), notNullValue());
-    assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
+      assertThat(result.listCells(), notNullValue());
+      assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(finalLastVal)));
+
+      // cleanup
+      Path out = new Path(outPath);
+      FileSystem fs = out.getFileSystem(configuration);
+      assertTrue(fs.delete(out, true));
+    });
   }
 
   /**
@@ -244,18 +300,21 @@ public class TestWALPlayer {
     Configuration configuration = TEST_UTIL.getConfiguration();
     WALPlayer player = new WALPlayer(configuration);
-    String optionName = "_test_.name";
-    configuration.set(optionName, "1000");
-    player.setupTime(configuration, optionName);
-    assertEquals(1000, configuration.getLong(optionName, 0));
-    assertEquals(0, ToolRunner.run(configuration, player,
-      new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() }));
-    // verify the WAL was player into table 2
-    Get g = new Get(ROW);
-    Result r = t2.get(g);
-    assertEquals(1, r.size());
-    assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
+    runWithDiskBasedSortingDisabledAndEnabled(() -> {
+      String optionName = "_test_.name";
+      configuration.set(optionName, "1000");
+      player.setupTime(configuration, optionName);
+      assertEquals(1000, configuration.getLong(optionName, 0));
+      assertEquals(0, ToolRunner.run(configuration, player,
+        new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() }));
+
+      // verify the WAL was player into table 2
+      Get g = new Get(ROW);
+      Result r = t2.get(g);
+      assertEquals(1, r.size());
+      assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
+    });
   }
 
   /**
@@ -335,7 +394,29 @@ public class TestWALPlayer {
       System.setErr(oldPrintStream);
       System.setSecurityManager(SECURITY_MANAGER);
     }
+  }
+
+  private static void runWithDiskBasedSortingDisabledAndEnabled(TestMethod method)
+    throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+      false);
+    try {
+      method.run();
+    } finally {
+      TEST_UTIL.getConfiguration().unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
+    }
+
+    TEST_UTIL.getConfiguration().setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+      true);
+    try {
+      method.run();
+    } finally {
+      TEST_UTIL.getConfiguration().unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
+    }
+  }
+
+  private interface TestMethod {
+    void run() throws Exception;
   }
 }
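A note on how the pieces above fit together: when hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled is true, the jobs emit KeyOnlyCellComparable map output keys so the MapReduce shuffle itself orders cells by their full key (spilling to disk as needed) and PreSortedCellsReducer simply streams them through; when it is false, the existing ImmutableBytesWritable row key plus CellSortReducer path is used and cells are sorted in reducer memory. The condensed Java sketch below restates that wiring outside the three tools touched by the patch; it is illustrative only, the helper name wireSort is made up, and KeyOnlyCellComparable and PreSortedCellsReducer are the classes introduced by this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.mapreduce.Job;

public class SortWiringSketch {
  // Hypothetical helper mirroring the branch taken in WALPlayer, MapReduceHFileSplitterJob
  // and HFileOutputFormat2.configureIncrementalLoad in the diff above.
  static void wireSort(Job job, Configuration conf) {
    if (HFileOutputFormat2.diskBasedSortingEnabled(conf)) {
      // Cells are compared by their full key during the shuffle, so the
      // reducer only has to pass them through in order.
      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
      job.setReducerClass(PreSortedCellsReducer.class);
    } else {
      // Legacy path: keys are row bytes and the reducer sorts cells in memory.
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setReducerClass(CellSortReducer.class);
    }
    job.setMapOutputValueClass(MapReduceExtendedCell.class);
  }
}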
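KeyOnlyCellComparable serializes a cell in a KeyValue-compatible layout (total length, key length, a zero value length, the flat key, then the sequence id), which is what lets KeyOnlyCellComparator rebuild both operands from raw shuffle bytes and delegate to CellComparator. A small round-trip sketch follows, assuming the patched hbase-mapreduce module is on the classpath; the row value is arbitrary.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
import org.apache.hadoop.hbase.util.Bytes;

public class KeyOnlyCellComparableRoundTrip {
  public static void main(String[] args) throws Exception {
    // A "first on row" cell, the same shape HFileOutputFormat2.writePartitions emits
    // into the partitions file when disk-based sorting is enabled.
    KeyOnlyCellComparable original =
      new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(Bytes.toBytes("row-1")));

    // Serialize the key-only form, as the MapReduce shuffle would.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));

    // Deserialize into a fresh instance and compare the two cells by key.
    KeyOnlyCellComparable copy = new KeyOnlyCellComparable();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    int cmp = CellComparator.getInstance().compare(original.getCell(), copy.getCell());
    System.out.println("compare(original, copy) = " + cmp); // expected: 0
  }
}

Because only the key is written, partition boundaries and shuffle keys stay small even though ordering is done on full cell keys rather than on row bytes.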
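To opt in from a caller, the flag is an ordinary boolean configuration property, exercised much like the new testDiskBasedSortingEnabled test does. Below is a hedged usage sketch for running WALPlayer with disk-based sorting and bulk HFile output; the WAL directory, output directory, and table name are placeholders, not values from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public class WALPlayerDiskSortExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Opt in to the shuffle-based (disk-backed) sort added by this patch.
    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
    // Write HFiles instead of live mutations; the directory below is a placeholder.
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "/tmp/walplayer-hfiles");
    // Arguments: <WAL input dir> <comma-separated tables>; both are placeholders here.
    int exitCode =
      ToolRunner.run(conf, new WALPlayer(conf), new String[] { "/hbase/oldWALs", "my_table" });
    System.exit(exitCode);
  }
}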
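Because the backup client mutates a Configuration it shares with later phases, the IncrementalTableBackupClient hunks record the caller's setting first and restore it in a finally block. A minimal sketch of that pattern, with the job-submission body elided:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;

public class DiskBasedSortingToggleSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Remember the caller's setting so it can be restored afterwards.
    boolean originalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
    try {
      // ... submit the copy / WALPlayer job with this conf ...
    } finally {
      // Restore the original value so later jobs sharing this conf are unaffected.
      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, originalValue);
    }
  }
}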
