PHOENIX-2084 Support loading JSON data using phoenix-flume (Kalyan Hadoop)

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d9d53bc1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d9d53bc1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d9d53bc1

Branch: refs/heads/calcite
Commit: d9d53bc1498a85ce73e89ba63d0bfaed96a186e3
Parents: 1a6a689
Author: Josh Mahonin <jmaho...@gmail.com>
Authored: Thu Dec 15 14:38:15 2016 -0500
Committer: Josh Mahonin <jmaho...@gmail.com>
Committed: Thu Dec 15 14:38:15 2016 -0500

----------------------------------------------------------------------
 phoenix-flume/pom.xml                           |  12 +
 .../phoenix/flume/JsonEventSerializerIT.java    | 541 +++++++++++++++++++
 .../apache/phoenix/flume/FlumeConstants.java    |   7 +-
 .../flume/serializer/BaseEventSerializer.java   |   4 +-
 .../flume/serializer/EventSerializers.java      |   2 +-
 .../flume/serializer/JsonEventSerializer.java   | 226 ++++++++
 6 files changed, 789 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d9d53bc1/phoenix-flume/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml
index ae8bb2c..5c4f197 100644
--- a/phoenix-flume/pom.xml
+++ b/phoenix-flume/pom.xml
@@ -167,6 +167,18 @@
       <scope>test</scope>
     </dependency>
 
+    <!-- to work with json data using flume -->
+    <dependency>
+      <groupId>org.json</groupId>
+      <artifactId>json</artifactId>
+      <version>20160212</version>
+    </dependency>
+    <dependency>
+      <groupId>com.jayway.jsonpath</groupId>
+      <artifactId>json-path</artifactId>
+      <version>2.2.0</version>
+    </dependency>
+
     <!-- Main dependency on flume. The last to avoid using old commons-io in 
IT -->
     <dependency>
       <groupId>org.apache.flume</groupId>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d9d53bc1/phoenix-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java 
