This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push: new b7def4ff9b9 HBASE-28459 HFileOutputFormat2 ClassCastException with s3 magic committer (#5858) b7def4ff9b9 is described below commit b7def4ff9b98164152ea4ccff6206bfe3d17bac6 Author: Sravishtta Kommineni <49591501+ksravi...@users.noreply.github.com> AuthorDate: Mon May 6 04:17:09 2024 -0400 HBASE-28459 HFileOutputFormat2 ClassCastException with s3 magic committer (#5858) Co-authored-by: Sravi Kommineni <skommin...@hubspot.com> Signed-off-by: Duo Zhang <zhang...@apache.org> --- .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 8 +++- .../hbase/mapreduce/TestHFileOutputFormat2.java | 56 ++++++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 98951667cbe..43dd4a7160e 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.MapReduceExtendedCell; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -87,7 +88,6 @@ import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; import 
org.apache.yetus.audience.InterfaceAudience; @@ -215,11 +215,15 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable, return combineTableNameSuffix(tableName, family); } + protected static Path getWorkPath(final OutputCommitter committer) { + return (Path) ReflectionUtils.invokeMethod(committer, "getWorkPath"); + } + static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter( final TaskAttemptContext context, final OutputCommitter committer) throws IOException { // Get the path of the temporary output file - final Path outputDir = ((FileOutputCommitter) committer).getWorkPath(); + final Path outputDir = getWorkPath(committer); final Configuration conf = context.getConfiguration(); final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 128d690a9a1..3c486a8a52f 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -112,9 +112,12 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.junit.ClassRule; import org.junit.Ignore; @@ -1599,6 +1602,59 @@ public class 
TestHFileOutputFormat2 { } } + @Test + public void itGetsWorkPathHadoop2() throws Exception { + Configuration conf = new Configuration(this.util.getConfiguration()); + Job job = new Job(conf); + FileOutputCommitter committer = + new FileOutputCommitter(new Path("/test"), createTestTaskAttemptContext(job)); + assertEquals(committer.getWorkPath(), HFileOutputFormat2.getWorkPath(committer)); + } + + @Test + public void itGetsWorkPathHadoop3() { + Hadoop3TestOutputCommitter committer = new Hadoop3TestOutputCommitter(new Path("/test")); + assertEquals(committer.getWorkPath(), HFileOutputFormat2.getWorkPath(committer)); + } + + static class Hadoop3TestOutputCommitter extends OutputCommitter { + + Path path; + + Hadoop3TestOutputCommitter(Path path) { + this.path = path; + } + + public Path getWorkPath() { + return path; + } + + @Override + public void setupJob(JobContext jobContext) throws IOException { + + } + + @Override + public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException { + + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException { + + } + + @Override + public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException { + + } + } + private static class ConfigurationCaptorConnection implements Connection { private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";