This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 6ea0f1c93dc CAMEL-18817: camel-hbase - Support append and increment operations in the producer (#8905) 6ea0f1c93dc is described below commit 6ea0f1c93dcc68914d71b9e20467d5db11bd41eb Author: Kengo Seki <sek...@apache.org> AuthorDate: Thu Dec 15 14:45:04 2022 +0900 CAMEL-18817: camel-hbase - Support append and increment operations in the producer (#8905) --- .../camel-hbase/src/main/docs/hbase-component.adoc | 2 + .../camel/component/hbase/HBaseConstants.java | 2 + .../camel/component/hbase/HBaseProducer.java | 67 ++++++++++++++++++++++ .../camel/component/hbase/HBaseProducerIT.java | 54 +++++++++++++++++ 4 files changed, 125 insertions(+) diff --git a/components/camel-hbase/src/main/docs/hbase-component.adoc b/components/camel-hbase/src/main/docs/hbase-component.adoc index c238c1e40be..d298aff62f7 100644 --- a/components/camel-hbase/src/main/docs/hbase-component.adoc +++ b/components/camel-hbase/src/main/docs/hbase-component.adoc @@ -102,6 +102,8 @@ where *table* is the table name. The supported operations are: * Put +* Append +* Increment * Get * Delete * Scan diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java index 5a902d8d365..4b0aafbeca9 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java @@ -24,6 +24,8 @@ public interface HBaseConstants { String OPERATION = "CamelHBaseOperation"; String PUT = "CamelHBasePut"; + String APPEND = "CamelHBaseAppend"; + String INCREMENT = "CamelHBaseIncrement"; String GET = "CamelHBaseGet"; String SCAN = "CamelHBaseScan"; String DELETE = "CamelHBaseDelete"; diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java index 962ee4fa567..157e3d7cc38 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java @@ -30,8 +30,10 @@ import org.apache.camel.support.DefaultProducer; import org.apache.camel.util.ObjectHelper; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -69,6 +71,8 @@ public class HBaseProducer extends DefaultProducer { HBaseData data = mappingStrategy.resolveModel(exchange.getIn()); List<Put> putOperations = new LinkedList<>(); + List<Append> appendOperations = new LinkedList<>(); + List<Increment> incrementOperations = new LinkedList<>(); List<Delete> deleteOperations = new LinkedList<>(); List<HBaseRow> getOperationResult = new LinkedList<>(); List<HBaseRow> scanOperationResult = new LinkedList<>(); @@ -77,6 +81,10 @@ public class HBaseProducer extends DefaultProducer { hRow.apply(rowModel); if (HBaseConstants.PUT.equals(operation)) { putOperations.add(createPut(hRow)); + } else if (HBaseConstants.APPEND.equals(operation)) { + appendOperations.add(createAppend(hRow)); + } else if (HBaseConstants.INCREMENT.equals(operation)) { + incrementOperations.add(createIncrement(hRow)); } else if (HBaseConstants.GET.equals(operation)) { HBaseRow getResultRow = getCells(table, hRow); getOperationResult.add(getResultRow); @@ -90,6 +98,14 @@ public class HBaseProducer extends DefaultProducer { //Check if we have something to add. if (!putOperations.isEmpty()) { table.put(putOperations); + } else if (!appendOperations.isEmpty()) { + for (Append appendOperation : appendOperations) { + table.append(appendOperation); + } + } else if (!incrementOperations.isEmpty()) { + for (Increment incrementOperation : incrementOperations) { + table.increment(incrementOperation); + } } else if (!deleteOperations.isEmpty()) { table.delete(deleteOperations); } else if (!getOperationResult.isEmpty()) { @@ -125,6 +141,57 @@ public class HBaseProducer extends DefaultProducer { return put; } + /** + * Creates an HBase {@link Append} on a specific row, using a collection of values (family/column/value pairs). + */ + private Append createAppend(HBaseRow hRow) { + ObjectHelper.notNull(hRow, "HBase row"); + ObjectHelper.notNull(hRow.getId(), "HBase row id"); + ObjectHelper.notNull(hRow.getCells(), "HBase cells"); + + Append append = new Append(endpoint.getCamelContext().getTypeConverter().convertTo(byte[].class, hRow.getId())); + Set<HBaseCell> cells = hRow.getCells(); + for (HBaseCell cell : cells) { + String family = cell.getFamily(); + String column = cell.getQualifier(); + Object value = cell.getValue(); + + ObjectHelper.notNull(family, "HBase column family", cell); + ObjectHelper.notNull(column, "HBase column", cell); + append.addColumn( + HBaseHelper.getHBaseFieldAsBytes(family), + HBaseHelper.getHBaseFieldAsBytes(column), + endpoint.getCamelContext().getTypeConverter().convertTo(byte[].class, value)); + } + return append; + } + + /** + * Creates an HBase {@link Increment} on a specific row, using a collection of values (family/column/value pairs). + */ + private Increment createIncrement(HBaseRow hRow) { + ObjectHelper.notNull(hRow, "HBase row"); + ObjectHelper.notNull(hRow.getId(), "HBase row id"); + ObjectHelper.notNull(hRow.getCells(), "HBase cells"); + + Increment increment + = new Increment(endpoint.getCamelContext().getTypeConverter().convertTo(byte[].class, hRow.getId())); + Set<HBaseCell> cells = hRow.getCells(); + for (HBaseCell cell : cells) { + String family = cell.getFamily(); + String column = cell.getQualifier(); + Object value = cell.getValue(); + + ObjectHelper.notNull(family, "HBase column family", cell); + ObjectHelper.notNull(column, "HBase column", cell); + increment.addColumn( + HBaseHelper.getHBaseFieldAsBytes(family), + HBaseHelper.getHBaseFieldAsBytes(column), + endpoint.getCamelContext().getTypeConverter().convertTo(Long.class, value)); + } + return increment; + } + /** * Performs an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs). The * result is diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerIT.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerIT.java index e4aa69c5c9d..fd961d20cc2 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerIT.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerIT.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.hbase; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -63,6 +64,59 @@ public class HBaseProducerIT extends CamelHBaseTestSupport { IOHelper.close(table); } + @Test + public void testPutAndAppend() throws Exception { + Map<String, Object> headers = new HashMap<>(); + headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); + headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]); + headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]); + headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT); + template.sendBodyAndHeaders("direct:start", null, headers); + + headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][1]); + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), body[0][0][1]); + headers.put(HBaseConstants.OPERATION, HBaseConstants.APPEND); + template.sendBodyAndHeaders("direct:start", null, headers); + + Connection connection = connectHBase(); + Table table = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes())); + + Get get = new Get(key[0].getBytes()); + get.addColumn(family[0].getBytes(), column[0][0].getBytes()); + get.addColumn(family[0].getBytes(), column[0][1].getBytes()); + Result result = table.get(get); + assertArrayEquals(body[0][0][0].getBytes(), result.getValue(family[0].getBytes(), column[0][0].getBytes())); + assertArrayEquals(body[0][0][1].getBytes(), result.getValue(family[0].getBytes(), column[0][1].getBytes())); + + IOHelper.close(table); + } + + @Test + public void testPutAndIncrement() throws Exception { + Map<String, Object> headers = new HashMap<>(); + headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); + headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), family[1]); + headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[1][2]); + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), Long.valueOf(body[0][1][2])); + headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT); + template.sendBodyAndHeaders("direct:start", null, headers); + + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), 1L); + headers.put(HBaseConstants.OPERATION, HBaseConstants.INCREMENT); + template.sendBodyAndHeaders("direct:start", null, headers); + + Connection connection = connectHBase(); + Table table = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes())); + + Get get = new Get(key[0].getBytes()); + get.addColumn(family[1].getBytes(), column[1][2].getBytes()); + Result result = table.get(get); + assertEquals(Long.valueOf(body[0][1][2]) + 1, ByteBuffer.wrap(result.value()).getLong()); + + IOHelper.close(table); + } + @Test public void testPutAndGet() throws Exception { testPut();