b/phoenix-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java
new file mode 100644
index 0000000..0210bad
--- /dev/null
+++ 
b/phoenix-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.flume;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.flume.serializer.EventSerializers;
+import org.apache.phoenix.flume.sink.PhoenixSink;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class JsonEventSerializerIT extends BaseHBaseManagedTimeIT {
+
+       private Context sinkContext;
+       private PhoenixSink sink;
+
+       @Test
+       public void testWithOutColumnsMapping() throws EventDeliveryException, 
SQLException {
+
+               final String fullTableName = "FLUME_JSON_TEST";
+
+               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
+                               + "  (flume_time timestamp not null, col1 
varchar , col2 double, col3 varchar[], col4 integer[]"
+                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
+               String columns = "col1,col2,col3,col4";
+               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
+               initSinkContext(fullTableName, ddl, columns, null, rowkeyType, 
null);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+               final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" : 
10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
+               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               channel.put(event);
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               int rowsInDb = countRows(fullTableName);
+               assertEquals(1, rowsInDb);
+
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testDifferentColumnNames() throws EventDeliveryException, 
SQLException {
+
+               final String fullTableName = "FLUME_JSON_TEST";
+
+               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
+                               + "  (flume_time timestamp not null, col1 
varchar , col2 double, col3 varchar[], col4 integer[]"
+                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
+               String columns = "col1,col2,col3,col4";
+               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
+               String columnsMapping = 
"{\"col1\":\"col1\",\"col2\":\"f2\",\"col3\":\"f3\",\"col4\":\"col4\"}";
+
+               initSinkContext(fullTableName, ddl, columns, columnsMapping, 
rowkeyType, null);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+               final String eventBody = "{\"col1\" : \"kalyan\", \"f2\" : 
10.5, \"f3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
+               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               channel.put(event);
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               int rowsInDb = countRows(fullTableName);
+               assertEquals(1, rowsInDb);
+
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testInnerColumns() throws EventDeliveryException, 
SQLException {
+
+               final String fullTableName = "FLUME_JSON_TEST";
+
+               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
+                               + "  (flume_time timestamp not null, col1 
varchar , col2 double, col3 varchar[], col4 integer[]"
+                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
+               String columns = "col1,col2,col3,col4";
+               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
+               String columnsMapping = 
"{\"col1\":\"col1\",\"col2\":\"x.y\",\"col3\":\"a.b1.c\",\"col4\":\"col4\"}";
+
+               initSinkContext(fullTableName, ddl, columns, columnsMapping, 
rowkeyType, null);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+               final String eventBody = "{\"col1\" : \"kalyan\", \"x\" : 
{\"y\" : 10.5}, \"a\" : {\"b1\" : {\"c\" : [\"abc\",\"pqr\",\"xyz\"] }, \"b2\" 
: 111}, \"col4\" : [1,2,3,4]}";
+               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               channel.put(event);
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               int rowsInDb = countRows(fullTableName);
+               assertEquals(1, rowsInDb);
+
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testInnerColumnsWithArrayMapping() throws 
EventDeliveryException, SQLException {
+
+               final String fullTableName = "FLUME_JSON_TEST";
+
+               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
+                               + "  (flume_time timestamp not null, col1 
varchar , col2 double, col3 varchar[], col4 integer[]"
+                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
+               String columns = "col1,col2,col3,col4";
+               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
+               String columnsMapping = 
"{\"col1\":\"col1\",\"col2\":\"x.y\",\"col3\":\"a.b[*].c\",\"col4\":\"col4\"}";
+
+               initSinkContext(fullTableName, ddl, columns, columnsMapping, 
rowkeyType, null);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+               final String eventBody = "{\"col1\" : \"kalyan\", \"x\" : 
{\"y\" : 10.5}, \"a\" : {\"b\" : [{\"c\" : \"abc\"}, {\"c\" : \"pqr\"}, {\"c\" 
: \"xyz\"}] , \"b2\" : 111}, \"col4\" : [1,2,3,4]}";
+               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               channel.put(event);
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               int rowsInDb = countRows(fullTableName);
+               assertEquals(1, rowsInDb);
+
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testKeyGenerator() throws EventDeliveryException, 
SQLException {
+
+               final String fullTableName = "FLUME_JSON_TEST";
+               initSinkContextWithDefaults(fullTableName);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+               final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" : 
10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
+               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               channel.put(event);
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               int rowsInDb = countRows(fullTableName);
+               assertEquals(1, rowsInDb);
+
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testMismatchKeyGenerator() throws EventDeliveryException, 
SQLException {
+
+               final String fullTableName = "FLUME_JSON_TEST";
+               initSinkContextWithDefaults(fullTableName);
+               setConfig(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
+                               DefaultKeyGenerator.UUID.name());
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+               final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" : 
10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
+               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               channel.put(event);
+               transaction.commit();
+               transaction.close();
+
+               try {
+                       sink.process();
+                       fail();
+               } catch (Exception ex) {
+                       
assertTrue(ex.getCause().getMessage().contains("java.lang.IllegalArgumentException:
 Invalid format:"));
+               }
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testMissingColumnsInEvent() throws EventDeliveryException, 
SQLException {
+
+               final String fullTableName = "FLUME_JSON_TEST";
+               initSinkContextWithDefaults(fullTableName);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+               final String eventBody = "{\"col1\" : \"kalyan\", \"col3\" : 
[\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
+               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               channel.put(event);
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               int rowsInDb = countRows(fullTableName);
+               assertEquals(0, rowsInDb);
+
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testBatchEvents() throws EventDeliveryException, 
SQLException {
+
+               final String fullTableName = "FLUME_JSON_TEST";
+               initSinkContextWithDefaults(fullTableName);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+               int numEvents = 150;
+               String col1 = "val1";
+               String a1 = "[aaa,bbb,ccc]";
+               String a2 = "[1,2,3,4]";
+               String eventBody = null;
+               List<Event> eventList = 
Lists.newArrayListWithCapacity(numEvents);
+               for (int i = 0; i < numEvents; i++) {
+                       eventBody = "{\"col1\" : \"" + (col1 + i) + "\", 
\"col2\" : " + i * 10.5 + " , \"col3\" : " + a1
+                                       + " , \"col4\" : " + a2 + "}";
+                       Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
+                       eventList.add(event);
+               }
+
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               for (Event event : eventList) {
+                       channel.put(event);
+               }
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               int rowsInDb = countRows(fullTableName);
+               assertEquals(eventList.size(), rowsInDb);
+
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testEventsWithHeaders() throws Exception {
+
+               sinkContext = new Context();
+               final String fullTableName = "FLUME_JSON_TEST";
+               final String ddl = "CREATE TABLE IF NOT EXISTS "
+                               + fullTableName
+                               + "  (rowkey VARCHAR not null, col1 varchar , 
col2 double, col3 varchar[], col4 integer[], host varchar , source varchar \n"
+                               + "  CONSTRAINT pk PRIMARY KEY (rowkey))\n";
+               String columns = "col1,col2,col3,col4";
+               String columnsMapping = 
"{\"col1\":\"col1\",\"col2\":\"col2\",\"col3\":\"col3\",\"col4\":\"col4\"}";
+               String rowkeyType = DefaultKeyGenerator.UUID.name();
+               String headers = "host,source";
+               initSinkContext(fullTableName, ddl, columns, columnsMapping, 
rowkeyType, headers);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+
+               int numEvents = 10;
+               String col1 = "val1";
+               String a1 = "[aaa,bbb,ccc]";
+               String a2 = "[1,2,3,4]";
+               String hostHeader = "host1";
+               String sourceHeader = "source1";
+               String eventBody = null;
+               List<Event> eventList = 
Lists.newArrayListWithCapacity(numEvents);
+               for (int i = 0; i < numEvents; i++) {
+                       eventBody = "{\"col1\" : \"" + (col1 + i) + "\", 
\"col2\" : " + i * 10.5 + " , \"col3\" : " + a1
+                                       + " , \"col4\" : " + a2 + "}";
+                       Map<String, String> headerMap = 
Maps.newHashMapWithExpectedSize(2);
+                       headerMap.put("host", hostHeader);
+                       headerMap.put("source", sourceHeader);
+                       Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap);
+                       eventList.add(event);
+               }
+
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               for (Event event : eventList) {
+                       channel.put(event);
+               }
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               final String query = " SELECT * FROM \n " + fullTableName;
+               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+               final ResultSet rs;
+               final Connection conn = DriverManager.getConnection(getUrl(), 
props);
+               try {
+                       rs = conn.createStatement().executeQuery(query);
+                       assertTrue(rs.next());
+                       assertEquals("host1", rs.getString("host"));
+                       assertEquals("source1", rs.getString("source"));
+
+                       assertTrue(rs.next());
+                       assertEquals("host1", rs.getString("host"));
+                       assertEquals("source1", rs.getString("source"));
+               } finally {
+                       if (conn != null) {
+                               conn.close();
+                       }
+               }
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       private Channel initChannel() {
+               // Channel configuration
+               Context channelContext = new Context();
+               channelContext.put("capacity", "10000");
+               channelContext.put("transactionCapacity", "200");
+
+               Channel channel = new MemoryChannel();
+               channel.setName("memorychannel");
+               Configurables.configure(channel, channelContext);
+               return channel;
+       }
+
+       private void initSinkContext(final String fullTableName, final String 
ddl, final String columns,
+                       final String columnsMapping, final String rowkeyType, 
final String headers) {
+               Preconditions.checkNotNull(fullTableName);
+               sinkContext = new Context();
+               sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
+               sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
+               sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, 
EventSerializers.JSON.name());
+               sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
+               sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES, columns);
+               if (null != columnsMapping)
+                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX 
+ FlumeConstants.CONFIG_COLUMNS_MAPPING,
+                                       columnsMapping);
+               if (null != rowkeyType)
+                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX 
+ FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
+                                       rowkeyType);
+               if (null != headers)
+                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX 
+ FlumeConstants.CONFIG_HEADER_NAMES, headers);
+       }
+
+       private void initSinkContextWithDefaults(final String fullTableName) {
+               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
+                               + "  (flume_time timestamp not null, col1 
varchar , col2 double, col3 varchar[], col4 integer[]"
+                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
+               String columns = "col1,col2,col3,col4";
+               String columnsMapping = 
"{\"col1\":\"col1\",\"col2\":\"col2\",\"col3\":\"col3\",\"col4\":\"col4\"}";
+               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
+               initSinkContext(fullTableName, ddl, columns, columnsMapping, 
rowkeyType, null);
+       }
+
+       private void setConfig(final String configName, final String 
configValue) {
+               Preconditions.checkNotNull(sinkContext);
+               Preconditions.checkNotNull(configName);
+               Preconditions.checkNotNull(configValue);
+               sinkContext.put(configName, configValue);
+       }
+
+       private int countRows(final String fullTableName) throws SQLException {
+               Preconditions.checkNotNull(fullTableName);
+               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+               final Connection conn = DriverManager.getConnection(getUrl(), 
props);
+               ResultSet rs = null;
+               try {
+                       rs = conn.createStatement().executeQuery("select 
count(*) from " + fullTableName);
+                       int rowsCount = 0;
+                       while (rs.next()) {
+                               rowsCount = rs.getInt(1);
+                       }
+                       return rowsCount;
+
+               } finally {
+                       if (rs != null) {
+                               rs.close();
+                       }
+                       if (conn != null) {
+                               conn.close();
+                       }
+               }
+
+       }
+
+       private void dropTable(final String fullTableName) throws SQLException {
+               Preconditions.checkNotNull(fullTableName);
+               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+               final Connection conn = DriverManager.getConnection(getUrl(), 
props);
+               try {
+                       conn.createStatement().execute("drop table if exists " 
+ fullTableName);
+               } finally {
+                       if (conn != null) {
+                               conn.close();
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d9d53bc1/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java 
b/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
index cf43a3b..f60d7dc 100644
--- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
@@ -62,7 +62,12 @@ public final class FlumeConstants {
     /** Whether to ignore case when performing regex matches. */
     public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase";
     public static final boolean IGNORE_CASE_DEFAULT = false;
-
+    
+    /** JSON expression used to parse groups from event data. */
+    public static final String CONFIG_COLUMNS_MAPPING = "columnsMapping";
+    public static final String CONFIG_PARTIAL_SCHEMA = "partialSchema";
+    public static final String JSON_DEFAULT = "{}";
+    
     /** Comma separated list of column names . */
     public static final String CONFIG_COLUMN_NAMES = "columns";
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d9d53bc1/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
 
b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
index fddcba5..24527e3 100644
--- 
a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
+++ 
b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
@@ -157,7 +157,9 @@ public abstract class BaseEventSerializer implements 
EventSerializer {
             while (rs.next()) {
                 cf = rs.getString(QueryUtil.COLUMN_FAMILY_POSITION);
                 cq = rs.getString(QueryUtil.COLUMN_NAME_POSITION);
-                dt = rs.getInt(QueryUtil.DATA_TYPE_POSITION);
+                // TODO: revert to QueryUtil.DATA_TYPE_POSITION (currently 5) once it
+                // matches this resultset layout; the data type is read from column 26 here.
+                dt = rs.getInt(26);
                 if(Strings.isNullOrEmpty(cf)) {
                     rowkey = cq; // this is required only when row key is auto 
generated
                 } else {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d9d53bc1/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
----------------------------------------------------------------------
diff --git 
a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
 
b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
index eb0078d..68a609b 100644
--- 
a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
+++ 
b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
@@ -19,7 +19,7 @@ package org.apache.phoenix.flume.serializer;
 
 public enum EventSerializers {
 
-    REGEX(RegexEventSerializer.class.getName());
+    REGEX(RegexEventSerializer.class.getName()), 
JSON(JsonEventSerializer.class.getName());
     
     private final String className;
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d9d53bc1/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java
 
b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java
new file mode 100644
index 0000000..9226017
--- /dev/null
+++ 
b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.flume.serializer;
+
import static org.apache.phoenix.flume.FlumeConstants.JSON_DEFAULT;
import static org.apache.phoenix.flume.FlumeConstants.CONFIG_COLUMNS_MAPPING;
import static org.apache.phoenix.flume.FlumeConstants.CONFIG_PARTIAL_SCHEMA;

import java.nio.charset.StandardCharsets;
import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.phoenix.schema.types.PDataType;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.spi.json.JsonOrgJsonProvider;
import com.jayway.jsonpath.spi.mapper.JsonOrgMappingProvider;
+
+public class JsonEventSerializer extends BaseEventSerializer {
+
+       private static final Logger logger = 
LoggerFactory.getLogger(JsonEventSerializer.class);
+
+       private JSONObject jsonSchema;
+       private boolean isProperMapping;
+       private boolean partialSchema;
+
+       /**
+     * 
+     */
+       @Override
+       public void doConfigure(Context context) {
+               final String jsonData = 
context.getString(CONFIG_COLUMNS_MAPPING, JSON_DEFAULT);
+               try {
+                       jsonSchema = new JSONObject(jsonData);
+                       if (jsonSchema.length() == 0) {
+                               for (String colName : colNames) {
+                                       jsonSchema.put(colName, colName);
+                               }
+                               isProperMapping = true;
+                       } else {
+                               Iterator<String> keys = jsonSchema.keys();
+                               List<String> keylist = new ArrayList<String>();
+                               while (keys.hasNext()) {
+                                       keylist.add(keys.next());
+                               }
+                               isProperMapping = 
CollectionUtils.isEqualCollection(keylist, colNames);
+                       }
+               } catch (JSONException e) {
+                       e.printStackTrace();
+                       logger.debug("json mapping not proper, verify the data 
{} ", jsonData);
+               }
+               partialSchema = context.getBoolean(CONFIG_PARTIAL_SCHEMA, 
false);
+       }
+
+       /**
+     * 
+     */
+       @Override
+       public void doInitialize() throws SQLException {
+               // NO-OP
+       }
+
+       @Override
+       public void upsertEvents(List<Event> events) throws SQLException {
+               Preconditions.checkNotNull(events);
+               Preconditions.checkNotNull(connection);
+               Preconditions.checkNotNull(this.upsertStatement);
+               Preconditions.checkArgument(isProperMapping, "Please verify 
fields mapping is not properly done..");
+
+               boolean wasAutoCommit = connection.getAutoCommit();
+               connection.setAutoCommit(false);
+               try (PreparedStatement colUpsert = 
connection.prepareStatement(upsertStatement)) {
+                       String value = null;
+                       Integer sqlType = null;
+                       JSONObject inputJson = new JSONObject();
+                       for (Event event : events) {
+                               byte[] payloadBytes = event.getBody();
+                               if (payloadBytes == null || payloadBytes.length 
== 0) {
+                                       continue;
+                               }
+                               String payload = new String(payloadBytes);
+
+                               try {
+                                       inputJson = new JSONObject(payload);
+                               } catch (Exception e) {
+                                       logger.debug("payload is not proper 
json");
+                                       continue;
+                               }
+
+                               Map<String, String> data = new HashMap<String, 
String>();
+                               for (String colName : colNames) {
+                                       String pattern = colName;
+                                       if (jsonSchema.has(colName)) {
+                                               Object obj = 
jsonSchema.opt(colName);
+                                               if (null != obj) {
+                                                       pattern = 
obj.toString();
+                                               }
+                                       }
+                                       pattern = "$." + pattern;
+                                       value = getPatternData(inputJson, 
pattern);
+
+                                       // if field mapping data is null then 
look for column data
+                                       if (null == value && partialSchema) {
+                                               pattern = "$." + colName;
+                                               value = 
getPatternData(inputJson, pattern);
+                                       }
+
+                                       data.put(colName, value);
+                               }
+
+                               Collection<String> values = data.values();
+                               if (values.contains(null)) {
+                                       logger.debug("payload data {} doesn't 
match the fields mapping {} ", inputJson, jsonSchema);
+                                       continue;
+                               }
+
+                               int index = 1;
+                               int offset = 0;
+                               for (int i = 0; i < colNames.size(); i++, 
offset++) {
+                                       if (columnMetadata[offset] == null) {
+                                               continue;
+                                       }
+                                       String colName = colNames.get(i);
+                                       value = data.get(colName);
+                                       sqlType = 
columnMetadata[offset].getSqlType();
+                                       PDataType pDataType = 
PDataType.fromTypeId(sqlType);
+                                       Object upsertValue;
+                                       if (pDataType.isArrayType()) {
+                                               JSONArray jsonArray = new 
JSONArray(new JSONTokener(value));
+                                               Object[] vals = new 
Object[jsonArray.length()];
+                                               for (int x = 0; x < 
jsonArray.length(); x++) {
+                                                       vals[x] = 
jsonArray.get(x);
+                                               }
+                                               String baseTypeSqlName = 
PDataType.arrayBaseType(pDataType).getSqlTypeName();
+                                               Array array = 
connection.createArrayOf(baseTypeSqlName, vals);
+                                               upsertValue = 
pDataType.toObject(array, pDataType);
+                                       } else {
+                                               upsertValue = 
pDataType.toObject(value);
+                                       }
+                                       if (upsertValue != null) {
+                                               colUpsert.setObject(index++, 
upsertValue, sqlType);
+                                       } else {
+                                               colUpsert.setNull(index++, 
sqlType);
+                                       }
+                               }
+
+                               // add headers if necessary
+                               Map<String, String> headerValues = 
event.getHeaders();
+                               for (int i = 0; i < headers.size(); i++, 
offset++) {
+                                       String headerName = headers.get(i);
+                                       String headerValue = 
headerValues.get(headerName);
+                                       sqlType = 
columnMetadata[offset].getSqlType();
+                                       Object upsertValue = 
PDataType.fromTypeId(sqlType).toObject(headerValue);
+                                       if (upsertValue != null) {
+                                               colUpsert.setObject(index++, 
upsertValue, sqlType);
+                                       } else {
+                                               colUpsert.setNull(index++, 
sqlType);
+                                       }
+                               }
+
+                               if (autoGenerateKey) {
+                                       sqlType = 
columnMetadata[offset].getSqlType();
+                                       String generatedRowValue = 
this.keyGenerator.generate();
+                                       Object rowkeyValue = 
PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
+                                       colUpsert.setObject(index++, 
rowkeyValue, sqlType);
+                               }
+                               colUpsert.execute();
+                       }
+                       connection.commit();
+               } catch (Exception ex) {
+                       logger.error("An error {} occurred during persisting 
the event ", ex.getMessage());
+                       throw new SQLException(ex.getMessage());
+               } finally {
+                       if (wasAutoCommit) {
+                               connection.setAutoCommit(true);
+                       }
+               }
+
+       }
+
+       private String getPatternData(JSONObject json, String pattern) {
+               Configuration JSON_ORG_CONFIGURATION = 
Configuration.builder().mappingProvider(new JsonOrgMappingProvider())
+                               .jsonProvider(new 
JsonOrgJsonProvider()).build();
+               String value;
+               try {
+                       Object object = 
JsonPath.using(JSON_ORG_CONFIGURATION).parse(json).read(pattern);
+                       value = object.toString();
+               } catch (Exception e) {
+                       value = null;
+               }
+               return value;
+       }
+
+}
\ No newline at end of file

Reply via email to