This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ea0f1c93dc CAMEL-18817: camel-hbase - Support append and increment 
operations in the producer (#8905)
6ea0f1c93dc is described below

commit 6ea0f1c93dcc68914d71b9e20467d5db11bd41eb
Author: Kengo Seki <sek...@apache.org>
AuthorDate: Thu Dec 15 14:45:04 2022 +0900

    CAMEL-18817: camel-hbase - Support append and increment operations in the 
producer (#8905)
---
 .../camel-hbase/src/main/docs/hbase-component.adoc |  2 +
 .../camel/component/hbase/HBaseConstants.java      |  2 +
 .../camel/component/hbase/HBaseProducer.java       | 67 ++++++++++++++++++++++
 .../camel/component/hbase/HBaseProducerIT.java     | 54 +++++++++++++++++
 4 files changed, 125 insertions(+)

diff --git a/components/camel-hbase/src/main/docs/hbase-component.adoc 
b/components/camel-hbase/src/main/docs/hbase-component.adoc
index c238c1e40be..d298aff62f7 100644
--- a/components/camel-hbase/src/main/docs/hbase-component.adoc
+++ b/components/camel-hbase/src/main/docs/hbase-component.adoc
@@ -102,6 +102,8 @@ where *table* is the table name.
 The supported operations are:
 
 * Put
+* Append
+* Increment
 * Get
 * Delete
 * Scan
diff --git 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
index 5a902d8d365..4b0aafbeca9 100644
--- 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
+++ 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
@@ -24,6 +24,8 @@ public interface HBaseConstants {
     String OPERATION = "CamelHBaseOperation";
 
     String PUT = "CamelHBasePut";
+    String APPEND = "CamelHBaseAppend";
+    String INCREMENT = "CamelHBaseIncrement";
     String GET = "CamelHBaseGet";
     String SCAN = "CamelHBaseScan";
     String DELETE = "CamelHBaseDelete";
diff --git 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
index 962ee4fa567..157e3d7cc38 100644
--- 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
+++ 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
@@ -30,8 +30,10 @@ import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -69,6 +71,8 @@ public class HBaseProducer extends DefaultProducer {
             HBaseData data = mappingStrategy.resolveModel(exchange.getIn());
 
             List<Put> putOperations = new LinkedList<>();
+            List<Append> appendOperations = new LinkedList<>();
+            List<Increment> incrementOperations = new LinkedList<>();
             List<Delete> deleteOperations = new LinkedList<>();
             List<HBaseRow> getOperationResult = new LinkedList<>();
             List<HBaseRow> scanOperationResult = new LinkedList<>();
@@ -77,6 +81,10 @@ public class HBaseProducer extends DefaultProducer {
                 hRow.apply(rowModel);
                 if (HBaseConstants.PUT.equals(operation)) {
                     putOperations.add(createPut(hRow));
+                } else if (HBaseConstants.APPEND.equals(operation)) {
+                    appendOperations.add(createAppend(hRow));
+                } else if (HBaseConstants.INCREMENT.equals(operation)) {
+                    incrementOperations.add(createIncrement(hRow));
                 } else if (HBaseConstants.GET.equals(operation)) {
                     HBaseRow getResultRow = getCells(table, hRow);
                     getOperationResult.add(getResultRow);
@@ -90,6 +98,14 @@ public class HBaseProducer extends DefaultProducer {
             //Check if we have something to add.
             if (!putOperations.isEmpty()) {
                 table.put(putOperations);
+            } else if (!appendOperations.isEmpty()) {
+                for (Append appendOperation : appendOperations) {
+                    table.append(appendOperation);
+                }
+            } else if (!incrementOperations.isEmpty()) {
+                for (Increment incrementOperation : incrementOperations) {
+                    table.increment(incrementOperation);
+                }
             } else if (!deleteOperations.isEmpty()) {
                 table.delete(deleteOperations);
             } else if (!getOperationResult.isEmpty()) {
@@ -125,6 +141,57 @@ public class HBaseProducer extends DefaultProducer {
         return put;
     }
 
+    /**
+     * Creates an HBase {@link Append} on a specific row, using a collection 
of values (family/column/value pairs).
+     */
+    private Append createAppend(HBaseRow hRow) {
+        ObjectHelper.notNull(hRow, "HBase row");
+        ObjectHelper.notNull(hRow.getId(), "HBase row id");
+        ObjectHelper.notNull(hRow.getCells(), "HBase cells");
+
+        Append append = new 
Append(endpoint.getCamelContext().getTypeConverter().convertTo(byte[].class, 
hRow.getId()));
+        Set<HBaseCell> cells = hRow.getCells();
+        for (HBaseCell cell : cells) {
+            String family = cell.getFamily();
+            String column = cell.getQualifier();
+            Object value = cell.getValue();
+
+            ObjectHelper.notNull(family, "HBase column family", cell);
+            ObjectHelper.notNull(column, "HBase column", cell);
+            append.addColumn(
+                    HBaseHelper.getHBaseFieldAsBytes(family),
+                    HBaseHelper.getHBaseFieldAsBytes(column),
+                    
endpoint.getCamelContext().getTypeConverter().convertTo(byte[].class, value));
+        }
+        return append;
+    }
+
+    /**
+     * Creates an HBase {@link Increment} on a specific row, using a 
collection of values (family/column/value pairs).
+     */
+    private Increment createIncrement(HBaseRow hRow) {
+        ObjectHelper.notNull(hRow, "HBase row");
+        ObjectHelper.notNull(hRow.getId(), "HBase row id");
+        ObjectHelper.notNull(hRow.getCells(), "HBase cells");
+
+        Increment increment
+                = new 
Increment(endpoint.getCamelContext().getTypeConverter().convertTo(byte[].class, 
hRow.getId()));
+        Set<HBaseCell> cells = hRow.getCells();
+        for (HBaseCell cell : cells) {
+            String family = cell.getFamily();
+            String column = cell.getQualifier();
+            Object value = cell.getValue();
+
+            ObjectHelper.notNull(family, "HBase column family", cell);
+            ObjectHelper.notNull(column, "HBase column", cell);
+            increment.addColumn(
+                    HBaseHelper.getHBaseFieldAsBytes(family),
+                    HBaseHelper.getHBaseFieldAsBytes(column),
+                    
endpoint.getCamelContext().getTypeConverter().convertTo(Long.class, value));
+        }
+        return increment;
+    }
+
     /**
      * Performs an HBase {@link Get} on a specific row, using a collection of 
values (family/column/value pairs). The
      * result is
diff --git 
a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerIT.java
 
b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerIT.java
index e4aa69c5c9d..fd961d20cc2 100644
--- 
a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerIT.java
+++ 
b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerIT.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.hbase;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -63,6 +64,59 @@ public class HBaseProducerIT extends CamelHBaseTestSupport {
         IOHelper.close(table);
     }
 
+    @Test
+    public void testPutAndAppend() throws Exception {
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+        headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+        headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]);
+        headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]);
+        headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
+        template.sendBodyAndHeaders("direct:start", null, headers);
+
+        headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][1]);
+        headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), body[0][0][1]);
+        headers.put(HBaseConstants.OPERATION, HBaseConstants.APPEND);
+        template.sendBodyAndHeaders("direct:start", null, headers);
+
+        Connection connection = connectHBase();
+        Table table = 
connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes()));
+
+        Get get = new Get(key[0].getBytes());
+        get.addColumn(family[0].getBytes(), column[0][0].getBytes());
+        get.addColumn(family[0].getBytes(), column[0][1].getBytes());
+        Result result = table.get(get);
+        assertArrayEquals(body[0][0][0].getBytes(), 
result.getValue(family[0].getBytes(), column[0][0].getBytes()));
+        assertArrayEquals(body[0][0][1].getBytes(), 
result.getValue(family[0].getBytes(), column[0][1].getBytes()));
+
+        IOHelper.close(table);
+    }
+
+    @Test
+    public void testPutAndIncrement() throws Exception {
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+        headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), family[1]);
+        headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[1][2]);
+        headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), 
Long.valueOf(body[0][1][2]));
+        headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT);
+        template.sendBodyAndHeaders("direct:start", null, headers);
+
+        headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), 1L);
+        headers.put(HBaseConstants.OPERATION, HBaseConstants.INCREMENT);
+        template.sendBodyAndHeaders("direct:start", null, headers);
+
+        Connection connection = connectHBase();
+        Table table = 
connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes()));
+
+        Get get = new Get(key[0].getBytes());
+        get.addColumn(family[1].getBytes(), column[1][2].getBytes());
+        Result result = table.get(get);
+        assertEquals(Long.valueOf(body[0][1][2]) + 1, 
ByteBuffer.wrap(result.value()).getLong());
+
+        IOHelper.close(table);
+    }
+
     @Test
     public void testPutAndGet() throws Exception {
         testPut();

Reply via email to