PHOENIX-3214 Kafka Phoenix Consumer (Kalyan Hadoop)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/afdcca5c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/afdcca5c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/afdcca5c

Branch: refs/heads/encodecolumns2
Commit: afdcca5ccadcd4f609433491b2da5ea322f196e2
Parents: b2ebe1f
Author: Josh Mahonin <jmaho...@gmail.com>
Authored: Sun Feb 5 12:12:52 2017 -0500
Committer: Josh Mahonin <jmaho...@gmail.com>
Committed: Mon Feb 6 09:33:25 2017 -0500

----------------------------------------------------------------------
 phoenix-assembly/pom.xml                        |   4 +
 .../src/build/components/all-common-jars.xml    |   8 +
 phoenix-kafka/pom.xml                           | 435 +++++++++++++++++++
 .../apache/phoenix/kafka/PhoenixConsumerIT.java | 276 ++++++++++++
 phoenix-kafka/src/it/resources/consumer.props   |  12 +
 phoenix-kafka/src/it/resources/producer.props   |   4 +
 .../apache/phoenix/kafka/KafkaConstants.java    |  52 +++
 .../phoenix/kafka/consumer/PhoenixConsumer.java | 276 ++++++++++++
 .../kafka/consumer/PhoenixConsumerTool.java     | 107 +++++
 pom.xml                                         |   7 +
 10 files changed, 1181 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/pom.xml b/phoenix-assembly/pom.xml
index a221dca..0292926 100644
--- a/phoenix-assembly/pom.xml
+++ b/phoenix-assembly/pom.xml
@@ -108,6 +108,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix-pig</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-assembly/src/build/components/all-common-jars.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/components/all-common-jars.xml b/phoenix-assembly/src/build/components/all-common-jars.xml
index e68016f..3d27b26 100644
--- a/phoenix-assembly/src/build/components/all-common-jars.xml
+++ b/phoenix-assembly/src/build/components/all-common-jars.xml
@@ -99,6 +99,14 @@
       <fileMode>0644</fileMode>
     </fileSet>
     <fileSet>
+      <directory>${project.basedir}/../phoenix-kafka/target/</directory>
+      <outputDirectory>lib</outputDirectory>
+      <includes>
+        <include>phoenix-*.jar</include>
+      </includes>
+      <fileMode>0644</fileMode>
+    </fileSet>
+    <fileSet>
       <directory>${project.basedir}/../phoenix-core/target/</directory>
       <outputDirectory>lib</outputDirectory>
       <includes>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-kafka/pom.xml b/phoenix-kafka/pom.xml
new file mode 100644
index 0000000..042f54c
--- /dev/null
+++ b/phoenix-kafka/pom.xml
@@ -0,0 +1,435 @@
+<?xml version='1.0'?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.phoenix</groupId>
+    <artifactId>phoenix</artifactId>
+    <version>4.10.0-HBase-1.2-SNAPSHOT</version>
+  </parent>
+  <artifactId>phoenix-kafka</artifactId>
+  <name>Phoenix - Kafka</name>
+
+  <licenses>
+    <license>
+      <name>The Apache Software License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+      <comments />
+    </license>
+  </licenses>
+
+  <organization>
+    <name>Apache Software Foundation</name>
+    <url>http://www.apache.org</url>
+  </organization>
+
+  <properties>
+    <top.dir>${project.basedir}/..</top.dir>
+  </properties>
+
+  <dependencies>
+    <!-- Transaction dependencies -->
+    <dependency>
+      <groupId>org.apache.tephra</groupId>
+      <artifactId>tephra-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tephra</groupId>
+      <artifactId>tephra-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tephra</groupId>
+      <artifactId>tephra-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tephra</groupId>
+      <artifactId>tephra-hbase-compat-1.1</artifactId>
+    </dependency>
+
+    <!-- Make sure we have all the antlr dependencies -->
+    <dependency>
+      <groupId>org.antlr</groupId>
+      <artifactId>antlr-runtime</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>jline</groupId>
+      <artifactId>jline</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>sqlline</groupId>
+      <artifactId>sqlline</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+    <!-- JSR-305 and jcip-annotations -->
+    <dependency>
+      <groupId>com.github.stephenc.findbugs</groupId>
+      <artifactId>findbugs-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.stephenc.jcip</groupId>
+      <artifactId>jcip-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-jaxrs</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-xc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf-java.version}</version>
+    </dependency>
+    <!-- Intentionally avoid a dependencyManagement entry because
+         of conflict with thin-client -->
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.iq80.snappy</groupId>
+      <artifactId>snappy</artifactId>
+      <version>${snappy.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.github.stephenc.high-scale-lib</groupId>
+      <artifactId>high-scale-lib</artifactId>
+      <version>1.1.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.yammer.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>2.1.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.lmax</groupId>
+      <artifactId>disruptor</artifactId>
+      <version>3.2.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.htrace</groupId>
+      <artifactId>htrace-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-csv</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <scope>test</scope>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-it</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>xom</groupId>
+          <artifactId>xom</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop-compat</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop-compat</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop2-compat</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop2-compat</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.jruby.joni</groupId>
+      <artifactId>joni</artifactId>
+      <version>${joni.version}</version>
+    </dependency>
+
+    <!-- To work with kafka with phoenix -->
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>${kafka.version}</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-tools</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-flume</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- Add the ant-generated sources to the source path -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <version>3.2</version>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.maven.doxia</groupId>
+            <artifactId>doxia-module-markdown</artifactId>
+            <version>1.3</version>
+          </dependency>
+          <dependency>
+            <groupId>lt.velykis.maven.skins</groupId>
+            <artifactId>reflow-velocity-tools</artifactId>
+            <version>1.0.0</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.velocity</groupId>
+            <artifactId>velocity</artifactId>
+            <version>1.7</version>
+          </dependency>
+        </dependencies>
+        <configuration>
+          <reportPlugins>
+            <plugin>
+              <groupId>org.codehaus.mojo</groupId>
+              <artifactId>findbugs-maven-plugin</artifactId>
+              <version>2.5.2</version>
+            </plugin>
+          </reportPlugins>
+        </configuration>
+      </plugin>
+
+      <!-- Setup eclipse -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-eclipse-plugin</artifactId>
+        <configuration>
+          <buildcommands>
+            <buildcommand>org.jamon.project.templateBuilder</buildcommand>
+            <buildcommand>org.eclipse.jdt.core.javabuilder</buildcommand>
+          </buildcommands>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>${maven-dependency-plugin.version}</version>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <finalName>phoenix-kafka-${project.version}-minimal</finalName>
+              <shadedArtifactAttached>false</shadedArtifactAttached>
+              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+              <shadeTestJar>false</shadeTestJar>
+              <artifactSet>
+                <includes>
+                  <include>org.apache.phoenix:phoenix-kafka</include>
+                  <include>org.apache.kafka:kafka-clients</include>
+                  <include>org.apache.phoenix:phoenix-flume</include>
+                </includes>
+              </artifactSet>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java b/phoenix-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java
new file mode 100644
index 0000000..cfec391
--- /dev/null
+++ b/phoenix-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.kafka;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.flume.Context;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.flume.DefaultKeyGenerator;
+import org.apache.phoenix.flume.FlumeConstants;
+import org.apache.phoenix.flume.serializer.EventSerializers;
+import org.apache.phoenix.kafka.consumer.PhoenixConsumer;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+public class PhoenixConsumerIT extends BaseHBaseManagedTimeIT {
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private static final String TOPIC = "topic1";
+    private KafkaServer kafkaServer;
+    private PhoenixConsumer pConsumer;
+    private EmbeddedZookeeper zkServer;
+    private ZkClient zkClient;
+    private Connection conn;
+
+    @Before
+    public void setUp() throws IOException, SQLException {
+        // setup Zookeeper
+        zkServer = new EmbeddedZookeeper();
+        String zkConnect = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs",
+            Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+        kafkaServer.startup();
+
+        // create topic
+        AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties());
+
+        pConsumer = new PhoenixConsumer();
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+    }
+
+    @Test
+    public void testPhoenixConsumerWithFile() throws SQLException {
+        String consumerPath = "consumer.props";
+        PhoenixConsumerThread pConsumerThread = new PhoenixConsumerThread(pConsumer, consumerPath);
+        pConsumerThread.properties.setProperty(FlumeConstants.CONFIG_JDBC_URL, getUrl());
+        Thread phoenixConsumer = new Thread(pConsumerThread);
+
+        String producerPath = "producer.props";
+        KafkaProducerThread kProducerThread = new KafkaProducerThread(producerPath, TOPIC);
+        Thread kafkaProducer = new Thread(kProducerThread);
+
+        phoenixConsumer.start();
+
+        try {
+            phoenixConsumer.join(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        kafkaProducer.start();
+
+        try {
+            kafkaProducer.join();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        if (!kafkaProducer.isAlive()) {
+            System.out.println("kafka producer is not alive");
+            pConsumer.stop();
+        }
+
+        // Verify our serializer wrote out data
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM SAMPLE1");
+        assertTrue(rs.next());
+        assertTrue(rs.getFetchSize() > 0);
+        rs.close();
+    }
+
+    @Test
+    public void testPhoenixConsumerWithProperties() throws SQLException {
+
+        final String fullTableName = "SAMPLE2";
+        final String ddl = "CREATE TABLE IF NOT EXISTS SAMPLE2(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))\n";
+
+        Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(FlumeConstants.CONFIG_TABLE, fullTableName);
+        consumerProperties.setProperty(FlumeConstants.CONFIG_JDBC_URL, getUrl());
+        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.REGEX.name());
+        consumerProperties.setProperty(FlumeConstants.CONFIG_TABLE_DDL, ddl);
+        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_REGULAR_EXPRESSION, "([^\\,]*),([^\\,]*),([^\\,]*)");
+        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, "c1,c2,c3");
+        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, DefaultKeyGenerator.UUID.name());
+        consumerProperties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, "localhost:9092");
+        consumerProperties.setProperty(KafkaConstants.TOPICS, "topic1,topic2");
+        consumerProperties.setProperty(KafkaConstants.TIMEOUT, "100");
+
+        PhoenixConsumerThread pConsumerThread = new PhoenixConsumerThread(pConsumer, consumerProperties);
+        Thread phoenixConsumer = new Thread(pConsumerThread);
+
+        Properties producerProperties = new Properties();
+        producerProperties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, "localhost:9092");
+        producerProperties.setProperty(KafkaConstants.KEY_SERIALIZER, KafkaConstants.DEFAULT_KEY_SERIALIZER);
+        producerProperties.setProperty(KafkaConstants.VALUE_SERIALIZER, KafkaConstants.DEFAULT_VALUE_SERIALIZER);
+        producerProperties.setProperty("auto.commit.interval.ms", "1000");
+
+        KafkaProducerThread kProducerThread = new KafkaProducerThread(producerProperties, TOPIC);
+        Thread kafkaProducer = new Thread(kProducerThread);
+
+        phoenixConsumer.start();
+
+        try {
+            phoenixConsumer.join(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        kafkaProducer.start();
+
+        try {
+            kafkaProducer.join();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        if (!kafkaProducer.isAlive()) {
+            System.out.println("kafka producer is not alive");
+            pConsumer.stop();
+        }
+
+        // Verify our serializer wrote out data
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM SAMPLE2");
+        assertTrue(rs.next());
+        assertTrue(rs.getFetchSize() > 0);
+        rs.close();
+    }
+
+    @After
+    public void cleanUp() throws Exception {
+        kafkaServer.shutdown();
+        zkClient.close();
+        zkServer.shutdown();
+        conn.close();
+    }
+
+    class PhoenixConsumerThread implements Runnable {
+        PhoenixConsumer pConsumer;
+        Properties properties;
+
+        PhoenixConsumerThread(PhoenixConsumer pConsumer, String path) {
+            this.pConsumer = pConsumer;
+            try (InputStream props = Resources.getResource(path).openStream()) {
+                Properties properties = new Properties();
+                properties.load(props);
+                this.properties = properties;
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+        PhoenixConsumerThread(PhoenixConsumer pConsumer, Properties properties) {
+            this.pConsumer = pConsumer;
+            this.properties = properties;
+        }
+
+        @Override
+        public void run() {
+            // initialize Kafka
+            pConsumer.initializeKafka(properties);
+
+            // configure Phoenix
+            Context context = pConsumer.prepareContext();
+            pConsumer.configure(context);
+
+            // start the kafka consumer
+            pConsumer.start();
+
+            // process kafka messages
+            pConsumer.process();
+        }
+    }
+
+    class KafkaProducerThread implements Runnable {
+        KafkaProducer<String, String> producer;
+        String topic;
+
+        KafkaProducerThread(String path, String topic) {
+            this.topic = topic;
+            try (InputStream props = Resources.getResource(path).openStream()) {
+                Properties properties = new Properties();
+                properties.load(props);
+                producer = new KafkaProducer<>(properties);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+        KafkaProducerThread(Properties properties, String topic) {
+            this.topic = topic;
+            producer = new KafkaProducer<>(properties);
+        }
+
+        @Override
+        public void run() {
+            try {
+                for (int i = 1; i <= 10; i++) {
+                    String message = String.format("%s,%.3f,%d", "msg" + i, i * 2000f, i);
+                    producer.send(new ProducerRecord<String, String>(topic, message));
+                    producer.flush();
+                    Thread.sleep(100);
+                }
+            } catch (Throwable throwable) {
+                System.out.printf("%s", throwable.fillInStackTrace());
+            } finally {
+                producer.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/src/it/resources/consumer.props
----------------------------------------------------------------------
diff --git a/phoenix-kafka/src/it/resources/consumer.props b/phoenix-kafka/src/it/resources/consumer.props
new file mode 100644
index 0000000..eb490d7
--- /dev/null
+++ b/phoenix-kafka/src/it/resources/consumer.props
@@ -0,0 +1,12 @@
+serializer=regex
+serializer.rowkeyType=uuid
+serializer.regex=([^\,]*),([^\,]*),([^\,]*)
+serializer.columns=c1,c2,c3
+
+jdbcUrl=jdbc:phoenix:localhost
+table=SAMPLE1
+ddl=CREATE TABLE IF NOT EXISTS SAMPLE1(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))
+
+bootstrap.servers=localhost:9092
+topics=topic1,topic2
+poll.timeout.ms=100
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/src/it/resources/producer.props
----------------------------------------------------------------------
diff --git a/phoenix-kafka/src/it/resources/producer.props b/phoenix-kafka/src/it/resources/producer.props
new file mode 100644
index 0000000..31e7caa
--- /dev/null
+++ b/phoenix-kafka/src/it/resources/producer.props
@@ -0,0 +1,4 @@
+bootstrap.servers=localhost:9092
+auto.commit.interval.ms=1000
+key.serializer=org.apache.kafka.common.serialization.StringSerializer
+value.serializer=org.apache.kafka.common.serialization.StringSerializer
\ No newline at end of file
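For reference, the records produced against producer.props are plain CSV strings; on the consumer side, serializer.regex splits each one into the c1,c2,c3 columns of SAMPLE1, and the uuid rowkey generator fills the uid primary key. A minimal standalone producer sketch along the same lines (assumptions: a broker on localhost:9092 and an existing topic1, matching the props files above):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SampleCsvProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Assumed broker address, as in producer.props
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props)) {
                // Should land in SAMPLE1 as c1='msg1', c2='2000.000', c3='1', uid=<generated UUID>
                producer.send(new ProducerRecord<String, String>("topic1", "msg1,2000.000,1"));
                producer.flush();
            }
        }
    }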
http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java b/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java
new file mode 100644
index 0000000..cc1aa61
--- /dev/null
+++ b/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+public final class KafkaConstants {
+
+    public static final String BOOTSTRAP_SERVERS = ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+
+    public static final String KEY_SERIALIZER = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+
+    public static final String VALUE_SERIALIZER = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+    public static final String DEFAULT_KEY_SERIALIZER = StringSerializer.class.getName();
+
+    public static final String DEFAULT_VALUE_SERIALIZER = StringSerializer.class.getName();
+
+    public static final String KEY_DESERIALIZER = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+
+    public static final String VALUE_DESERIALIZER = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+    public static final String DEFAULT_KEY_DESERIALIZER = StringDeserializer.class.getName();
+
+    public static final String DEFAULT_VALUE_DESERIALIZER = StringDeserializer.class.getName();
+
+    public static final String TOPICS = "topics";
+
+    public static final String GROUP_ID = ConsumerConfig.GROUP_ID_CONFIG;
+
+    public static final String TIMEOUT = "poll.timeout.ms";
+
+    public static final long DEFAULT_TIMEOUT = 100;
+}
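KafkaConstants mostly aliases Kafka's own producer/consumer config keys; the Phoenix-specific additions are just "topics" and "poll.timeout.ms". A sketch of assembling, in code, the same Kafka-side settings that consumer.props carries (values are illustrative; the serializer/table/jdbcUrl keys from the Flume side are still needed, exactly as in the IT above):

    Properties consumerProps = new Properties();
    // Mandatory: the consumer below fails fast when either of these is missing
    consumerProps.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, "localhost:9092");
    consumerProps.setProperty(KafkaConstants.TOPICS, "topic1,topic2");
    // Optional: falls back to DEFAULT_TIMEOUT (100 ms) when unset
    consumerProps.setProperty(KafkaConstants.TIMEOUT, "100");
    // GROUP_ID defaults to a random "group-NNNNN", and the key/value
    // deserializers are always overwritten with StringDeserializer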
http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java
----------------------------------------------------------------------
diff --git a/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java b/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java
new file mode 100644
index 0000000..1759cec
--- /dev/null
+++ b/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.kafka.consumer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.phoenix.flume.FlumeConstants;
+import org.apache.phoenix.flume.serializer.EventSerializer;
+import org.apache.phoenix.flume.serializer.EventSerializers;
+import org.apache.phoenix.kafka.KafkaConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+public class PhoenixConsumer {
+    private static final Logger logger = LoggerFactory.getLogger(PhoenixConsumer.class);
+
+    private KafkaConsumer<String, String> consumer = null;
+    private Properties properties = new Properties();
+    private Integer batchSize;
+    private long timeout;
+    private EventSerializer serializer;
+    private Boolean process = true;
+
+    public PhoenixConsumer() {
+
+    }
+
+    public PhoenixConsumer(Configuration conf) throws IOException {
+        // initialize Kafka
+        initializeKafka(conf);
+
+        // configure Phoenix
+        Context context = prepareContext();
+        configure(context);
+
+        // start the kafka consumer
+        start();
+
+        // process kafka messages
+        process();
+    }
+
+    /**
+     * Initializes Kafka from a properties file.
+     * @param conf
+     * @throws IOException
+     */
+    public void initializeKafka(Configuration conf) throws IOException {
+        // get the kafka consumer file
+        String file = conf.get("kafka.consumer.file");
+        Preconditions.checkNotNull(file, "File path cannot be empty, please specify in the arguments");
+
+        Path path = new Path(file);
+        FileSystem fs = FileSystem.get(conf);
+        try (InputStream props = fs.open(path)) {
+            properties.load(props);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        initializeKafka(properties);
+    }
+
+    /**
+     * Initializes Kafka with properties.
+     * @param properties
+     */
+    public void initializeKafka(Properties properties) {
+        this.properties = properties;
+
+        String servers = properties.getProperty(KafkaConstants.BOOTSTRAP_SERVERS);
+        Preconditions.checkNotNull(servers, "Bootstrap Servers cannot be empty, please specify in the configuration file");
+        properties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, servers);
+
+        if (properties.getProperty(KafkaConstants.GROUP_ID) == null) {
+            properties.setProperty(KafkaConstants.GROUP_ID, "group-" + new Random().nextInt(100000));
+        }
+
+        if (properties.getProperty(KafkaConstants.TIMEOUT) == null) {
+            properties.setProperty(KafkaConstants.TIMEOUT, String.valueOf(KafkaConstants.DEFAULT_TIMEOUT));
+        }
+
+        String topics = properties.getProperty(KafkaConstants.TOPICS);
+        Preconditions.checkNotNull(topics, "Topics cannot be empty, please specify in the configuration file");
+
+        properties.setProperty(KafkaConstants.KEY_DESERIALIZER, KafkaConstants.DEFAULT_KEY_DESERIALIZER);
+
+        properties.setProperty(KafkaConstants.VALUE_DESERIALIZER, KafkaConstants.DEFAULT_VALUE_DESERIALIZER);
+
+        this.consumer = new KafkaConsumer<>(properties);
+        consumer.subscribe(Arrays.asList(topics.split(",")));
+    }
+
+    /**
+     * Convert the properties to context
+     */
+    public Context prepareContext() {
+        Map<String, String> map = new HashMap<String, String>();
+        for (Entry<Object, Object> entry : properties.entrySet()) {
+            map.put((String) entry.getKey(), (String) entry.getValue());
+        }
+        return new Context(map);
+    }
+
+    /**
+     * Configure the context
+     */
+    public void configure(Context context) {
+        this.timeout = context.getLong(KafkaConstants.TIMEOUT, KafkaConstants.DEFAULT_TIMEOUT);
+        this.batchSize = context.getInteger(FlumeConstants.CONFIG_BATCHSIZE, FlumeConstants.DEFAULT_BATCH_SIZE);
+        final String eventSerializerType = context.getString(FlumeConstants.CONFIG_SERIALIZER);
+
+        Preconditions.checkNotNull(eventSerializerType, "Event serializer cannot be empty, please specify in the configuration file");
+        initializeSerializer(context, eventSerializerType);
+    }
+
+    /**
+     * Process the kafka messages
+     */
+    public void process() {
+        int timeouts = 0;
+        // noinspection InfiniteLoopStatement
+        while (process) {
+            // read records with a short timeout.
+            // If we time out, we don't really care.
+            // Assuming only key & value text data
+            ConsumerRecords<String, String> records = consumer.poll(this.timeout);
+            if (records.count() == 0) {
+                timeouts++;
+            } else {
+                System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts);
+                timeouts = 0;
+            }
+
+            if (!records.isEmpty()) {
+                List<Event> events = Lists.newArrayListWithCapacity(records.count());
+                for (ConsumerRecord<String, String> record : records) {
+                    Event event = EventBuilder.withBody(Bytes.toBytes(record.value()));
+                    events.add(event);
+                }
+                // save to HBase
+                try {
+                    serializer.upsertEvents(events);
+                } catch (SQLException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    /**
+     * start the serializer
+     */
+    public void start() {
+        logger.info("Starting consumer {} ", this.getClass());
+        try {
+            serializer.initialize();
+        } catch (Exception ex) {
+            logger.error("Error {} in initializing the serializer.", ex.getMessage());
+            Throwables.propagate(ex);
+        }
+    }
+
+    /**
+     * stop the consumer & serializer
+     */
+    public void stop() {
+        this.close();
+        consumer.close();
+        try {
+            serializer.close();
+        } catch (SQLException e) {
+            logger.error("Error while closing connection {} for consumer.", e.getMessage());
+        }
+    }
+
+    /**
+     * Signal the processing loop to stop gracefully.
+     */
+    public void close() {
+        this.process = false;
+        try {
+            Thread.sleep(30000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Initializes the serializer for kafka messages.
+     * @param context
+     * @param eventSerializerType
+     */
+    private void initializeSerializer(final Context context, final String eventSerializerType) {
+        String serializerClazz = null;
+        EventSerializers eventSerializer = null;
+
+        try {
+            eventSerializer = EventSerializers.valueOf(eventSerializerType.toUpperCase());
+        } catch (IllegalArgumentException iae) {
+            serializerClazz = eventSerializerType;
+        }
+
+        final Context serializerContext = new Context();
+        serializerContext.putAll(context.getSubProperties(FlumeConstants.CONFIG_SERIALIZER_PREFIX));
+        copyPropertiesToSerializerContext(context, serializerContext);
+
+        try {
+            @SuppressWarnings("unchecked")
+            Class<? extends EventSerializer> clazz = null;
+            if (serializerClazz == null) {
+                clazz = (Class<? extends EventSerializer>) Class.forName(eventSerializer.getClassName());
+            } else {
+                clazz = (Class<? extends EventSerializer>) Class.forName(serializerClazz);
+            }
+
+            serializer = clazz.newInstance();
+            serializer.configure(serializerContext);
+        } catch (Exception e) {
+            logger.error("Could not instantiate event serializer.", e);
+            Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Copy properties to serializer context.
+     * @param context
+     * @param serializerContext
+     */
+    private void copyPropertiesToSerializerContext(Context context, Context serializerContext) {
+        serializerContext.put(FlumeConstants.CONFIG_TABLE_DDL, context.getString(FlumeConstants.CONFIG_TABLE_DDL));
+        serializerContext.put(FlumeConstants.CONFIG_TABLE, context.getString(FlumeConstants.CONFIG_TABLE));
+        serializerContext.put(FlumeConstants.CONFIG_ZK_QUORUM, context.getString(FlumeConstants.CONFIG_ZK_QUORUM));
+        serializerContext.put(FlumeConstants.CONFIG_JDBC_URL, context.getString(FlumeConstants.CONFIG_JDBC_URL));
+        serializerContext.put(FlumeConstants.CONFIG_BATCHSIZE, context.getString(FlumeConstants.CONFIG_BATCHSIZE));
+    }
+
+}
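Besides the Configuration-driven constructor, PhoenixConsumer can be driven step by step, which is what the IT's PhoenixConsumerThread does. A sketch of that embedding (assuming consumerProps carries both the Kafka keys and the serializer/table/jdbcUrl keys shown in consumer.props):

    PhoenixConsumer consumer = new PhoenixConsumer();
    consumer.initializeKafka(consumerProps);       // subscribes using the Kafka keys
    Context context = consumer.prepareContext();   // Properties -> Flume Context
    consumer.configure(context);                   // picks batch size and event serializer
    consumer.start();                              // initializes the serializer
    consumer.process();                            // blocks; call stop() from another thread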
http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java
----------------------------------------------------------------------
diff --git a/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java b/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java
new file mode 100644
index 0000000..8c10aa5
--- /dev/null
+++ b/phoenix-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.kafka.consumer;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PhoenixConsumerTool extends Configured implements Tool {
+    private static final Logger logger = LoggerFactory.getLogger(PhoenixConsumerTool.class);
+    static final Option FILE_PATH_OPT = new Option("f", "file", true, "input file path");
+    static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
+
+    public static Options getOptions() {
+        Options options = new Options();
+        options.addOption(FILE_PATH_OPT);
+        options.addOption(HELP_OPT);
+        return options;
+    }
+
+    public static CommandLine parseOptions(String[] args) {
+
+        Options options = getOptions();
+
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
+        }
+
+        if (cmdLine.hasOption(HELP_OPT.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        if (!cmdLine.hasOption(FILE_PATH_OPT.getOpt())) {
+            throw new IllegalStateException(FILE_PATH_OPT.getLongOpt() + " is a mandatory " + "parameter");
+        }
+
+        if (!cmdLine.getArgList().isEmpty()) {
+            throw new IllegalStateException("Got unexpected extra parameters: " + cmdLine.getArgList());
+        }
+
+        return cmdLine;
+    }
+
+    public static void printHelpAndExit(String errorMessage, Options options) {
+        System.err.println(errorMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    public static void printHelpAndExit(Options options, int exitCode) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("help", options);
+        System.exit(exitCode);
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Configuration conf = HBaseConfiguration.create(getConf());
+
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parseOptions(args);
+        } catch (IllegalStateException e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+        }
+
+        String path = cmdLine.getOptionValue(FILE_PATH_OPT.getOpt());
+        conf.set("kafka.consumer.file", path);
+        new PhoenixConsumer(conf);
+
+        return 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitStatus = ToolRunner.run(new PhoenixConsumerTool(), args);
+        System.exit(exitStatus);
+    }
+}
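Since run() only copies the -f path into the "kafka.consumer.file" key and hands the Configuration to PhoenixConsumer, a programmatic launch is equally small. A sketch (consumer.props is an assumed example path; it is resolved through Hadoop's FileSystem, so a local path or an HDFS URI should both work):

    Configuration conf = HBaseConfiguration.create();
    conf.set("kafka.consumer.file", "consumer.props");
    new PhoenixConsumer(conf);  // initializes, starts, and processes until stopped

With the shaded artifact built above, the command-line equivalent is presumably along the lines of: hadoop jar phoenix-kafka-<version>-minimal.jar org.apache.phoenix.kafka.consumer.PhoenixConsumerTool -f consumer.props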
http://git-wip-us.apache.org/repos/asf/phoenix/blob/afdcca5c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index daabe08..00c4dec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,6 +25,7 @@
   <modules>
     <module>phoenix-core</module>
     <module>phoenix-flume</module>
+    <module>phoenix-kafka</module>
     <module>phoenix-pig</module>
     <module>phoenix-queryserver-client</module>
     <module>phoenix-queryserver</module>
@@ -85,6 +86,7 @@
     <sqlline.version>1.2.0</sqlline.version>
     <guava.version>13.0.1</guava.version>
     <flume.version>1.4.0</flume.version>
+    <kafka.version>0.9.0.0</kafka.version>
     <findbugs-annotations.version>1.3.9-1</findbugs-annotations.version>
     <jcip-annotations.version>1.0-1</jcip-annotations.version>
     <jline.version>2.11</jline.version>
@@ -545,6 +547,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.phoenix</groupId>
+        <artifactId>phoenix-kafka</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.phoenix</groupId>
         <artifactId>phoenix-pig</artifactId>
         <version>${project.version}</version>
       </dependency>