This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new dd0be98  PHOENIX-6935 Remove Phoenix Kafka connector
dd0be98 is described below

commit dd0be98983bf3e6bb9ec47f5bebe2dee203d88cd
Author: Aron Meszaros <meszaros.aron.att...@gmail.com>
AuthorDate: Mon Oct 9 09:52:40 2023 +0200

    PHOENIX-6935 Remove Phoenix Kafka connector
---
 README.md                                          |   2 +-
 phoenix5-connectors-assembly/pom.xml               |  24 --
 .../src/build/components/phoenix5-jars.xml         |   8 -
 phoenix5-kafka/pom.xml                             | 236 -----------------
 .../apache/phoenix/kafka/PhoenixConsumerIT.java    | 293 ---------------------
 phoenix5-kafka/src/it/resources/consumer.props     |  32 ---
 phoenix5-kafka/src/it/resources/producer.props     |  24 --
 .../org/apache/phoenix/kafka/KafkaConstants.java   |  52 ----
 .../phoenix/kafka/consumer/PhoenixConsumer.java    | 292 --------------------
 .../kafka/consumer/PhoenixConsumerTool.java        | 107 --------
 pom.xml                                            |   9 -
 11 files changed, 1 insertion(+), 1078 deletions(-)

diff --git a/README.md b/README.md
index c7487ff..fec24b6 100644
--- a/README.md
+++ b/README.md
@@ -22,4 +22,4 @@ limitations under the License.
 Copyright ©2019 [Apache Software Foundation](http://www.apache.org/). All Rights Reserved. 
 
 ## Introduction
-This repo contains the Flume, Kafka, Spark and Hive connectors for Phoenix.
\ No newline at end of file
+This repo contains the Flume, Spark and Hive connectors for Phoenix.
\ No newline at end of file
diff --git a/phoenix5-connectors-assembly/pom.xml b/phoenix5-connectors-assembly/pom.xml
index c6c4253..917ec5e 100644
--- a/phoenix5-connectors-assembly/pom.xml
+++ b/phoenix5-connectors-assembly/pom.xml
@@ -49,10 +49,6 @@
       <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix5-hive-shaded</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.phoenix</groupId>
-      <artifactId>phoenix5-kafka</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix5-spark</artifactId>
@@ -153,26 +149,6 @@
               </arguments>
             </configuration>
           </execution>          
-          <execution>
-            <id>kafka without version</id>
-            <phase>package</phase>
-            <goals>
-              <goal>exec</goal>
-            </goals>
-            <configuration>
-              <executable>ln</executable>
-              <workingDirectory>${project.basedir}/../phoenix5-kafka/target</workingDirectory>
-              <arguments>
-                <argument>-fnsv</argument>
-                <argument>
-                  phoenix5-kafka-${project.version}.jar
-                </argument>
-                <argument>
-                  phoenix5-kafka.jar
-                </argument>
-              </arguments>
-            </configuration>
-          </execution>
         </executions>
       </plugin>
       <plugin>
diff --git a/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml b/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml
index b507a93..589c5d8 100644
--- a/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml
+++ b/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml
@@ -31,14 +31,6 @@
         <include>phoenix5-flume.jar</include>
       </includes>
     </fileSet>
-    <fileSet>
-      <directory>${project.basedir}/../phoenix5-kafka/target</directory>
-      <outputDirectory>/</outputDirectory>
-      <includes>
-        <include>phoenix5-kafka-${project.version}.jar</include>
-        <include>phoenix5-kafka.jar</include>
-      </includes>
-    </fileSet>
     <fileSet>
       <directory>${project.basedir}/../phoenix5-spark-shaded/target</directory>
       <outputDirectory>/</outputDirectory>
diff --git a/phoenix5-kafka/pom.xml b/phoenix5-kafka/pom.xml
deleted file mode 100644
index ecf9c7e..0000000
--- a/phoenix5-kafka/pom.xml
+++ /dev/null
@@ -1,236 +0,0 @@
-<?xml version='1.0'?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.phoenix</groupId>
-    <artifactId>phoenix-connectors</artifactId>
-    <version>6.0.0-SNAPSHOT</version>
-  </parent>
-  <artifactId>phoenix5-kafka</artifactId>
-  <name>Phoenix Kafka Connector for Phoenix 5</name>
-
-  <properties>
-    <top.dir>${project.basedir}/..</top.dir>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <!-- To work with kafka with phoenix -->
-    <dependency>
-      <groupId>org.apache.phoenix</groupId>
-      <artifactId>phoenix-hbase-compat-${hbase.compat.version}</artifactId>
-      <scope>runtime</scope>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-      <version>${kafka.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>commons-cli</groupId>
-      <artifactId>commons-cli</artifactId>
-      <version>${commons-cli.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.phoenix</groupId>
-      <artifactId>phoenix5-flume</artifactId>
-    </dependency>
-    <!-- Test dependencies -->
-    <dependency>
-      <groupId>org.apache.phoenix</groupId>
-      <artifactId>phoenix-core</artifactId>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.phoenix.thirdparty</groupId>
-      <artifactId>phoenix-shaded-guava</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.11</artifactId>
-      <version>${kafka.version}</version>
-      <classifier>test</classifier>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-it</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minicluster</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <!-- Flume dependencies, as the Kafka connectors extends Flume. -->
-<!--     <dependency> -->
-<!--       <groupId>org.apache.flume</groupId> -->
-<!--       <artifactId>flume-ng-core</artifactId> -->
-<!--     </dependency> -->
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-sdk</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-configuration</artifactId>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <pluginManagement>
-      <plugins>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-dependency-plugin</artifactId>
-          <configuration>
-            <ignoreNonCompile>true</ignoreNonCompile>
-          </configuration>
-        </plugin>
-        <plugin>
-          <groupId>org.codehaus.mojo</groupId>
-          <artifactId>build-helper-maven-plugin</artifactId>
-          <version>3.0.0</version>
-          <executions>
-            <execution>
-              <id>add-parent-source</id>
-              <phase>generate-sources</phase>
-              <goals>
-                <goal>add-source</goal>
-              </goals>
-              <configuration>
-                <sources>
-                  <source>${project.parent.basedir}/src/main/java</source>
-                </sources>
-              </configuration>
-            </execution>
-            <execution>
-              <id>add-parent-test-source</id>
-              <phase>generate-sources</phase>
-              <goals>
-                <goal>add-test-source</goal>
-              </goals>
-              <configuration>
-                <sources>
-                  <source>${project.parent.basedir}/src/it/java</source>
-                  <source>${project.parent.basedir}/src/it/resources</source>
-                </sources>
-              </configuration>
-            </execution>
-          </executions>
-        </plugin>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-failsafe-plugin</artifactId>
-        </plugin>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-site-plugin</artifactId>
-        </plugin>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-resources-plugin</artifactId>
-          <executions>
-            <execution>
-              <id>copy-resources</id>
-              <phase>generate-resources</phase>
-              <goals>
-                <goal>copy-resources</goal>
-              </goals>
-              <configuration>
-                <outputDirectory>${project.build.directory}/test-classes
-                </outputDirectory>
-                <overwrite>true</overwrite>
-                <resources>
-                  <resource>
-                    <directory>${project.parent.basedir}/src/it/resources</directory>
-                  </resource>
-                </resources>
-              </configuration>
-            </execution>
-          </executions>
-        </plugin>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-shade-plugin</artifactId>
-          <configuration>
-            <shadedArtifactAttached>true</shadedArtifactAttached>
-            <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
-            <shadedClassifierName>shaded-minimal</shadedClassifierName>
-            <shadeTestJar>false</shadeTestJar>
-            <artifactSet>
-              <includes>
-                <include>org.apache.phoenix:phoenix5-kafka</include>
-                <include>org.apache.kafka:kafka-clients</include>
-                <include>org.apache.phoenix:phoenix5-flume</include>
-              </includes>
-            </artifactSet>
-          </configuration>
-          <executions>
-            <execution>
-              <phase>package</phase>
-              <goals>
-                <goal>shade</goal>
-              </goals>
-            </execution>
-          </executions>
-        </plugin>
-      </plugins>
-    </pluginManagement>
-    <plugins>
-      <plugin>
-        <!-- Allows us to get the apache-ds bundle artifacts -->
-        <groupId>org.apache.felix</groupId>
-        <artifactId>maven-bundle-plugin</artifactId>
-        <extensions>true</extensions>
-        <inherited>true</inherited>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>
diff --git a/phoenix5-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java b/phoenix5-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java
deleted file mode 100644
index cb41008..0000000
--- a/phoenix5-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.kafka;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Map;
-import java.util.Properties;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.flume.Context;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.flume.DefaultKeyGenerator;
-import org.apache.phoenix.flume.FlumeConstants;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.kafka.consumer.PhoenixConsumer;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-
-import kafka.admin.AdminUtils;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class PhoenixConsumerIT extends BaseTest {
-    private static final String ZKHOST = "127.0.0.1";
-    private static final String BROKERHOST = "127.0.0.1";
-    private static final String BROKERPORT = "9092";
-    private static final String TOPIC = "topic1";
-    private KafkaServer kafkaServer;
-    private PhoenixConsumer pConsumer;
-    private EmbeddedZookeeper zkServer;
-    private ZkClient zkClient;
-    private Connection conn;
-
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        props.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-        // Must update config before starting server
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
-        // setup Zookeeper
-        zkServer = new EmbeddedZookeeper();
-        String zkConnect = ZKHOST + ":" + zkServer.port();
-        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
-        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
-
-        // setup Broker
-        Properties brokerProps = new Properties();
-        brokerProps.setProperty("zookeeper.connect", zkConnect);
-        brokerProps.setProperty("broker.id", "0");
-        brokerProps.setProperty("log.dirs",
-            Files.createTempDirectory("kafka-").toAbsolutePath().toString());
-        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
-        KafkaConfig config = new KafkaConfig(brokerProps);
-        Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);
-        kafkaServer.startup();
-
-        // create topic
-        AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties());
-
-        pConsumer = new PhoenixConsumer();
-        
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        conn = DriverManager.getConnection(getUrl(), props);
-    }
-
-    @Test
-    public void testPhoenixConsumerWithFile() throws SQLException {
-        String consumerPath = "consumer.props";
-        PhoenixConsumerThread pConsumerThread = new PhoenixConsumerThread(pConsumer, consumerPath);
-        pConsumerThread.properties.setProperty(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        Thread phoenixConsumer = new Thread(pConsumerThread);
-
-        String producerPath = "producer.props";
-        KafkaProducerThread kProducerThread = new KafkaProducerThread(producerPath, TOPIC);
-        Thread kafkaProducer = new Thread(kProducerThread);
-
-        phoenixConsumer.start();
-
-        try {
-            phoenixConsumer.join(10000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        kafkaProducer.start();
-
-        try {
-            kafkaProducer.join();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        if (!kafkaProducer.isAlive()) {
-            System.out.println("kafka producer is not alive");
-            pConsumer.stop();
-        }
-        
-        // Verify our serializer wrote out data
-        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM SAMPLE1");
-        assertTrue(rs.next());
-        assertTrue(rs.getFetchSize() > 0);
-        rs.close();
-    }
-    
-    @Test
-    public void testPhoenixConsumerWithProperties() throws SQLException {
-        
-        final String fullTableName = "SAMPLE2";
-        final String ddl = "CREATE TABLE IF NOT EXISTS SAMPLE2(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))\n";
-        
-        Properties consumerProperties = new Properties();
-        consumerProperties.setProperty(FlumeConstants.CONFIG_TABLE, fullTableName);
-        consumerProperties.setProperty(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
-        consumerProperties.setProperty(FlumeConstants.CONFIG_TABLE_DDL, ddl);
-        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_REGULAR_EXPRESSION,"([^\\,]*),([^\\,]*),([^\\,]*)");
-        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES,"c1,c2,c3");
-        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, DefaultKeyGenerator.UUID.name());
-        consumerProperties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, "localhost:9092");
-        consumerProperties.setProperty(KafkaConstants.TOPICS, "topic1,topic2");
-        consumerProperties.setProperty(KafkaConstants.TIMEOUT, "100");
-        
-        PhoenixConsumerThread pConsumerThread = new PhoenixConsumerThread(pConsumer, consumerProperties);
-        Thread phoenixConsumer = new Thread(pConsumerThread);
-
-        Properties producerProperties = new Properties();
-        producerProperties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, "localhost:9092");
-        producerProperties.setProperty(KafkaConstants.KEY_SERIALIZER, KafkaConstants.DEFAULT_KEY_SERIALIZER);
-        producerProperties.setProperty(KafkaConstants.VALUE_SERIALIZER, KafkaConstants.DEFAULT_VALUE_SERIALIZER);
-        producerProperties.setProperty("auto.commit.interval.ms", "1000");
-        
-        KafkaProducerThread kProducerThread = new KafkaProducerThread(producerProperties, TOPIC);
-        Thread kafkaProducer = new Thread(kProducerThread);
-
-        phoenixConsumer.start();
-
-        try {
-            phoenixConsumer.join(10000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        kafkaProducer.start();
-
-        try {
-            kafkaProducer.join();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        if (!kafkaProducer.isAlive()) {
-            System.out.println("kafka producer is not alive");
-            pConsumer.stop();
-        }
-        
-        // Verify our serializer wrote out data
-        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM SAMPLE2");
-        assertTrue(rs.next());
-        assertTrue(rs.getFetchSize() > 0);
-        rs.close();
-    }
-
-    @After
-    public void cleanUp() throws Exception {
-        kafkaServer.shutdown();
-        zkClient.close();
-        zkServer.shutdown();
-        conn.close();
-    }
-
-    class PhoenixConsumerThread implements Runnable {
-        PhoenixConsumer pConsumer;
-        Properties properties;
-
-        PhoenixConsumerThread(PhoenixConsumer pConsumer, String path) {
-            this.pConsumer = pConsumer;
-            try (InputStream props = getClass().getClassLoader().getResourceAsStream(path)) {
-                Properties properties = new Properties();
-                properties.load(props);
-                this.properties = properties;
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-        
-        PhoenixConsumerThread(PhoenixConsumer pConsumer, Properties properties) {
-            this.pConsumer = pConsumer;
-            this.properties = properties;
-        }
-
-        @Override
-        public void run() {
-            // intialize the kafka
-            pConsumer.intializeKafka(properties);
-
-            // configure the phoenix
-            Context context = pConsumer.prepareContext();
-            pConsumer.configure(context);
-
-            // start the kafka consumer
-            pConsumer.start();
-
-            // process kafka messages
-            pConsumer.process();
-        }
-    }
-
-    class KafkaProducerThread implements Runnable {
-        KafkaProducer<String, String> producer;
-        String topic;
-
-        KafkaProducerThread(String path, String topic) {
-            this.topic = topic;
-            try (InputStream props = getClass().getClassLoader().getResourceAsStream(path)) {
-                Properties properties = new Properties();
-                properties.load(props);
-                producer = new KafkaProducer<>(properties);
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-
-        KafkaProducerThread(Properties properties, String topic) {
-            this.topic = topic;
-            producer = new KafkaProducer<>(properties);
-        }
-
-        @Override
-        public void run() {
-            try {
-                for (int i = 1; i <= 10; i++) {
-                    String message = String.format("%s,%.3f,%d", "msg" + i, i * 2000f, i);
-                    producer.send(new ProducerRecord<String, String>(topic, message));
-                    producer.flush();
-                    Thread.sleep(100);
-                }
-            } catch (Throwable throwable) {
-                System.out.printf("%s", throwable.fillInStackTrace());
-            } finally {
-                producer.close();
-            }
-        }
-    }
-}
diff --git a/phoenix5-kafka/src/it/resources/consumer.props b/phoenix5-kafka/src/it/resources/consumer.props
deleted file mode 100644
index 703fd7c..0000000
--- a/phoenix5-kafka/src/it/resources/consumer.props
+++ /dev/null
@@ -1,32 +0,0 @@
-############################################################################
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-############################################################################
-
-serializer=regex
-serializer.rowkeyType=uuid
-serializer.regex=([^\,]*),([^\,]*),([^\,]*)
-serializer.columns=c1,c2,c3
-
-jdbcUrl=jdbc:phoenix:localhost
-table=SAMPLE1
-ddl=CREATE TABLE IF NOT EXISTS SAMPLE1(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))
-
-bootstrap.servers=localhost:9092
-topics=topic1,topic2
-poll.timeout.ms=100
diff --git a/phoenix5-kafka/src/it/resources/producer.props b/phoenix5-kafka/src/it/resources/producer.props
deleted file mode 100644
index 4c3cd2f..0000000
--- a/phoenix5-kafka/src/it/resources/producer.props
+++ /dev/null
@@ -1,24 +0,0 @@
-############################################################################
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-############################################################################
-
-bootstrap.servers=localhost:9092
-auto.commit.interval.ms=1000
-key.serializer=org.apache.kafka.common.serialization.StringSerializer
-value.serializer=org.apache.kafka.common.serialization.StringSerializer
diff --git a/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java b/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java
deleted file mode 100644
index cc1aa61..0000000
--- a/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-public final class KafkaConstants {
-
-    public static final String BOOTSTRAP_SERVERS = ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
-    
-    public static final String KEY_SERIALIZER = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
-
-    public static final String VALUE_SERIALIZER = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-
-    public static final String DEFAULT_KEY_SERIALIZER = StringSerializer.class.getName();
-
-    public static final String DEFAULT_VALUE_SERIALIZER = StringSerializer.class.getName();
-    
-    public static final String KEY_DESERIALIZER = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-
-    public static final String VALUE_DESERIALIZER = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-
-    public static final String DEFAULT_KEY_DESERIALIZER = StringDeserializer.class.getName();
-
-    public static final String DEFAULT_VALUE_DESERIALIZER = StringDeserializer.class.getName();
-
-    public static final String TOPICS = "topics";
-
-    public static final String GROUP_ID = ConsumerConfig.GROUP_ID_CONFIG;
-    
-    public static final String TIMEOUT = "poll.timeout.ms";
-    
-    public static final long DEFAULT_TIMEOUT = 100;
-}
diff --git a/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java b/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java
deleted file mode 100644
index 6551386..0000000
--- a/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.kafka.consumer;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Random;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.phoenix.flume.FlumeConstants;
-import org.apache.phoenix.flume.serializer.EventSerializer;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.kafka.KafkaConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class PhoenixConsumer {
-    private static final Logger logger = LoggerFactory.getLogger(PhoenixConsumer.class);
-
-    private KafkaConsumer<String, String> consumer = null;
-    private Properties properties = new Properties();
-    private Integer batchSize;
-    private long timeout;
-    private EventSerializer serializer;
-    private Boolean process = true;
-    
-    public PhoenixConsumer() {
-
-    }
-    
-    public PhoenixConsumer(Configuration conf) throws IOException {
-        // intialize the kafka
-        intializeKafka(conf);
-
-        // configure the phoenix
-        Context context = prepareContext();
-        configure(context);
-
-        // start the kafka consumer
-        start();
-
-        // process kafka messages
-        process();
-    }
-    
-    /**
-     * Initializes the kafka with properties file.
-     * @param conf
-     * @throws IOException 
-     */
-    public void intializeKafka(Configuration conf) throws IOException {
-       // get the kafka consumer file
-       String file = conf.get("kafka.consumer.file");
-       if(file==null){
-           throw new NullPointerException("File path cannot be empty, please 
specify in the arguments");
-      }
-
-        Path path = new Path(file);
-        FileSystem fs = FileSystem.get(conf);
-        try (InputStream props = fs.open(path)) {
-            properties.load(props);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        
-        intializeKafka(properties);
-    }
-    
-    /**
-     * Initializes the kafka with properties.
-     * @param properties
-     */
-    public void intializeKafka(Properties properties) {
-        this.properties = properties;
-        
-        String servers = properties.getProperty(KafkaConstants.BOOTSTRAP_SERVERS);
-        if(servers ==null){
-            throw new NullPointerException("Bootstrap Servers cannot be empty, please specify in the configuration file");
-        }
-        properties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, servers);
-        
-        if (properties.getProperty(KafkaConstants.GROUP_ID) == null) {
-            properties.setProperty(KafkaConstants.GROUP_ID, "group-" + new Random().nextInt(100000));
-        }
-        
-        if (properties.getProperty(KafkaConstants.TIMEOUT) == null) {
-            properties.setProperty(KafkaConstants.TIMEOUT, String.valueOf(KafkaConstants.DEFAULT_TIMEOUT));
-        }
-
-        String topics = properties.getProperty(KafkaConstants.TOPICS);
-
-        if (topics == null) {
-            throw new NullPointerException("Topics cannot be empty, please 
specify in the configuration file");
-        }
-
-        properties.setProperty(KafkaConstants.KEY_DESERIALIZER, KafkaConstants.DEFAULT_KEY_DESERIALIZER);
-        
-        properties.setProperty(KafkaConstants.VALUE_DESERIALIZER, KafkaConstants.DEFAULT_VALUE_DESERIALIZER);
-        
-        this.consumer = new KafkaConsumer<>(properties);
-        consumer.subscribe(Arrays.asList(topics.split(",")));
-    }
-  
-    /**
-     * Convert the properties to context
-     */
-    public Context prepareContext() {
-        Map<String, String> map = new HashMap<String, String>();
-        for (Entry<Object, Object> entry : properties.entrySet()) {
-            map.put((String) entry.getKey(), (String) entry.getValue());
-        }
-        return new Context(map);
-    }
-
-    /**
-     * Configure the context
-     */
-    public void configure(Context context){
-        this.timeout = context.getLong(KafkaConstants.TIMEOUT, KafkaConstants.DEFAULT_TIMEOUT);
-        this.batchSize = context.getInteger(FlumeConstants.CONFIG_BATCHSIZE, FlumeConstants.DEFAULT_BATCH_SIZE);
-        final String eventSerializerType = context.getString(FlumeConstants.CONFIG_SERIALIZER);
-        if (eventSerializerType ==null){
-            throw new NullPointerException("Event serializer cannot be empty, please specify in the configuration file");
-        }
-        initializeSerializer(context,eventSerializerType);
-    }
-    
-    /**
-     * Process the kafka messages
-     */
-    public void process() {
-        int timeouts = 0;
-        // noinspection InfiniteLoopStatement
-        while (process) {
-            // read records with a short timeout.
-            // If we time out, we don't really care.
-            // Assuming only key & value text data
-            ConsumerRecords<String, String> records = consumer.poll(this.timeout);
-            if (records.count() == 0) {
-                timeouts++;
-            } else {
-                System.out.printf("Got %d records after %d timeouts\n", 
records.count(), timeouts);
-                timeouts = 0;
-            }
-
-            if (!records.isEmpty()) {
-                List<Event> events = new ArrayList<>(records.count());
-                for (ConsumerRecord<String, String> record : records) {
-                    Event event = EventBuilder.withBody(Bytes.toBytes(record.value()));
-                    events.add(event);
-                }
-                // save to Hbase
-                try {
-                    serializer.upsertEvents(events);
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    /**
-     * start the serializer
-     */
-    public void start() {
-        logger.info("Starting consumer {} ", this.getClass());
-        try {
-            serializer.initialize();
-        } catch (Exception ex) {
-            logger.error("Error {} in initializing the serializer.", 
ex.getMessage());
-            if (ex instanceof RuntimeException){
-                throw RuntimeException.class.cast(ex);
-            }
-            else {
-                throw new RuntimeException(ex);
-            }
-        }
-    }
-
-    /**
-     * stop the consumer and serializer
-     */
-    public void stop() {
-        this.close();
-        consumer.close();
-        try {
-            serializer.close();
-        } catch (SQLException e) {
-            logger.error(" Error while closing connection {} for consumer.", 
e.getMessage());
-        }
-    }
-    
-    /**
-     * make the changes to stop in gracefully
-     */
-    public void close(){
-        this.process = false;
-        try {
-            Thread.sleep(30000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * Initializes the serializer for kafka messages.
-     * @param context
-     * @param eventSerializerType
-     */
-    private void initializeSerializer(final Context context, final String eventSerializerType) {
-        String serializerClazz = null;
-        EventSerializers eventSerializer = null;
-
-        try {
-            eventSerializer = EventSerializers.valueOf(eventSerializerType.toUpperCase());
-        } catch (IllegalArgumentException iae) {
-            serializerClazz = eventSerializerType;
-        }
-
-        final Context serializerContext = new Context();
-        serializerContext.putAll(context.getSubProperties(FlumeConstants.CONFIG_SERIALIZER_PREFIX));
-        copyPropertiesToSerializerContext(context,serializerContext);
-        
-        try {
-            @SuppressWarnings("unchecked")
-            Class<? extends EventSerializer> clazz = null;
-            if (serializerClazz == null) {
-                clazz = (Class<? extends EventSerializer>) Class.forName(eventSerializer.getClassName());
-            } else {
-                clazz = (Class<? extends EventSerializer>) Class.forName(serializerClazz);
-            }
-
-            serializer = clazz.newInstance();
-            serializer.configure(serializerContext);
-        } catch (Exception e) {
-            logger.error("Could not instantiate event serializer.", e);
-            if (e instanceof RuntimeException){
-                throw (RuntimeException)e;
-            }
-            else {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-    
-    /**
-     * Copy properties to serializer context.
-     * @param context
-     * @param serializerContext
-     */
-    private void copyPropertiesToSerializerContext(Context context, Context serializerContext) {
-        serializerContext.put(FlumeConstants.CONFIG_TABLE_DDL,context.getString(FlumeConstants.CONFIG_TABLE_DDL));
-        serializerContext.put(FlumeConstants.CONFIG_TABLE,context.getString(FlumeConstants.CONFIG_TABLE));
-        serializerContext.put(FlumeConstants.CONFIG_ZK_QUORUM,context.getString(FlumeConstants.CONFIG_ZK_QUORUM));
-        serializerContext.put(FlumeConstants.CONFIG_JDBC_URL,context.getString(FlumeConstants.CONFIG_JDBC_URL));
-        serializerContext.put(FlumeConstants.CONFIG_BATCHSIZE,context.getString(FlumeConstants.CONFIG_BATCHSIZE));
-    }
-
-}
diff --git a/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java b/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java
deleted file mode 100644
index 8c10aa5..0000000
--- a/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.kafka.consumer;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PhoenixConsumerTool extends Configured implements Tool {
-    private static final Logger logger = LoggerFactory.getLogger(PhoenixConsumerTool.class);
-    static final Option FILE_PATH_OPT = new Option("f", "file", true, "input file path");
-    static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
-    
-    public static Options getOptions() {
-        Options options = new Options();
-        options.addOption(FILE_PATH_OPT);
-        options.addOption(HELP_OPT);
-        return options;
-    }
-
-    public static CommandLine parseOptions(String[] args) {
-
-        Options options = getOptions();
-
-        CommandLineParser parser = new PosixParser();
-        CommandLine cmdLine = null;
-        try {
-            cmdLine = parser.parse(options, args);
-        } catch (ParseException e) {
-            printHelpAndExit("Error parsing command line options: " + 
e.getMessage(), options);
-        }
-
-        if (cmdLine.hasOption(HELP_OPT.getOpt())) {
-            printHelpAndExit(options, 0);
-        }
-
-        if (!cmdLine.hasOption(FILE_PATH_OPT.getOpt())) {
-            throw new IllegalStateException(FILE_PATH_OPT.getLongOpt() + " is a mandatory " + "parameter");
-        }
-
-        if (!cmdLine.getArgList().isEmpty()) {
-            throw new IllegalStateException("Got unexpected extra parameters: 
" + cmdLine.getArgList());
-        }
-
-        return cmdLine;
-    }
-
-    public static void printHelpAndExit(String errorMessage, Options options) {
-        System.err.println(errorMessage);
-        printHelpAndExit(options, 1);
-    }
-
-    public static void printHelpAndExit(Options options, int exitCode) {
-        HelpFormatter formatter = new HelpFormatter();
-        formatter.printHelp("help", options);
-        System.exit(exitCode);
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Configuration conf = HBaseConfiguration.create(getConf());
-
-        CommandLine cmdLine = null;
-        try {
-            cmdLine = parseOptions(args);
-        } catch (IllegalStateException e) {
-            printHelpAndExit(e.getMessage(), getOptions());
-        }
-        
-        String path = cmdLine.getOptionValue(FILE_PATH_OPT.getOpt());
-        conf.set("kafka.consumer.file", path);
-        new PhoenixConsumer(conf);
-        
-        return 1;
-    }
-    
-    public static void main(String[] args) throws Exception {
-        int exitStatus = ToolRunner.run(new PhoenixConsumerTool(), args);
-        System.exit(exitStatus);
-    }
-}
diff --git a/pom.xml b/pom.xml
index b4c4cb8..b1a590a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,7 +46,6 @@
       <module>phoenix5-hive</module>
       <module>phoenix5-hive-shaded</module>
       <module>phoenix5-flume</module>
-      <module>phoenix5-kafka</module>
       <module>phoenix5-spark</module>
       <module>phoenix5-spark-shaded</module>
       <module>phoenix5-spark3</module>
@@ -100,7 +99,6 @@
     <hive3-storage.version>2.7.0</hive3-storage.version>
     <hive-storage.version>${hive3-storage.version}</hive-storage.version>
     <flume.version>1.4.0</flume.version>
-    <kafka.version>0.9.0.0</kafka.version>
     <spark.version>2.4.0</spark.version>
     <spark3.version>3.0.3</spark3.version>
     <scala.version>2.11.12</scala.version>
@@ -118,8 +116,6 @@
     <jodatime.version>2.10.5</jodatime.version>
     <commons-cli.version>1.4</commons-cli.version>
     <commons-compress.version>1.9</commons-compress.version>
-    <!-- For Kafka -->
-    <com-101tek-zkclient.version>0.7</com-101tek-zkclient.version>
     <!-- For hive -->
     <commons-io.version>2.11.0</commons-io.version>
 
@@ -515,11 +511,6 @@
         <artifactId>phoenix5-flume</artifactId>
         <version>${project.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.apache.phoenix</groupId>
-        <artifactId>phoenix5-kafka</artifactId>
-        <version>${project.version}</version>
-      </dependency>
       <dependency>
         <groupId>org.apache.phoenix</groupId>
         <artifactId>phoenix5-spark</artifactId>
