PHOENIX-3214 Kafka Phoenix Consumer (Kalyan Hadoop)

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/afdcca5c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/afdcca5c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/afdcca5c

Branch: refs/heads/encodecolumns2
Commit: afdcca5ccadcd4f609433491b2da5ea322f196e2
Parents: b2ebe1f
Author: Josh Mahonin <jmaho...@gmail.com>
Authored: Sun Feb 5 12:12:52 2017 -0500
Committer: Josh Mahonin <jmaho...@gmail.com>
Committed: Mon Feb 6 09:33:25 2017 -0500

----------------------------------------------------------------------
 phoenix-assembly/pom.xml                        |   4 +
 .../src/build/components/all-common-jars.xml    |   8 +
 phoenix-kafka/pom.xml                           | 435 +++++++++++++++++++
 .../apache/phoenix/kafka/PhoenixConsumerIT.java | 276 ++++++++++++
 phoenix-kafka/src/it/resources/consumer.props   |  12 +
 phoenix-kafka/src/it/resources/producer.props   |   4 +
 .../apache/phoenix/kafka/KafkaConstants.java    |  52 +++
 .../phoenix/kafka/consumer/PhoenixConsumer.java | 276 ++++++++++++
 .../kafka/consumer/PhoenixConsumerTool.java     | 107 +++++
 pom.xml                                         |   7 +
 10 files changed, 1181 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/pom.xml b/phoenix-assembly/pom.xml
index a221dca..0292926 100644
--- a/phoenix-assembly/pom.xml
+++ b/phoenix-assembly/pom.xml
@@ -108,6 +108,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix-pig</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-assembly/src/build/components/all-common-jars.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/components/all-common-jars.xml b/phoenix-assembly/src/build/components/all-common-jars.xml
index e68016f..3d27b26 100644
--- a/phoenix-assembly/src/build/components/all-common-jars.xml
+++ b/phoenix-assembly/src/build/components/all-common-jars.xml
@@ -99,6 +99,14 @@
       <fileMode>0644</fileMode>
     </fileSet>
     <fileSet>
+      <directory>${project.basedir}/../phoenix-kafka/target/</directory>
+      <outputDirectory>lib</outputDirectory>
+      <includes>
+        <include>phoenix-*.jar</include>
+      </includes>
+      <fileMode>0644</fileMode>
+    </fileSet>
+    <fileSet>
       <directory>${project.basedir}/../phoenix-core/target/</directory>
       <outputDirectory>lib</outputDirectory>
       <includes>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-kafka/pom.xml b/phoenix-kafka/pom.xml
new file mode 100644
index 0000000..042f54c
--- /dev/null
+++ b/phoenix-kafka/pom.xml
@@ -0,0 +1,435 @@
+<?xml version='1.0'?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+       <parent>
+               <groupId>org.apache.phoenix</groupId>
+               <artifactId>phoenix</artifactId>
+               <version>4.10.0-HBase-1.2-SNAPSHOT</version>
+       </parent>
+       <artifactId>phoenix-kafka</artifactId>
+       <name>Phoenix - Kafka</name>
+
+       <licenses>
+               <license>
+                       <name>The Apache Software License, Version 2.0</name>
+                       <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+                       <distribution>repo</distribution>
+                       <comments />
+               </license>
+       </licenses>
+
+       <organization>
+               <name>Apache Software Foundation</name>
+               <url>http://www.apache.org</url>
+       </organization>
+
+       <properties>
+               <top.dir>${project.basedir}/..</top.dir>
+       </properties>
+
+       <dependencies>
+               <!-- Transaction dependencies -->
+               <dependency>
+                       <groupId>org.apache.tephra</groupId>
+                       <artifactId>tephra-api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.tephra</groupId>
+                       <artifactId>tephra-core</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.tephra</groupId>
+                       <artifactId>tephra-core</artifactId>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.tephra</groupId>
+                       <artifactId>tephra-hbase-compat-1.1</artifactId>
+               </dependency>
+
+               <!-- Make sure we have all the antlr dependencies -->
+               <dependency>
+                       <groupId>org.antlr</groupId>
+                       <artifactId>antlr-runtime</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>jline</groupId>
+                       <artifactId>jline</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>sqlline</groupId>
+                       <artifactId>sqlline</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>joda-time</groupId>
+                       <artifactId>joda-time</artifactId>
+               </dependency>
+               <!-- JSR-305 and jcip-annotations -->
+               <dependency>
+                       <groupId>com.github.stephenc.findbugs</groupId>
+                       <artifactId>findbugs-annotations</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>com.github.stephenc.jcip</groupId>
+                       <artifactId>jcip-annotations</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.codehaus.jackson</groupId>
+                       <artifactId>jackson-core-asl</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.codehaus.jackson</groupId>
+                       <artifactId>jackson-mapper-asl</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.codehaus.jackson</groupId>
+                       <artifactId>jackson-jaxrs</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.codehaus.jackson</groupId>
+                       <artifactId>jackson-xc</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.mockito</groupId>
+                       <artifactId>mockito-all</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>com.google.protobuf</groupId>
+                       <artifactId>protobuf-java</artifactId>
+                       <version>${protobuf-java.version}</version>
+               </dependency>
+               <!-- Intentionally avoid a dependencyManagement entry because of conflict with thin-client -->
+               <dependency>
+                       <groupId>org.apache.httpcomponents</groupId>
+                       <artifactId>httpclient</artifactId>
+                       <version>4.0.1</version>
+               </dependency>
+               <dependency>
+                       <groupId>log4j</groupId>
+                       <artifactId>log4j</artifactId>
+                       <version>${log4j.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-api</artifactId>
+                       <version>${slf4j.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.iq80.snappy</groupId>
+                       <artifactId>snappy</artifactId>
+                       <version>${snappy.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>com.github.stephenc.high-scale-lib</groupId>
+                       <artifactId>high-scale-lib</artifactId>
+                       <version>1.1.1</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>com.yammer.metrics</groupId>
+                       <artifactId>metrics-core</artifactId>
+                       <version>2.1.2</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>com.lmax</groupId>
+                       <artifactId>disruptor</artifactId>
+                       <version>3.2.0</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.htrace</groupId>
+                       <artifactId>htrace-core</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>io.netty</groupId>
+                       <artifactId>netty-all</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>commons-codec</groupId>
+                       <artifactId>commons-codec</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>commons-collections</groupId>
+                       <artifactId>commons-collections</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.commons</groupId>
+                       <artifactId>commons-csv</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>com.google.code.findbugs</groupId>
+                       <artifactId>jsr305</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-testing-util</artifactId>
+                       <scope>test</scope>
+                       <optional>true</optional>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-it</artifactId>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-annotations</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-common</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-common</artifactId>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-protocol</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-client</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-server</artifactId>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>xom</groupId>
+                                       <artifactId>xom</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-server</artifactId>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-hadoop-compat</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-hadoop-compat</artifactId>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-hadoop2-compat</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-hadoop2-compat</artifactId>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-common</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-annotations</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-mapreduce-client-core</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minicluster</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minikdc</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.jruby.joni</groupId>
+                       <artifactId>joni</artifactId>
+                       <version>${joni.version}</version>
+               </dependency>
+
+               <!-- Kafka integration with Phoenix -->
+               <dependency>
+                       <groupId>org.apache.phoenix</groupId>
+                       <artifactId>phoenix-core</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.phoenix</groupId>
+                       <artifactId>phoenix-core</artifactId>
+                       <classifier>tests</classifier>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka_2.11</artifactId>
+                       <version>${kafka.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-clients</artifactId>
+                       <version>${kafka.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka_2.11</artifactId>
+                       <version>${kafka.version}</version>
+                       <classifier>test</classifier>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-clients</artifactId>
+                       <version>${kafka.version}</version>
+                       <classifier>test</classifier>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-tools</artifactId>
+                       <version>${kafka.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.phoenix</groupId>
+                       <artifactId>phoenix-flume</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flume</groupId>
+                       <artifactId>flume-ng-core</artifactId>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- Add the ant-generated sources to the source path -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-site-plugin</artifactId>
+                               <version>3.2</version>
+                               <dependencies>
+                                       <dependency>
+                                               <groupId>org.apache.maven.doxia</groupId>
+                                               <artifactId>doxia-module-markdown</artifactId>
+                                               <version>1.3</version>
+                                       </dependency>
+                                       <dependency>
+                                               <groupId>lt.velykis.maven.skins</groupId>
+                                               <artifactId>reflow-velocity-tools</artifactId>
+                                               <version>1.0.0</version>
+                                       </dependency>
+                                       <dependency>
+                                               <groupId>org.apache.velocity</groupId>
+                                               <artifactId>velocity</artifactId>
+                                               <version>1.7</version>
+                                       </dependency>
+                               </dependencies>
+                               <configuration>
+                                       <reportPlugins>
+                                               <plugin>
+                                                       <groupId>org.codehaus.mojo</groupId>
+                                                       <artifactId>findbugs-maven-plugin</artifactId>
+                                                       <version>2.5.2</version>
+                                               </plugin>
+                                       </reportPlugins>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Setup eclipse -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-eclipse-plugin</artifactId>
+                               <configuration>
+                                       <buildcommands>
+                                               <buildcommand>org.jamon.project.templateBuilder</buildcommand>
+                                               <buildcommand>org.eclipse.jdt.core.javabuilder</buildcommand>
+                                       </buildcommands>
+                               </configuration>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>build-helper-maven-plugin</artifactId>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-failsafe-plugin</artifactId>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-dependency-plugin</artifactId>
+                               <version>${maven-dependency-plugin.version}</version>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-resources-plugin</artifactId>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <finalName>phoenix-kafka-${project.version}-minimal</finalName>
+                                                       <shadedArtifactAttached>false</shadedArtifactAttached>
+                                                       <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+                                                       <shadeTestJar>false</shadeTestJar>
+                                                       <artifactSet>
+                                                               <includes>
+                                                                       <include>org.apache.phoenix:phoenix-kafka</include>
+                                                                       <include>org.apache.kafka:kafka-clients</include>
+                                                                       <include>org.apache.phoenix:phoenix-flume</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java b/phoenix-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java
new file mode 100644
index 0000000..cfec391
--- /dev/null
+++ b/phoenix-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.kafka;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.flume.Context;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.flume.DefaultKeyGenerator;
+import org.apache.phoenix.flume.FlumeConstants;
+import org.apache.phoenix.flume.serializer.EventSerializers;
+import org.apache.phoenix.kafka.consumer.PhoenixConsumer;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+public class PhoenixConsumerIT extends BaseHBaseManagedTimeIT {
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private static final String TOPIC = "topic1";
+    private KafkaServer kafkaServer;
+    private PhoenixConsumer pConsumer;
+    private EmbeddedZookeeper zkServer;
+    private ZkClient zkClient;
+    private Connection conn;
+
+    @Before
+    public void setUp() throws IOException, SQLException {
+        // setup Zookeeper
+        zkServer = new EmbeddedZookeeper();
+        String zkConnect = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs",
+            Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+        kafkaServer.startup();
+
+        // create topic
+        AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties());
+
+        pConsumer = new PhoenixConsumer();
+        
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+    }
+
+    @Test
+    public void testPhoenixConsumerWithFile() throws SQLException {
+        String consumerPath = "consumer.props";
+        PhoenixConsumerThread pConsumerThread = new PhoenixConsumerThread(pConsumer, consumerPath);
+        pConsumerThread.properties.setProperty(FlumeConstants.CONFIG_JDBC_URL, getUrl());
+        Thread phoenixConsumer = new Thread(pConsumerThread);
+
+        String producerPath = "producer.props";
+        KafkaProducerThread kProducerThread = new KafkaProducerThread(producerPath, TOPIC);
+        Thread kafkaProducer = new Thread(kProducerThread);
+
+        phoenixConsumer.start();
+
+        try {
+            phoenixConsumer.join(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        kafkaProducer.start();
+
+        try {
+            kafkaProducer.join();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        if (!kafkaProducer.isAlive()) {
+            System.out.println("kafka producer is not alive");
+            pConsumer.stop();
+        }
+        
+        // Verify our serializer wrote out data
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM SAMPLE1");
+        assertTrue(rs.next());
+        assertTrue(rs.getFetchSize() > 0);
+        rs.close();
+    }
+    
+    @Test
+    public void testPhoenixConsumerWithProperties() throws SQLException {
+        
+        final String fullTableName = "SAMPLE2";
+        final String ddl = "CREATE TABLE IF NOT EXISTS SAMPLE2(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))\n";
+        
+        Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(FlumeConstants.CONFIG_TABLE, fullTableName);
+        consumerProperties.setProperty(FlumeConstants.CONFIG_JDBC_URL, getUrl());
+        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
+        consumerProperties.setProperty(FlumeConstants.CONFIG_TABLE_DDL, ddl);
+        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_REGULAR_EXPRESSION,"([^\\,]*),([^\\,]*),([^\\,]*)");
+        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES,"c1,c2,c3");
+        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, DefaultKeyGenerator.UUID.name());
+        consumerProperties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, "localhost:9092");
+        consumerProperties.setProperty(KafkaConstants.TOPICS, "topic1,topic2");
+        consumerProperties.setProperty(KafkaConstants.TIMEOUT, "100");
+        
+        PhoenixConsumerThread pConsumerThread = new PhoenixConsumerThread(pConsumer, consumerProperties);
+        Thread phoenixConsumer = new Thread(pConsumerThread);
+
+        Properties producerProperties = new Properties();
+        producerProperties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, "localhost:9092");
+        producerProperties.setProperty(KafkaConstants.KEY_SERIALIZER, KafkaConstants.DEFAULT_KEY_SERIALIZER);
+        producerProperties.setProperty(KafkaConstants.VALUE_SERIALIZER, KafkaConstants.DEFAULT_VALUE_SERIALIZER);
+        producerProperties.setProperty("auto.commit.interval.ms", "1000");
+        
+        KafkaProducerThread kProducerThread = new KafkaProducerThread(producerProperties, TOPIC);
+        Thread kafkaProducer = new Thread(kProducerThread);
+
+        phoenixConsumer.start();
+
+        try {
+            phoenixConsumer.join(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        kafkaProducer.start();
+
+        try {
+            kafkaProducer.join();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        if (!kafkaProducer.isAlive()) {
+            System.out.println("kafka producer is not alive");
+            pConsumer.stop();
+        }
+        
+        // Verify our serializer wrote out data
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM SAMPLE2");
+        assertTrue(rs.next());
+        assertTrue(rs.getFetchSize() > 0);
+        rs.close();
+    }
+
+    @After
+    public void cleanUp() throws Exception {
+        kafkaServer.shutdown();
+        zkClient.close();
+        zkServer.shutdown();
+        conn.close();
+    }
+
+    class PhoenixConsumerThread implements Runnable {
+        PhoenixConsumer pConsumer;
+        Properties properties;
+
+        PhoenixConsumerThread(PhoenixConsumer pConsumer, String path) {
+            this.pConsumer = pConsumer;
+            try (InputStream props = Resources.getResource(path).openStream()) {
+                Properties properties = new Properties();
+                properties.load(props);
+                this.properties = properties;
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        
+        PhoenixConsumerThread(PhoenixConsumer pConsumer, Properties properties) {
+            this.pConsumer = pConsumer;
+            this.properties = properties;
+        }
+
+        @Override
+        public void run() {
+            // initialize Kafka
+            pConsumer.intializeKafka(properties);
+
+            // configure the phoenix
+            Context context = pConsumer.prepareContext();
+            pConsumer.configure(context);
+
+            // start the kafka consumer
+            pConsumer.start();
+
+            // process kafka messages
+            pConsumer.process();
+        }
+    }
+
+    class KafkaProducerThread implements Runnable {
+        KafkaProducer<String, String> producer;
+        String topic;
+
+        KafkaProducerThread(String path, String topic) {
+            this.topic = topic;
+            try (InputStream props = Resources.getResource(path).openStream()) {
+                Properties properties = new Properties();
+                properties.load(props);
+                producer = new KafkaProducer<>(properties);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+        KafkaProducerThread(Properties properties, String topic) {
+            this.topic = topic;
+            producer = new KafkaProducer<>(properties);
+        }
+
+        @Override
+        public void run() {
+            try {
+                for (int i = 1; i <= 10; i++) {
+                    String message = String.format("%s,%.3f,%d", "msg" + i, i * 2000f, i);
+                    producer.send(new ProducerRecord<String, String>(topic, message));
+                    producer.flush();
+                    Thread.sleep(100);
+                }
+            } catch (Throwable throwable) {
+                System.out.printf("%s", throwable.fillInStackTrace());
+            } finally {
+                producer.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/src/it/resources/consumer.props
----------------------------------------------------------------------
diff --git a/phoenix-kafka/src/it/resources/consumer.props b/phoenix-kafka/src/it/resources/consumer.props
new file mode 100644
index 0000000..eb490d7
--- /dev/null
+++ b/phoenix-kafka/src/it/resources/consumer.props
@@ -0,0 +1,12 @@
+serializer=regex
+serializer.rowkeyType=uuid
+serializer.regex=([^\,]*),([^\,]*),([^\,]*)
+serializer.columns=c1,c2,c3
+
+jdbcUrl=jdbc:phoenix:localhost
+table=SAMPLE1
+ddl=CREATE TABLE IF NOT EXISTS SAMPLE1(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))
+
+bootstrap.servers=localhost:9092
+topics=topic1,topic2
+poll.timeout.ms=100
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/src/it/resources/producer.props
----------------------------------------------------------------------
diff --git a/phoenix-kafka/src/it/resources/producer.props b/phoenix-kafka/src/it/resources/producer.props
new file mode 100644
index 0000000..31e7caa
--- /dev/null
+++ b/phoenix-kafka/src/it/resources/producer.props
@@ -0,0 +1,4 @@
+bootstrap.servers=localhost:9092
+auto.commit.interval.ms=1000
+key.serializer=org.apache.kafka.common.serialization.StringSerializer
+value.serializer=org.apache.kafka.common.serialization.StringSerializer
\ No newline at end of file
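
For reference (not part of this patch): a minimal standalone producer sketch that loads the producer.props above and emits comma-separated rows matching the serializer.regex / serializer.columns settings in consumer.props. The class name and the on-disk path "producer.props" are illustrative assumptions; the send loop mirrors KafkaProducerThread in PhoenixConsumerIT.

    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SampleCsvProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            try (InputStream in = new FileInputStream("producer.props")) { // hypothetical local path
                props.load(in);
            }
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 1; i <= 10; i++) {
                    // three comma-separated fields -> columns c1,c2,c3 of SAMPLE1
                    String row = String.format("%s,%.3f,%d", "msg" + i, i * 2000f, i);
                    producer.send(new ProducerRecord<>("topic1", row));
                }
                producer.flush();
            }
        }
    }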

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java b/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java
new file mode 100644
index 0000000..cc1aa61
--- /dev/null
+++ b/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+public final class KafkaConstants {
+
+    public static final String BOOTSTRAP_SERVERS = ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+
+    public static final String KEY_SERIALIZER = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+
+    public static final String VALUE_SERIALIZER = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+    public static final String DEFAULT_KEY_SERIALIZER = StringSerializer.class.getName();
+
+    public static final String DEFAULT_VALUE_SERIALIZER = StringSerializer.class.getName();
+
+    public static final String KEY_DESERIALIZER = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+
+    public static final String VALUE_DESERIALIZER = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+    public static final String DEFAULT_KEY_DESERIALIZER = StringDeserializer.class.getName();
+
+    public static final String DEFAULT_VALUE_DESERIALIZER = StringDeserializer.class.getName();
+
+    public static final String TOPICS = "topics";
+
+    public static final String GROUP_ID = ConsumerConfig.GROUP_ID_CONFIG;
+    
+    public static final String TIMEOUT = "poll.timeout.ms";
+    
+    public static final long DEFAULT_TIMEOUT = 100;
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java
----------------------------------------------------------------------
diff --git a/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java b/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java
new file mode 100644
index 0000000..1759cec
--- /dev/null
+++ b/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.kafka.consumer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.phoenix.flume.FlumeConstants;
+import org.apache.phoenix.flume.serializer.EventSerializer;
+import org.apache.phoenix.flume.serializer.EventSerializers;
+import org.apache.phoenix.kafka.KafkaConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+public class PhoenixConsumer {
+    private static final Logger logger = LoggerFactory.getLogger(PhoenixConsumer.class);
+
+    private KafkaConsumer<String, String> consumer = null;
+    private Properties properties = new Properties();
+    private Integer batchSize;
+    private long timeout;
+    private EventSerializer serializer;
+    private Boolean process = true;
+    
+    public PhoenixConsumer() {
+
+    }
+    
+    public PhoenixConsumer(Configuration conf) throws IOException {
+        // initialize Kafka
+        intializeKafka(conf);
+
+        // configure the phoenix
+        Context context = prepareContext();
+        configure(context);
+
+        // start the kafka consumer
+        start();
+
+        // process kafka messages
+        process();
+    }
+    
+    /**
+     * Initializes Kafka from the properties file referenced in the configuration.
+     * @param conf
+     * @throws IOException 
+     */
+    public void intializeKafka(Configuration conf) throws IOException {
+       // get the kafka consumer file
+       String file = conf.get("kafka.consumer.file");
+        Preconditions.checkNotNull(file,"File path cannot be empty, please specify in the arguments");
+        
+        Path path = new Path(file);
+        FileSystem fs = FileSystem.get(conf);
+        try (InputStream props = fs.open(path)) {
+            properties.load(props);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        
+        intializeKafka(properties);
+    }
+    
+    /**
+     * Initializes Kafka with the given properties.
+     * @param properties
+     */
+    public void intializeKafka(Properties properties) {
+        this.properties = properties;
+        
+        String servers = properties.getProperty(KafkaConstants.BOOTSTRAP_SERVERS);
+        Preconditions.checkNotNull(servers,"Bootstrap Servers cannot be empty, please specify in the configuration file");
+        properties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, servers);
+        
+        if (properties.getProperty(KafkaConstants.GROUP_ID) == null) {
+            properties.setProperty(KafkaConstants.GROUP_ID, "group-" + new Random().nextInt(100000));
+        }
+        
+        if (properties.getProperty(KafkaConstants.TIMEOUT) == null) {
+            properties.setProperty(KafkaConstants.TIMEOUT, String.valueOf(KafkaConstants.DEFAULT_TIMEOUT));
+        }
+
+        String topics = properties.getProperty(KafkaConstants.TOPICS);
+        Preconditions.checkNotNull(topics,"Topics cannot be empty, please specify in the configuration file");
+        
+        properties.setProperty(KafkaConstants.KEY_DESERIALIZER, KafkaConstants.DEFAULT_KEY_DESERIALIZER);
+
+        properties.setProperty(KafkaConstants.VALUE_DESERIALIZER, KafkaConstants.DEFAULT_VALUE_DESERIALIZER);
+        
+        this.consumer = new KafkaConsumer<>(properties);
+        consumer.subscribe(Arrays.asList(topics.split(",")));
+    }
+  
+    /**
+     * Convert the properties to context
+     */
+    public Context prepareContext() {
+        Map<String, String> map = new HashMap<String, String>();
+        for (Entry<Object, Object> entry : properties.entrySet()) {
+            map.put((String) entry.getKey(), (String) entry.getValue());
+        }
+        return new Context(map);
+    }
+
+    /**
+     * Configure the context
+     */
+    public void configure(Context context){
+        this.timeout = context.getLong(KafkaConstants.TIMEOUT, KafkaConstants.DEFAULT_TIMEOUT);
+        this.batchSize = context.getInteger(FlumeConstants.CONFIG_BATCHSIZE, FlumeConstants.DEFAULT_BATCH_SIZE);
+        final String eventSerializerType = context.getString(FlumeConstants.CONFIG_SERIALIZER);
+        
+        Preconditions.checkNotNull(eventSerializerType,"Event serializer cannot be empty, please specify in the configuration file");
+        initializeSerializer(context,eventSerializerType);
+    }
+    
+    /**
+     * Process the kafka messages
+     */
+    public void process() {
+        int timeouts = 0;
+        // noinspection InfiniteLoopStatement
+        while (process) {
+            // read records with a short timeout.
+            // If we time out, we don't really care.
+            // Assuming only key & value text data
+            ConsumerRecords<String, String> records = consumer.poll(this.timeout);
+            if (records.count() == 0) {
+                timeouts++;
+            } else {
+                System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts);
+                timeouts = 0;
+            }
+
+            if (!records.isEmpty()) {
+                List<Event> events = Lists.newArrayListWithCapacity(records.count());
+                for (ConsumerRecord<String, String> record : records) {
+                    Event event = EventBuilder.withBody(Bytes.toBytes(record.value()));
+                    events.add(event);
+                }
+                // save to Hbase
+                try {
+                    serializer.upsertEvents(events);
+                } catch (SQLException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    /**
+     * start the serializer
+     */
+    public void start() {
+        logger.info("Starting consumer {} ", this.getClass());
+        try {
+            serializer.initialize();
+        } catch (Exception ex) {
+            logger.error("Error {} in initializing the serializer.", ex.getMessage());
+            Throwables.propagate(ex);
+        }
+    }
+
+    /**
+     * stop the consumer & serializer
+     */
+    public void stop() {
+        this.close();
+        consumer.close();
+        try {
+            serializer.close();
+        } catch (SQLException e) {
+            logger.error(" Error while closing connection {} for consumer.", e.getMessage());
+        }
+    }
+    
+    /**
+     * Signal the processing loop to stop gracefully
+     */
+    public void close(){
+        this.process = false;
+        try {
+            Thread.sleep(30000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Initializes the serializer for kafka messages.
+     * @param context
+     * @param eventSerializerType
+     */
+    private void initializeSerializer(final Context context, final String eventSerializerType) {
+        String serializerClazz = null;
+        EventSerializers eventSerializer = null;
+
+        try {
+            eventSerializer = EventSerializers.valueOf(eventSerializerType.toUpperCase());
+        } catch (IllegalArgumentException iae) {
+            serializerClazz = eventSerializerType;
+        }
+
+        final Context serializerContext = new Context();
+        serializerContext.putAll(context.getSubProperties(FlumeConstants.CONFIG_SERIALIZER_PREFIX));
+        copyPropertiesToSerializerContext(context,serializerContext);
+        
+        try {
+            @SuppressWarnings("unchecked")
+            Class<? extends EventSerializer> clazz = null;
+            if (serializerClazz == null) {
+                clazz = (Class<? extends EventSerializer>) Class.forName(eventSerializer.getClassName());
+            } else {
+                clazz = (Class<? extends EventSerializer>) Class.forName(serializerClazz);
+            }
+
+            serializer = clazz.newInstance();
+            serializer.configure(serializerContext);
+        } catch (Exception e) {
+            logger.error("Could not instantiate event serializer.", e);
+            Throwables.propagate(e);
+        }
+    }
+    
+    /**
+     * Copy properties to serializer context.
+     * @param context
+     * @param serializerContext
+     */
+    private void copyPropertiesToSerializerContext(Context context, Context serializerContext) {
+        serializerContext.put(FlumeConstants.CONFIG_TABLE_DDL,context.getString(FlumeConstants.CONFIG_TABLE_DDL));
+        serializerContext.put(FlumeConstants.CONFIG_TABLE,context.getString(FlumeConstants.CONFIG_TABLE));
+        serializerContext.put(FlumeConstants.CONFIG_ZK_QUORUM,context.getString(FlumeConstants.CONFIG_ZK_QUORUM));
+        serializerContext.put(FlumeConstants.CONFIG_JDBC_URL,context.getString(FlumeConstants.CONFIG_JDBC_URL));
+        serializerContext.put(FlumeConstants.CONFIG_BATCHSIZE,context.getString(FlumeConstants.CONFIG_BATCHSIZE));
+    }
+
+}
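
For reference (not part of this patch): a minimal embedding sketch, assuming the target table SAMPLE1 already exists and mirroring the lifecycle used by PhoenixConsumerIT and the Configuration-based constructor above: intializeKafka, prepareContext/configure, start, then process. The class name and the property values shown are illustrative.

    import java.util.Properties;

    import org.apache.phoenix.flume.DefaultKeyGenerator;
    import org.apache.phoenix.flume.FlumeConstants;
    import org.apache.phoenix.flume.serializer.EventSerializers;
    import org.apache.phoenix.kafka.KafkaConstants;
    import org.apache.phoenix.kafka.consumer.PhoenixConsumer;

    public class EmbeddedPhoenixConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, "localhost:9092");   // illustrative broker
            props.setProperty(KafkaConstants.TOPICS, "topic1");
            props.setProperty(FlumeConstants.CONFIG_JDBC_URL, "jdbc:phoenix:localhost");
            props.setProperty(FlumeConstants.CONFIG_TABLE, "SAMPLE1");               // assumed to exist
            props.setProperty(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.REGEX.name());
            props.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_REGULAR_EXPRESSION,
                "([^\\,]*),([^\\,]*),([^\\,]*)");
            props.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, "c1,c2,c3");
            props.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
                DefaultKeyGenerator.UUID.name());

            PhoenixConsumer consumer = new PhoenixConsumer();
            consumer.intializeKafka(props);                 // subscribe to the configured topics
            consumer.configure(consumer.prepareContext());  // build the serializer configuration
            consumer.start();                               // initialize the event serializer
            consumer.process();                             // poll and upsert until stop() is called
        }
    }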

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java
----------------------------------------------------------------------
diff --git a/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java b/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java
new file mode 100644
index 0000000..8c10aa5
--- /dev/null
+++ b/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.kafka.consumer;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PhoenixConsumerTool extends Configured implements Tool {
+    private static final Logger logger = LoggerFactory.getLogger(PhoenixConsumerTool.class);
+    static final Option FILE_PATH_OPT = new Option("f", "file", true, "input file path");
+    static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
+    
+    public static Options getOptions() {
+        Options options = new Options();
+        options.addOption(FILE_PATH_OPT);
+        options.addOption(HELP_OPT);
+        return options;
+    }
+
+    public static CommandLine parseOptions(String[] args) {
+
+        Options options = getOptions();
+
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
+        }
+
+        if (cmdLine.hasOption(HELP_OPT.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        if (!cmdLine.hasOption(FILE_PATH_OPT.getOpt())) {
+            throw new IllegalStateException(FILE_PATH_OPT.getLongOpt() + " is a mandatory " + "parameter");
+        }
+
+        if (!cmdLine.getArgList().isEmpty()) {
+            throw new IllegalStateException("Got unexpected extra parameters: " + cmdLine.getArgList());
+        }
+
+        return cmdLine;
+    }
+
+    public static void printHelpAndExit(String errorMessage, Options options) {
+        System.err.println(errorMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    public static void printHelpAndExit(Options options, int exitCode) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("help", options);
+        System.exit(exitCode);
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Configuration conf = HBaseConfiguration.create(getConf());
+
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parseOptions(args);
+        } catch (IllegalStateException e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+        }
+        
+        String path = cmdLine.getOptionValue(FILE_PATH_OPT.getOpt());
+        conf.set("kafka.consumer.file", path);
+        new PhoenixConsumer(conf);
+        
+        return 1;
+    }
+    
+    public static void main(String[] args) throws Exception {
+        int exitStatus = ToolRunner.run(new PhoenixConsumerTool(), args);
+        System.exit(exitStatus);
+    }
+}
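
For reference (not part of this patch): PhoenixConsumerTool is a standard Hadoop Tool, so it can also be driven through ToolRunner as sketched below. The "-f" flag corresponds to FILE_PATH_OPT above, and the properties path shown is a placeholder; the same arguments could be passed on the command line to the shaded phoenix-kafka-<version>-minimal jar produced by the shade plugin.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.kafka.consumer.PhoenixConsumerTool;

    public class PhoenixConsumerLauncher {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // "-f" points at a consumer properties file; the tool resolves it via FileSystem.get(conf)
            int status = ToolRunner.run(conf, new PhoenixConsumerTool(),
                new String[] { "-f", "/path/to/consumer.props" });  // placeholder path
            System.exit(status);
        }
    }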

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index daabe08..00c4dec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,6 +25,7 @@
   <modules>
     <module>phoenix-core</module>
     <module>phoenix-flume</module>
+    <module>phoenix-kafka</module>
     <module>phoenix-pig</module>
     <module>phoenix-queryserver-client</module>
     <module>phoenix-queryserver</module>
@@ -85,6 +86,7 @@
     <sqlline.version>1.2.0</sqlline.version>
     <guava.version>13.0.1</guava.version>
     <flume.version>1.4.0</flume.version>
+    <kafka.version>0.9.0.0</kafka.version>
     <findbugs-annotations.version>1.3.9-1</findbugs-annotations.version>
     <jcip-annotations.version>1.0-1</jcip-annotations.version>
     <jline.version>2.11</jline.version>
@@ -545,6 +547,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.phoenix</groupId>
+        <artifactId>phoenix-kafka</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.phoenix</groupId>
         <artifactId>phoenix-pig</artifactId>
         <version>${project.version}</version>
       </dependency>
