Repository: camel Updated Branches: refs/heads/master 745aa5ce4 -> fa9b6e5ef
polished the code a bit, added some more tests and removed a deprecated API call Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fa9b6e5e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fa9b6e5e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fa9b6e5e Branch: refs/heads/master Commit: fa9b6e5efbbff588d2a2c2f982c6e26cdebf39be Parents: 745aa5c Author: cmueller <cmuel...@apache.org> Authored: Fri Jun 27 11:27:29 2014 +0200 Committer: cmueller <cmuel...@apache.org> Committed: Fri Jun 27 11:27:48 2014 +0200 ---------------------------------------------------------------------- .../camel/component/hbase/HBaseConsumer.java | 3 -- .../camel/component/hbase/HBaseProducer.java | 8 ++--- .../idempotent/HBaseIdempotentRepository.java | 1 + .../component/hbase/CamelHBaseFilterTest.java | 31 +++------------- .../component/hbase/CamelHBaseTestSupport.java | 29 +++++++++++++++ .../component/hbase/HBaseConsumerTest.java | 30 ++-------------- .../component/hbase/HBaseConvertionsTest.java | 30 ++-------------- .../component/hbase/HBaseProducerTest.java | 37 +++----------------- .../HBaseIdempotentRepositoryTest.java | 16 +++++++-- 9 files changed, 59 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java index 67a3050..c803418 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java @@ -149,7 +149,6 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer { } } - @Override public int processBatch(Queue<Object> exchanges) throws Exception { int total = exchanges.size(); @@ -206,5 +205,3 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer { this.rowModel = rowModel; } } - - http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java index 7d4ab3d..b9c5ebb 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.filter.FilterList; * The HBase producer. */ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { + private HBaseEndpoint endpoint; private String tableName; private final HTablePool tablePool; @@ -59,7 +60,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { this.rowModel = endpoint.getRowModel(); } - public void process(Exchange exchange) throws Exception { HTableInterface table = tablePool.getTable(tableName.getBytes()); try { @@ -101,11 +101,10 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { mappingStrategy.applyScanResults(exchange.getOut(), new HBaseData(scanOperationResult)); } } finally { - tablePool.putTable(table); + table.close(); } } - /** * Creates an HBase {@link Put} on a specific row, using a collection of values (family/column/value pairs). * @@ -179,7 +178,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { return resultRow; } - /** * Creates an HBase {@link Delete} on a specific row, using a collection of values (family/column/value pairs). * @@ -192,7 +190,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { return new Delete(endpoint.getCamelContext().getTypeConverter().convertTo(byte[].class, hRow.getId())); } - /** * Perfoms an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs). * The result is <p>the most recent entry</p> for each column. @@ -247,7 +244,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { return rowSet; } - /** * This methods fill possible gaps in the {@link Exchange} headers, with values passed from the Endpoint. */ http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java index 4e385b0..4459bba 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HBaseIdempotentRepository extends ServiceSupport implements IdempotentRepository<Object> { + private static final Logger LOG = LoggerFactory.getLogger(HBaseIdempotentRepository.class); private final String tableName; http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java index 3b9517e..eb3c050 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java @@ -18,6 +18,7 @@ package org.apache.camel.component.hbase; import java.util.LinkedList; import java.util.List; + import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; @@ -26,37 +27,13 @@ import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.hbase.filters.ModelAwareColumnMatchingFilter; import org.apache.camel.impl.JndiRegistry; -import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.filter.Filter; -import org.junit.After; -import org.junit.Before; import org.junit.Test; public class CamelHBaseFilterTest extends CamelHBaseTestSupport { List<Filter> filters = new LinkedList<Filter>(); - @Before - public void setUp() throws Exception { - if (systemReady) { - try { - hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE), families); - } catch (TableExistsException ex) { - //Ignore if table exists - } - - super.setUp(); - } - } - - @After - public void tearDown() throws Exception { - if (systemReady) { - hbaseUtil.deleteTable(PERSON_TABLE.getBytes()); - super.tearDown(); - } - } - @Override protected JndiRegistry createRegistry() throws Exception { JndiRegistry jndi = super.createRegistry(); @@ -92,11 +69,11 @@ public class CamelHBaseFilterTest extends CamelHBaseTestSupport { @Override public void configure() { from("direct:start") - .to("hbase://" + PERSON_TABLE); + .to("hbase://" + PERSON_TABLE); + from("direct:scan") - .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&filters=#myFilters"); + .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&filters=#myFilters"); } }; } - } http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java index e246264..26ff675 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java @@ -24,10 +24,15 @@ import org.apache.camel.test.junit4.CamelTestSupport; import org.apache.camel.util.IOHelper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class CamelHBaseTestSupport extends CamelTestSupport { @@ -40,6 +45,8 @@ public abstract class CamelHBaseTestSupport extends CamelTestSupport { protected static final String PERSON_TABLE = "person"; protected static final String INFO_FAMILY = "info"; + private static final Logger LOG = LoggerFactory.getLogger(CamelHBaseTestSupport.class); + protected String[] key = {"1", "2", "3"}; protected final String[] family = {"info", "birthdate", "address"}; //comlumn[family][column] @@ -66,6 +73,7 @@ public abstract class CamelHBaseTestSupport extends CamelTestSupport { try { hbaseUtil.startMiniCluster(numServers); } catch (Exception e) { + LOG.error("couldn't start HBase cluster.", e); systemReady = false; } } @@ -77,6 +85,27 @@ public abstract class CamelHBaseTestSupport extends CamelTestSupport { } } + @Before + public void setUp() throws Exception { + if (systemReady) { + try { + hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE), families); + } catch (TableExistsException ex) { + //Ignore if table exists + } + + super.setUp(); + } + } + + @After + public void tearDown() throws Exception { + if (systemReady) { + hbaseUtil.deleteTable(PERSON_TABLE.getBytes()); + super.tearDown(); + } + } + @Override public CamelContext createCamelContext() throws Exception { CamelContext context = new DefaultCamelContext(createRegistry()); http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java index d8149c2..deda182 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java @@ -18,43 +18,19 @@ package org.apache.camel.component.hbase; import java.util.HashMap; import java.util.Map; -import org.apache.camel.ProducerTemplate; + import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.hadoop.hbase.TableExistsException; -import org.junit.After; -import org.junit.Before; import org.junit.Test; public class HBaseConsumerTest extends CamelHBaseTestSupport { - @Before - public void setUp() throws Exception { - if (systemReady) { - try { - hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE), families); - } catch (TableExistsException ex) { - //Ignore if table exists - } - - super.setUp(); - } - } - - @After - public void tearDown() throws Exception { - if (systemReady) { - super.tearDown(); - } - } - @Test public void testPutMultiRowsAndConsume() throws Exception { if (systemReady) { MockEndpoint mockEndpoint = getMockEndpoint("mock:result"); mockEndpoint.expectedMessageCount(3); - ProducerTemplate template = context.createProducerTemplate(); Map<String, Object> headers = new HashMap<String, Object>(); for (int row = 0; row < key.length; row++) { @@ -81,10 +57,10 @@ public class HBaseConsumerTest extends CamelHBaseTestSupport { @Override public void configure() { from("direct:start") - .to("hbase://" + PERSON_TABLE); + .to("hbase://" + PERSON_TABLE); from("hbase://" + PERSON_TABLE) - .to("mock:result"); + .to("mock:result"); } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java index ab75381..1313936 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java @@ -18,17 +18,15 @@ package org.apache.camel.component.hbase; import java.util.HashMap; import java.util.Map; + import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.util.IOHelper; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.After; -import org.junit.Before; import org.junit.Test; public class HBaseConvertionsTest extends CamelHBaseTestSupport { @@ -38,27 +36,6 @@ public class HBaseConvertionsTest extends CamelHBaseTestSupport { protected final String[] column = {"DEFAULTCOLUMN"}; protected final byte[][] families = {INFO_FAMILY.getBytes()}; - @Before - public void setUp() throws Exception { - if (systemReady) { - try { - hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE), families); - } catch (TableExistsException ex) { - //Ignore if table exists - } - - super.setUp(); - } - } - - @After - public void tearDown() throws Exception { - if (systemReady) { - hbaseUtil.deleteTable(PERSON_TABLE.getBytes()); - super.tearDown(); - } - } - @Test public void testPutMultiRows() throws Exception { if (systemReady) { @@ -111,7 +88,6 @@ public class HBaseConvertionsTest extends CamelHBaseTestSupport { } } - /** * Factory method which derived classes can use to create a {@link org.apache.camel.builder.RouteBuilder} * to define the routes for testing @@ -122,10 +98,10 @@ public class HBaseConvertionsTest extends CamelHBaseTestSupport { @Override public void configure() { from("direct:start") - .to("hbase://" + PERSON_TABLE); + .to("hbase://" + PERSON_TABLE); from("direct:scan") - .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&family=family1&qualifier=column1"); + .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&family=family1&qualifier=column1"); } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java index 6a259b1..c66b993 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; + import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; @@ -27,41 +28,16 @@ import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.util.IOHelper; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; -import org.junit.After; -import org.junit.Before; import org.junit.Test; public class HBaseProducerTest extends CamelHBaseTestSupport { - @Before - public void setUp() throws Exception { - if (systemReady) { - try { - hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE), families); - } catch (TableExistsException ex) { - //Ignore if table exists - } - - super.setUp(); - } - } - - @After - public void tearDown() throws Exception { - if (systemReady) { - hbaseUtil.deleteTable(PERSON_TABLE.getBytes()); - super.tearDown(); - } - } - @Test public void testPut() throws Exception { if (systemReady) { - ProducerTemplate template = context.createProducerTemplate(); Map<String, Object> headers = new HashMap<String, Object>(); headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]); @@ -83,7 +59,6 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { } } - @Test public void testPutAndGet() throws Exception { testPut(); @@ -132,7 +107,6 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { } } - @Test public void testPutMultiRows() throws Exception { if (systemReady) { @@ -185,7 +159,6 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { } } - @Test public void testPutMultiColumns() throws Exception { if (systemReady) { @@ -217,7 +190,6 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { } } - @Test public void testPutAndGetMultiColumns() throws Exception { testPutMultiColumns(); @@ -239,7 +211,6 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { } } - @Test public void testPutAndGetAndDeleteMultiRows() throws Exception { testPutMultiRows(); @@ -297,13 +268,13 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { @Override public void configure() { from("direct:start") - .to("hbase://" + PERSON_TABLE); + .to("hbase://" + PERSON_TABLE); from("direct:start-with-model") - .to("hbase://" + PERSON_TABLE + "?family=info&qualifier=firstName&family2=birthdate&qualifier2=year"); + .to("hbase://" + PERSON_TABLE + "?family=info&qualifier=firstName&family2=birthdate&qualifier2=year"); from("direct:scan") - .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2"); + .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2"); } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java index 20e3766..352c59c 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java @@ -69,6 +69,9 @@ public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport { // try to add an other one assertTrue(repository.add(key02)); assertTrue(repository.contains(key02)); + + // try to add the first key again + assertFalse(repository.add(key01)); } } @@ -101,6 +104,14 @@ public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport { } @Test + public void testConfirm() throws Exception { + if (systemReady) { + // it always return true + assertTrue(repository.confirm(key01)); + } + } + + @Test public void testRepositoryInRoute() throws Exception { if (systemReady) { MockEndpoint mock = (MockEndpoint) context.getEndpoint("mock:out"); @@ -127,10 +138,9 @@ public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport { @Override public void configure() throws Exception { from("direct:in") - .idempotentConsumer(header("messageId"), repository) - .to("mock:out"); + .idempotentConsumer(header("messageId"), repository) + .to("mock:out"); } }; } - }