This is an automated email from the ASF dual-hosted git repository.

pankajkumar pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new c26953584d8 HBASE-29477 Add configuration support for custom OutputCommitter in TableOutputFormat (#7185)
c26953584d8 is described below

commit c26953584d8fd316e5f1385a4b82c7d645cff8a1
Author: Prathyush <[email protected]>
AuthorDate: Mon Aug 4 22:34:29 2025 +0530

    HBASE-29477 Add configuration support for custom OutputCommitter in TableOutputFormat (#7185)
    
    Signed-off-by: Viraj Jasani <[email protected]>
    Signed-off-by: Pankaj Kumar <[email protected]>
    Reviewed-by: Ujjawal <[email protected]>
    Reviewed-by: Kevin Geiszler <[email protected]>
---
 .../hadoop/hbase/mapreduce/TableOutputFormat.java  | 24 ++++++++++++-
 .../hbase/mapreduce/TestTableOutputFormat.java     | 41 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 109aeef4ce0..54a357f5bef 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -72,6 +73,16 @@ public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation> implemen
    */
   public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";
 
+  /**
+   * The configuration key for specifying a custom
+   * {@link org.apache.hadoop.mapreduce.OutputCommitter} implementation to be used by
+   * {@link TableOutputFormat}. The value for this property should be the fully qualified class name
+   * of the custom committer. If this property is not set, {@link TableOutputCommitter} will be used
+   * by default.
+   */
+  public static final String OUTPUT_COMMITTER_CLASS =
+    "hbase.mapreduce.tableoutputformat.output.committer.class";
+
   /**
    * Optional job parameter to specify a peer cluster. Used specifying remote cluster when copying
    * between hbase clusters (the source is picked up from <code>hbase-site.xml</code>).
@@ -216,7 +227,18 @@ public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation> implemen
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
     throws IOException, InterruptedException {
-    return new TableOutputCommitter();
+    Configuration hConf = getConf();
+    if (hConf == null) {
+      hConf = context.getConfiguration();
+    }
+
+    try {
+      Class<? extends OutputCommitter> outputCommitter =
+        hConf.getClass(OUTPUT_COMMITTER_CLASS, TableOutputCommitter.class, OutputCommitter.class);
+      return ReflectionUtils.newInstance(outputCommitter);
+    } catch (Exception e) {
+      throw new IOException("Could not create the configured OutputCommitter", e);
+    }
   }
 
   @Override
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
index 801099819b7..9170b26564d 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.junit.After;
@@ -127,4 +129,43 @@ public class TestTableOutputFormat {
     Assert.assertEquals("Durability of the mutation should be SKIP_WAL", Durability.SKIP_WAL,
       delete.getDurability());
   }
+
+  @Test
+  public void testOutputCommitterConfiguration() throws IOException, InterruptedException {
+    // 1. Verify it returns the default committer when the property is not set.
+    conf.unset(TableOutputFormat.OUTPUT_COMMITTER_CLASS);
+    tableOutputFormat.setConf(conf);
+    Assert.assertEquals("Should use default committer", TableOutputCommitter.class,
+      tableOutputFormat.getOutputCommitter(context).getClass());
+
+    // 2. Verify it returns the custom committer when the property is set.
+    conf.set(TableOutputFormat.OUTPUT_COMMITTER_CLASS, DummyCommitter.class.getName());
+    tableOutputFormat.setConf(conf);
+    Assert.assertEquals("Should use custom committer", DummyCommitter.class,
+      tableOutputFormat.getOutputCommitter(context).getClass());
+  }
+
+  // Simple dummy committer for testing
+  public static class DummyCommitter extends OutputCommitter {
+    @Override
+    public void setupJob(JobContext jobContext) {
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) {
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) {
+    }
+  }
 }

Reply via email to