Repository: flume Updated Branches: refs/heads/trunk 09472ba12 -> 0cba73698
FLUME-2397: HBase-98 compatibility (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0cba7369 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0cba7369 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0cba7369 Branch: refs/heads/trunk Commit: 0cba73698dbba6b78d0a2cd7b469f4377723470a Parents: 09472ba Author: Jarek Jarcec Cecho <[email protected]> Authored: Thu Jun 5 09:28:02 2014 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Thu Jun 5 09:28:02 2014 -0700 ---------------------------------------------------------------------- flume-ng-sinks/flume-hdfs-sink/pom.xml | 30 +++ flume-ng-sinks/flume-ng-hbase-sink/pom.xml | 114 ++++++++++-- .../apache/flume/sink/hbase/AsyncHBaseSink.java | 28 ++- .../flume/sink/hbase/TestAsyncHBaseSink.java | 1 + pom.xml | 182 +++++++++++++++++-- 5 files changed, 322 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/flume-ng-sinks/flume-hdfs-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/pom.xml b/flume-ng-sinks/flume-hdfs-sink/pom.xml index e0760ae..83f8bec 100644 --- a/flume-ng-sinks/flume-hdfs-sink/pom.xml +++ b/flume-ng-sinks/flume-hdfs-sink/pom.xml @@ -161,6 +161,36 @@ limitations under the License. </dependencies> </profile> + <profile> + <id>hbase-98</id> + <activation> + <property> + <name>hadoop.profile</name> + <value>hbase-98</value> + </property> + </activation> + <dependencies> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + </profile> </profiles> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/flume-ng-sinks/flume-ng-hbase-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml index ddb1163..cc2bbee 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml +++ b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml @@ -56,18 +56,6 @@ <artifactId>guava</artifactId> </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase</artifactId> - <optional>true</optional> - </dependency> - - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase</artifactId> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> <dependency> <groupId>org.hbase</groupId> @@ -142,6 +130,24 @@ <artifactId>jersey-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <scope>test</scope> + </dependency> </dependencies> </profile> <profile> @@ -158,6 +164,90 @@ <artifactId>hadoop-minicluster</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + </profile> + <profile> + <id>hbase-98</id> + <activation> + <property> + <name>hadoop.profile</name> + <value>hbase-98</value> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <!-- There should be no need for Flume to include the following two + artifacts, but HBase pom has a bug which causes these to not get + pulled in. So we have to pull it in. Ideally this should be optional, + but making it optional causes build to fail. + --> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-testing-util</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <scope>test</scope> + </dependency> </dependencies> </profile> </profiles> http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index 2d03271..1666be4 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -47,6 +47,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.hbase.async.AtomicIncrementRequest; import org.hbase.async.HBaseClient; import org.hbase.async.PutRequest; +import org.jboss.netty.channel.socket.nio + .NioClientSocketChannelFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -409,13 +411,17 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { + "before calling start on an old instance."); sinkCounter.start(); sinkCounter.incrementConnectionCreatedCount(); - if (!isTimeoutTest) { sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder() .setNameFormat(this.getName() + " HBase Call Pool").build()); + logger.info("Callback pool created"); + if(!isTimeoutTest) { + client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool); } else { - sinkCallbackPool = Executors.newSingleThreadExecutor(); + client = new HBaseClient(zkQuorum, zkBaseDir, + new NioClientSocketChannelFactory(Executors + .newSingleThreadExecutor(), + Executors.newSingleThreadExecutor())); } - client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean fail = new AtomicBoolean(false); client.ensureTableFamilyExists( @@ -424,6 +430,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { @Override public Object call(Object arg) throws Exception { latch.countDown(); + logger.info("table found"); return null; } }, @@ -437,7 +444,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { }); try { + logger.info("waiting on callback"); latch.await(); + logger.info("callback received"); } catch (InterruptedException e) { sinkCounter.incrementConnectionFailedCount(); throw new FlumeException( @@ -465,15 +474,20 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } sinkCounter.incrementConnectionClosedCount(); sinkCounter.stop(); - sinkCallbackPool.shutdown(); + try { - if(!sinkCallbackPool.awaitTermination(5, TimeUnit.SECONDS)) { - sinkCallbackPool.shutdownNow(); + if (sinkCallbackPool != null) { + sinkCallbackPool.shutdown(); + if (!sinkCallbackPool.awaitTermination(5, TimeUnit.SECONDS)) { + sinkCallbackPool.shutdownNow(); + } } } catch (InterruptedException e) { logger.error("Interrupted while waiting for asynchbase sink pool to " + "die", e); - sinkCallbackPool.shutdownNow(); + if (sinkCallbackPool != null) { + sinkCallbackPool.shutdownNow(); + } } sinkCallbackPool = null; client = null; http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java index ccbc086..af90f99 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java @@ -209,6 +209,7 @@ public class TestAsyncHBaseSink { Channel channel = new MemoryChannel(); Configurables.configure(channel, ctx); sink.setChannel(channel); + channel.start(); sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d4b7660..5d31d4c 100644 --- a/pom.xml +++ b/pom.xml @@ -97,6 +97,25 @@ limitations under the License. <artifactId>hadoop-test</artifactId> <version>${hadoop.version}</version> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase</artifactId> + <version>${hbase.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase</artifactId> + <version>${hbase.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.5</version> + <scope>test</scope> + </dependency> </dependencies> </dependencyManagement> </profile> @@ -148,6 +167,27 @@ limitations under the License. <version>${hadoop.version}</version> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase</artifactId> + <version>${hbase.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase</artifactId> + <version>${hbase.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.5</version> + <scope>test</scope> + </dependency> + <!-- only compatible with hadoop-2 --> <dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> @@ -159,6 +199,134 @@ limitations under the License. </profile> <profile> + <id>hbase-98</id> + <activation> + <property> + <name>hadoop.profile</name> + <value>hbase-98</value> + </property> + </activation> + <properties> + <hadoop.version>${hadoop2.version}</hadoop.version> + <hbase.version>0.98.2-hadoop2</hbase.version> + <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id> + <thrift.version>0.8.0</thrift.version> + </properties> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>${hadoop.common.artifact.id}</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + + <!-- Ideally this should be optional, but making it optional causes + build to fail. + --> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${hbase.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${hbase.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-testing-util</artifactId> + <version>${hbase.version}</version> + </dependency> + + <!-- There should be no need for Flume to include the following two + artifacts, but HBase pom has a bug which causes these to not get + pulled in. So we have to pull it in. Ideally this should be optional, + but making it optional causes build to fail. + --> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <version>${hbase.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <version>${hbase.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.5</version> + <scope>test</scope> + </dependency> + + <!-- only compatible with hadoop-2 --> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-dataset-sink</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + </profile> + + <profile> <id>compileThriftLegacy</id> <activation> <activeByDefault>false</activeByDefault> @@ -859,20 +1027,6 @@ limitations under the License. </dependency> <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase</artifactId> - <version>${hbase.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase</artifactId> - <version>${hbase.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop</artifactId> <version>${hadoop.version}</version>
