Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 2b01232bf -> 99358e7b4


PHOENIX-3135 Support loading csv data using Flume plugin (Kalyan Hadoop)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4fa414ea
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4fa414ea
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4fa414ea

Branch: refs/heads/4.x-HBase-0.98
Commit: 4fa414eaaef2049ce1c8f5233c6fe0b085e93bea
Parents: 2b01232
Author: Josh Mahonin <jmaho...@gmail.com>
Authored: Wed Feb 15 09:12:08 2017 -0500
Committer: Josh Mahonin <jmaho...@gmail.com>
Committed: Wed Feb 15 09:18:31 2017 -0500

----------------------------------------------------------------------
 phoenix-flume/pom.xml                           |   6 +-
 .../phoenix/flume/CsvEventSerializerIT.java     | 416 +++++++++++++++++++
 .../apache/phoenix/flume/FlumeConstants.java    |  14 +-
 .../flume/serializer/CsvEventSerializer.java    | 196 +++++++++
 .../flume/serializer/EventSerializers.java      |   4 +-
 5 files changed, 631 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
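
For context: the sink selects this serializer through its "serializer" property, and the
serializer-specific settings are read under that prefix. Below is a minimal sketch of wiring a
PhoenixSink to consume CSV events; the table name, JDBC URL, and column list are illustrative
assumptions, not values taken from this patch.

    // Sketch: configure a PhoenixSink with the new CSV serializer (assumed table/URL values).
    import org.apache.flume.Context;
    import org.apache.flume.conf.Configurables;
    import org.apache.phoenix.flume.FlumeConstants;
    import org.apache.phoenix.flume.serializer.EventSerializers;
    import org.apache.phoenix.flume.sink.PhoenixSink;

    public class CsvSinkSketch {
        public static PhoenixSink buildSink() {
            Context ctx = new Context();
            ctx.put(FlumeConstants.CONFIG_TABLE, "CSV_EVENTS");                // hypothetical table
            ctx.put(FlumeConstants.CONFIG_JDBC_URL, "jdbc:phoenix:localhost"); // hypothetical quorum
            ctx.put(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.CSV.name());
            ctx.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES,
                    "col1,col2,col3,col4");
            PhoenixSink sink = new PhoenixSink();
            Configurables.configure(sink, ctx);
            return sink;
        }
    }

With the defaults, an event body such as kalyan,10.5,"abc,pqr,xyz","1,2,3,4" (the payload used in
the tests below) maps one CSV field to each configured column, with the quoted fields feeding the
array columns.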


http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa414ea/phoenix-flume/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml
index 8046ff2..58f26f3 100644
--- a/phoenix-flume/pom.xml
+++ b/phoenix-flume/pom.xml
@@ -187,7 +187,11 @@
       <artifactId>json-path</artifactId>
       <version>2.2.0</version>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-csv</artifactId>
+      <version>${commons-csv.version}</version>
+    </dependency>
    <!-- Main dependency on flume. The last to avoid using old commons-io in IT -->
     <dependency>
       <groupId>org.apache.flume</groupId>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa414ea/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java b/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
new file mode 100644
index 0000000..842db04
--- /dev/null
+++ b/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.flume;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.flume.serializer.EventSerializers;
+import org.apache.phoenix.flume.sink.PhoenixSink;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class CsvEventSerializerIT extends BaseHBaseManagedTimeIT {
+
+       private Context sinkContext;
+       private PhoenixSink sink;
+
+       @Test
+       public void testWithDefaultDelimiters() throws EventDeliveryException, SQLException {
+
+               final String fullTableName = "FLUME_CSV_TEST";
+
+               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
+                               + "  (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]"
+                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
+               String columns = "col1,col2,col3,col4";
+               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
+               initSinkContext(fullTableName, ddl, columns, null, null, null, null, rowkeyType, null);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+
+               final String eventBody = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
+               final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               channel.put(event);
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               int rowsInDb = countRows(fullTableName);
+               assertEquals(1, rowsInDb);
+
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testKeyGenerator() throws EventDeliveryException, SQLException {
+
+               final String fullTableName = "FLUME_CSV_TEST";
+               initSinkContextWithDefaults(fullTableName);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+               final String eventBody = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
+               final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               channel.put(event);
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               int rowsInDb = countRows(fullTableName);
+               assertEquals(1, rowsInDb);
+
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testMismatchKeyGenerator() throws EventDeliveryException, SQLException {
+
+               final String fullTableName = "FLUME_CSV_TEST";
+               initSinkContextWithDefaults(fullTableName);
+               setConfig(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
+                               DefaultKeyGenerator.UUID.name());
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+               final String eventBody = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
+               final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               channel.put(event);
+               transaction.commit();
+               transaction.close();
+
+               try {
+                       sink.process();
+                       fail();
+               } catch (Exception ex) {
+                       assertTrue(ex.getCause().getMessage().contains("java.lang.IllegalArgumentException: Invalid format:"));
+               }
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {
+
+               final String fullTableName = "FLUME_CSV_TEST";
+               initSinkContextWithDefaults(fullTableName);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+               final String eventBody = "kalyan,\"abc,pqr,xyz\",\"1,2,3,4\"";
+               final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               channel.put(event);
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               int rowsInDb = countRows(fullTableName);
+               assertEquals(0, rowsInDb);
+
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testBatchEvents() throws EventDeliveryException, SQLException {
+
+               final String fullTableName = "FLUME_CSV_TEST";
+               initSinkContextWithDefaults(fullTableName);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+               int numEvents = 150;
+               String col1 = "val1";
+               String a1 = "\"aaa,bbb,ccc\"";
+               String a2 = "\"1,2,3,4\"";
+               String eventBody = null;
+               List<Event> eventList = Lists.newArrayListWithCapacity(numEvents);
+               for (int i = 0; i < numEvents; i++) {
+                       eventBody = (col1 + i) + "," + i * 10.5 + "," + a1 + "," + a2;
+                       Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+                       eventList.add(event);
+               }
+
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               for (Event event : eventList) {
+                       channel.put(event);
+               }
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               int rowsInDb = countRows(fullTableName);
+               assertEquals(eventList.size(), rowsInDb);
+
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       @Test
+       public void testEventsWithHeaders() throws Exception {
+
+               sinkContext = new Context();
+               final String fullTableName = "FLUME_CSV_TEST";
+               final String ddl = "CREATE TABLE IF NOT EXISTS "
+                               + fullTableName
+                               + "  (rowkey VARCHAR not null, col1 varchar , col2 double, col3 varchar[], col4 integer[], host varchar , source varchar \n"
+                               + "  CONSTRAINT pk PRIMARY KEY (rowkey))\n";
+               String columns = "col1,col2,col3,col4";
+               String rowkeyType = DefaultKeyGenerator.UUID.name();
+               String headers = "host,source";
+               initSinkContext(fullTableName, ddl, columns, null, null, null, null, rowkeyType, headers);
+
+               sink = new PhoenixSink();
+               Configurables.configure(sink, sinkContext);
+               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+               final Channel channel = this.initChannel();
+               sink.setChannel(channel);
+
+               sink.start();
+
+               int numEvents = 10;
+               String col1 = "val1";
+               String a1 = "\"aaa,bbb,ccc\"";
+               String a2 = "\"1,2,3,4\"";
+               String hostHeader = "host1";
+               String sourceHeader = "source1";
+               String eventBody = null;
+               List<Event> eventList = Lists.newArrayListWithCapacity(numEvents);
+               for (int i = 0; i < numEvents; i++) {
+                       eventBody = (col1 + i) + "," + i * 10.5 + "," + a1 + "," + a2;
+                       Map<String, String> headerMap = Maps.newHashMapWithExpectedSize(2);
+                       headerMap.put("host", hostHeader);
+                       headerMap.put("source", sourceHeader);
+                       Event event = EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap);
+                       eventList.add(event);
+               }
+
+               // put event in channel
+               Transaction transaction = channel.getTransaction();
+               transaction.begin();
+               for (Event event : eventList) {
+                       channel.put(event);
+               }
+               transaction.commit();
+               transaction.close();
+
+               sink.process();
+
+               final String query = " SELECT * FROM \n " + fullTableName;
+               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+               final ResultSet rs;
+               final Connection conn = DriverManager.getConnection(getUrl(), props);
+               try {
+                       rs = conn.createStatement().executeQuery(query);
+                       assertTrue(rs.next());
+                       assertEquals("host1", rs.getString("host"));
+                       assertEquals("source1", rs.getString("source"));
+
+                       assertTrue(rs.next());
+                       assertEquals("host1", rs.getString("host"));
+                       assertEquals("source1", rs.getString("source"));
+               } finally {
+                       if (conn != null) {
+                               conn.close();
+                       }
+               }
+               sink.stop();
+               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+               dropTable(fullTableName);
+       }
+
+       private Channel initChannel() {
+               // Channel configuration
+               Context channelContext = new Context();
+               channelContext.put("capacity", "10000");
+               channelContext.put("transactionCapacity", "200");
+
+               Channel channel = new MemoryChannel();
+               channel.setName("memorychannel");
+               Configurables.configure(channel, channelContext);
+               return channel;
+       }
+
+       private void initSinkContext(final String fullTableName, final String ddl, final String columns,
+                       final String csvDelimiter, final String csvQuote, final String csvEscape, final String csvArrayDelimiter,
+                       final String rowkeyType, final String headers) {
+               Preconditions.checkNotNull(fullTableName);
+               sinkContext = new Context();
+               sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
+               sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
+               sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.CSV.name());
+               sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
+               sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, columns);
+               if (null != csvDelimiter)
+                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_DELIMITER, csvDelimiter);
+               if (null != csvQuote)
+                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_QUOTE, csvQuote);
+               if (null != csvEscape)
+                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_ESCAPE, csvEscape);
+               if (null != csvArrayDelimiter)
+                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_ARRAY_DELIMITER,
+                                       csvArrayDelimiter);
+               if (null != rowkeyType)
+                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
+                                       rowkeyType);
+               if (null != headers)
+                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_HEADER_NAMES, headers);
+       }
+
+       private void initSinkContextWithDefaults(final String fullTableName) {
+               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
+                               + "  (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]"
+                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
+               String columns = "col1,col2,col3,col4";
+               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
+               initSinkContext(fullTableName, ddl, columns, null, null, null, null, rowkeyType, null);
+       }
+
+       private void setConfig(final String configName, final String configValue) {
+               Preconditions.checkNotNull(sinkContext);
+               Preconditions.checkNotNull(configName);
+               Preconditions.checkNotNull(configValue);
+               sinkContext.put(configName, configValue);
+       }
+
+       private int countRows(final String fullTableName) throws SQLException {
+               Preconditions.checkNotNull(fullTableName);
+               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+               final Connection conn = DriverManager.getConnection(getUrl(), props);
+               ResultSet rs = null;
+               try {
+                       rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+                       int rowsCount = 0;
+                       while (rs.next()) {
+                               rowsCount = rs.getInt(1);
+                       }
+                       return rowsCount;
+
+               } finally {
+                       if (rs != null) {
+                               rs.close();
+                       }
+                       if (conn != null) {
+                               conn.close();
+                       }
+               }
+
+       }
+
+       private void dropTable(final String fullTableName) throws SQLException {
+               Preconditions.checkNotNull(fullTableName);
+               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+               final Connection conn = DriverManager.getConnection(getUrl(), props);
+               try {
+                       conn.createStatement().execute("drop table if exists " + fullTableName);
+               } finally {
+                       if (conn != null) {
+                               conn.close();
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa414ea/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
index f60d7dc..a146bbe 100644
--- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
@@ -62,12 +62,22 @@ public final class FlumeConstants {
     /** Whether to ignore case when performing regex matches. */
     public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase";
     public static final boolean IGNORE_CASE_DEFAULT = false;
-    
+
     /** JSON expression used to parse groups from event data. */
     public static final String CONFIG_COLUMNS_MAPPING = "columnsMapping";
     public static final String CONFIG_PARTIAL_SCHEMA = "partialSchema";
     public static final String JSON_DEFAULT = "{}";
-    
+
+    /** CSV configuration used to parse fields from event data. */
+    public static final String CSV_DELIMITER = "csvDelimiter";
+    public static final String CSV_DELIMITER_DEFAULT = ",";
+    public static final String CSV_QUOTE = "csvQuote";
+    public static final String CSV_QUOTE_DEFAULT = "\"";
+    public static final String CSV_ESCAPE = "csvEscape";
+    public static final String CSV_ESCAPE_DEFAULT = "\\";
+    public static final String CSV_ARRAY_DELIMITER = "csvArrayDelimiter";
+    public static final String CSV_ARRAY_DELIMITER_DEFAULT = ",";
+
     /** Comma separated list of column names . */
     public static final String CONFIG_COLUMN_NAMES = "columns";
 

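These four knobs are read under the serializer prefix at configuration time (see doConfigure() in
CsvEventSerializer below). A short sketch of overriding the defaults for a pipe-delimited feed; the
chosen characters are assumptions for illustration only. Note that the delimiter, quote, and escape
use only the first character of the configured value, while the array delimiter is applied as a
whole String via split():

    // Sketch: override the CSV parsing defaults on a sink Context (assumed characters).
    static void applyCustomCsvOptions(org.apache.flume.Context sinkContext) {
        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_DELIMITER, "|");
        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_QUOTE, "'");
        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_ARRAY_DELIMITER, ";");
    }
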
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa414ea/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
new file mode 100644
index 0000000..1521084
--- /dev/null
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.flume.serializer;
+
+import static org.apache.phoenix.flume.FlumeConstants.CSV_DELIMITER;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_DELIMITER_DEFAULT;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_QUOTE;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_QUOTE_DEFAULT;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_ESCAPE;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_ESCAPE_DEFAULT;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_ARRAY_DELIMITER;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_ARRAY_DELIMITER_DEFAULT;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.sql.Array;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.phoenix.schema.types.PDataType;
+import org.json.JSONArray;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+public class CsvEventSerializer extends BaseEventSerializer {
+
+       private static final Logger logger = LoggerFactory.getLogger(CsvEventSerializer.class);
+
+       private String csvDelimiter;
+       private String csvQuote;
+       private String csvEscape;
+       private String csvArrayDelimiter;
+       private CsvLineParser csvLineParser;
+
+       /** Reads the CSV delimiter, quote, escape, and array-delimiter settings from the serializer context. */
+       @Override
+       public void doConfigure(Context context) {
+               csvDelimiter = context.getString(CSV_DELIMITER, CSV_DELIMITER_DEFAULT);
+               csvQuote = context.getString(CSV_QUOTE, CSV_QUOTE_DEFAULT);
+               csvEscape = context.getString(CSV_ESCAPE, CSV_ESCAPE_DEFAULT);
+               csvArrayDelimiter = context.getString(CSV_ARRAY_DELIMITER, CSV_ARRAY_DELIMITER_DEFAULT);
+               csvLineParser = new CsvLineParser(csvDelimiter.toCharArray()[0], csvQuote.toCharArray()[0],
+                               csvEscape.toCharArray()[0]);
+       }
+
+       /** No serializer-specific initialization is required. */
+       @Override
+       public void doInitialize() throws SQLException {
+               // NO-OP
+       }
+
+       @Override
+       public void upsertEvents(List<Event> events) throws SQLException {
+               Preconditions.checkNotNull(events);
+               Preconditions.checkNotNull(connection);
+               Preconditions.checkNotNull(this.upsertStatement);
+
+               boolean wasAutoCommit = connection.getAutoCommit();
+               connection.setAutoCommit(false);
+               try (PreparedStatement colUpsert = connection.prepareStatement(upsertStatement)) {
+                       String value = null;
+                       Integer sqlType = null;
+                       for (Event event : events) {
+                               byte[] payloadBytes = event.getBody();
+                               if (payloadBytes == null || payloadBytes.length == 0) {
+                                       continue;
+                               }
+                               String payload = new String(payloadBytes);
+                               CSVRecord csvRecord = csvLineParser.parse(payload);
+                               if (colNames.size() != csvRecord.size()) {
+                                       logger.debug("payload data {} doesn't match the fields mapping {} ", payload, colNames);
+                                       continue;
+                               }
+                               Map<String, String> data = new HashMap<String, String>();
+                               for (int i = 0; i < csvRecord.size(); i++) {
+                                       data.put(colNames.get(i), csvRecord.get(i));
+                               }
+                               Collection<String> values = data.values();
+                               if (values.contains(null)) {
+                                       logger.debug("payload data {} doesn't match the fields mapping {} ", payload, colNames);
+                                       continue;
+                               }
+
+                               int index = 1;
+                               int offset = 0;
+                               for (int i = 0; i < colNames.size(); i++, offset++) {
+                                       if (columnMetadata[offset] == null) {
+                                               continue;
+                                       }
+                                       String colName = colNames.get(i);
+                                       value = data.get(colName);
+                                       sqlType = columnMetadata[offset].getSqlType();
+                                       PDataType pDataType = PDataType.fromTypeId(sqlType);
+                                       Object upsertValue;
+                                       if (pDataType.isArrayType()) {
+                                               String arrayJson = Arrays.toString(value.split(csvArrayDelimiter));
+                                               JSONArray jsonArray = new JSONArray(new JSONTokener(arrayJson));
+                                               Object[] vals = new Object[jsonArray.length()];
+                                               for (int x = 0; x < jsonArray.length(); x++) {
+                                                       vals[x] = jsonArray.get(x);
+                                               }
+                                               String baseTypeSqlName = PDataType.arrayBaseType(pDataType).getSqlTypeName();
+                                               Array array = connection.createArrayOf(baseTypeSqlName, vals);
+                                               upsertValue = pDataType.toObject(array, pDataType);
+                                       } else {
+                                               upsertValue = pDataType.toObject(value);
+                                       }
+                                       if (upsertValue != null) {
+                                               colUpsert.setObject(index++, upsertValue, sqlType);
+                                       } else {
+                                               colUpsert.setNull(index++, sqlType);
+                                       }
+                               }
+
+                               // add headers if necessary
+                               Map<String, String> headerValues = event.getHeaders();
+                               for (int i = 0; i < headers.size(); i++, offset++) {
+                                       String headerName = headers.get(i);
+                                       String headerValue = headerValues.get(headerName);
+                                       sqlType = columnMetadata[offset].getSqlType();
+                                       Object upsertValue = PDataType.fromTypeId(sqlType).toObject(headerValue);
+                                       if (upsertValue != null) {
+                                               colUpsert.setObject(index++, upsertValue, sqlType);
+                                       } else {
+                                               colUpsert.setNull(index++, sqlType);
+                                       }
+                               }
+
+                               if (autoGenerateKey) {
+                                       sqlType = columnMetadata[offset].getSqlType();
+                                       String generatedRowValue = this.keyGenerator.generate();
+                                       Object rowkeyValue = PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
+                                       colUpsert.setObject(index++, rowkeyValue, sqlType);
+                               }
+                               colUpsert.execute();
+                       }
+                       connection.commit();
+               } catch (Exception ex) {
+                       logger.error("An error {} occurred during persisting the event ", ex.getMessage());
+                       throw new SQLException(ex.getMessage());
+               } finally {
+                       if (wasAutoCommit) {
+                               connection.setAutoCommit(true);
+                       }
+               }
+
+       }
+
+       static class CsvLineParser {
+               private final CSVFormat csvFormat;
+
+               CsvLineParser(char fieldDelimiter, char quote, char escape) {
+                       this.csvFormat = CSVFormat.DEFAULT.withIgnoreEmptyLines(true).withDelimiter(fieldDelimiter)
+                                       .withEscape(escape).withQuote(quote);
+               }
+
+               public CSVRecord parse(String input) throws IOException {
+                       // Close the parser to release the underlying reader; the first record is materialized before close.
+                       try (CSVParser csvParser = new CSVParser(new StringReader(input), this.csvFormat)) {
+                               return Iterables.getFirst(csvParser, null);
+                       }
+               }
+       }
+
+}
\ No newline at end of file
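
To illustrate what CsvLineParser produces with the defaults, here is a self-contained sketch (the
class name is ours) that parses the event body used in the integration tests; the quoted fields
keep their embedded commas, and upsertEvents() later splits them on the array delimiter to fill
the array columns:

    import java.io.StringReader;
    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;

    public class CsvLineParserDemo {
        public static void main(String[] args) throws Exception {
            // Same format the serializer builds: default delimiter, quote, and escape.
            CSVFormat format = CSVFormat.DEFAULT.withIgnoreEmptyLines(true)
                    .withDelimiter(',').withQuote('"').withEscape('\\');
            String line = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
            try (CSVParser parser = new CSVParser(new StringReader(line), format)) {
                CSVRecord record = parser.iterator().next();
                System.out.println(record.size());   // prints 4: quoted fields stay intact
                System.out.println(record.get(2));   // prints abc,pqr,xyz (still one field)
            }
        }
    }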

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa414ea/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
index 68a609b..8c99d7d 100644
--- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
@@ -19,7 +19,7 @@ package org.apache.phoenix.flume.serializer;
 
 public enum EventSerializers {
 
-    REGEX(RegexEventSerializer.class.getName()), JSON(JsonEventSerializer.class.getName());
+    REGEX(RegexEventSerializer.class.getName()), JSON(JsonEventSerializer.class.getName()), CSV(CsvEventSerializer.class.getName());
     
     private final String className;
     
@@ -33,4 +33,4 @@ public enum EventSerializers {
     public String getClassName() {
         return className;
     }
-}
+}
\ No newline at end of file
