This is an automated email from the ASF dual-hosted git repository.
pankajkumar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 7cdfe0fea77 HBASE-29477 Add configuration support for custom
OutputCommitter in TableOutputFormat (#7177)
7cdfe0fea77 is described below
commit 7cdfe0fea77e348958b2d5aad83c61705b94e809
Author: Prathyush <[email protected]>
AuthorDate: Mon Aug 4 22:25:34 2025 +0530
HBASE-29477 Add configuration support for custom OutputCommitter in
TableOutputFormat (#7177)
Signed-off-by: Viraj Jasani <[email protected]>
Signed-off-by: Pankaj Kumar <[email protected]>
Reviewed-by: Ujjawal <[email protected]>
Reviewed-by: Kevin Geiszler <[email protected]>
---
.../hadoop/hbase/mapreduce/TableOutputFormat.java | 24 ++++++++++++-
.../hbase/mapreduce/TestTableOutputFormat.java | 41 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 1 deletion(-)
diff --git
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index c74ecf90030..d6119d03518 100644
---
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -74,6 +75,16 @@ public class TableOutputFormat<KEY> extends
OutputFormat<KEY, Mutation> implemen
*/
public static final String OUTPUT_CLUSTER = "hbase.mapred.outputcluster";
+ /**
+ * The configuration key for specifying a custom
+ * {@link org.apache.hadoop.mapreduce.OutputCommitter} implementation to be
used by
+ * {@link TableOutputFormat}. The value for this property should be the
fully qualified class name
+ * of the custom committer. If this property is not set, {@link
TableOutputCommitter} will be used
+ * by default.
+ */
+ public static final String OUTPUT_COMMITTER_CLASS =
+ "hbase.mapreduce.tableoutputformat.output.committer.class";
+
/**
* Prefix for configuration property overrides to apply in {@link
#setConf(Configuration)}. For
* keys matching this prefix, the prefix is stripped, and the value is set
in the configuration
@@ -252,7 +263,18 @@ public class TableOutputFormat<KEY> extends
OutputFormat<KEY, Mutation> implemen
+ /**
+ * Returns the {@link OutputCommitter} used by this output format. The committer class is read
+ * from {@link #OUTPUT_COMMITTER_CLASS}; when that property is unset, {@link TableOutputCommitter}
+ * is used.
+ * @param context the task attempt context; its configuration is consulted only when
+ * {@link #getConf()} returns {@code null}
+ * @return an instance of the configured committer class, created via
+ * {@code ReflectionUtils.newInstance} with no constructor arguments
+ * @throws IOException if resolving or instantiating the committer class throws any exception
+ */
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
- return new TableOutputCommitter();
+ // Prefer this format's own configuration; fall back to the task context's configuration
+ // when no configuration has been set on the format.
+ Configuration hConf = getConf();
+ if (hConf == null) {
+ hConf = context.getConfiguration();
+ }
+
+ try {
+ // Defaults to TableOutputCommitter so jobs that do not set the property keep the stock
+ // committer. The OutputCommitter.class argument presumably restricts the result to
+ // OutputCommitter subtypes (per Hadoop Configuration.getClass) — confirm against Hadoop docs.
+ Class<? extends OutputCommitter> outputCommitter =
+ hConf.getClass(OUTPUT_COMMITTER_CLASS, TableOutputCommitter.class,
OutputCommitter.class);
+ // Custom committers therefore need a constructor callable with no arguments.
+ return ReflectionUtils.newInstance(outputCommitter);
+ } catch (Exception e) {
+ // Any failure while resolving or instantiating the class is surfaced as an IOException,
+ // keeping the original exception as the cause.
+ throw new IOException("Could not create the configured OutputCommitter",
e);
+ }
}
@Override
diff --git
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
index 56390b16cec..52c7321617a 100644
---
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
+++
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.junit.After;
@@ -127,4 +129,43 @@ public class TestTableOutputFormat {
Assert.assertEquals("Durability of the mutation should be SKIP_WAL",
Durability.SKIP_WAL,
delete.getDurability());
}
+
+ /**
+ * Checks that getOutputCommitter returns TableOutputCommitter when OUTPUT_COMMITTER_CLASS is
+ * unset, and the configured class (DummyCommitter) when it is set.
+ * NOTE(review): conf, tableOutputFormat and context are fixture fields initialized outside this
+ * view (presumably in a @Before method) — confirm.
+ */
+ @Test
+ public void testOutputCommitterConfiguration() throws IOException,
InterruptedException {
+ // 1. Verify it returns the default committer when the property is not set.
+ conf.unset(TableOutputFormat.OUTPUT_COMMITTER_CLASS);
+ // Re-apply the configuration so the format sees the change.
+ tableOutputFormat.setConf(conf);
+ Assert.assertEquals("Should use default committer",
TableOutputCommitter.class,
+ tableOutputFormat.getOutputCommitter(context).getClass());
+
+ // 2. Verify it returns the custom committer when the property is set.
+ conf.set(TableOutputFormat.OUTPUT_COMMITTER_CLASS,
DummyCommitter.class.getName());
+ tableOutputFormat.setConf(conf);
+ Assert.assertEquals("Should use custom committer", DummyCommitter.class,
+ tableOutputFormat.getOutputCommitter(context).getClass());
+ }
+
+ /**
+ * No-op OutputCommitter used only to check that a class named in
+ * TableOutputFormat.OUTPUT_COMMITTER_CLASS is the one returned by getOutputCommitter.
+ * Every callback does nothing, and needsTaskCommit always reports false.
+ * NOTE(review): it is public static with an implicit no-arg constructor, which the reflective
+ * instantiation in getOutputCommitter presumably relies on — keep it that way.
+ */
+ public static class DummyCommitter extends OutputCommitter {
+ @Override
+ public void setupJob(JobContext jobContext) {
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) {
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext) {
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) {
+ }
+ }
}