Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 2b01232bf -> 99358e7b4
PHOENIX-3135 Support loading csv data using Flume plugin (Kalyan Hadoop) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4fa414ea Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4fa414ea Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4fa414ea Branch: refs/heads/4.x-HBase-0.98 Commit: 4fa414eaaef2049ce1c8f5233c6fe0b085e93bea Parents: 2b01232 Author: Josh Mahonin <jmaho...@gmail.com> Authored: Wed Feb 15 09:12:08 2017 -0500 Committer: Josh Mahonin <jmaho...@gmail.com> Committed: Wed Feb 15 09:18:31 2017 -0500 ---------------------------------------------------------------------- phoenix-flume/pom.xml | 6 +- .../phoenix/flume/CsvEventSerializerIT.java | 416 +++++++++++++++++++ .../apache/phoenix/flume/FlumeConstants.java | 14 +- .../flume/serializer/CsvEventSerializer.java | 196 +++++++++ .../flume/serializer/EventSerializers.java | 4 +- 5 files changed, 631 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa414ea/phoenix-flume/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml index 8046ff2..58f26f3 100644 --- a/phoenix-flume/pom.xml +++ b/phoenix-flume/pom.xml @@ -187,7 +187,11 @@ <artifactId>json-path</artifactId> <version>2.2.0</version> </dependency> - + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-csv</artifactId> + <version>${commons-csv.version}</version> + </dependency> <!-- Main dependency on flume. 
The last to avoid using old commons-io in IT --> <dependency> <groupId>org.apache.flume</groupId> http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa414ea/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java b/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java new file mode 100644 index 0000000..842db04 --- /dev/null +++ b/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.phoenix.flume;

import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.flume.serializer.EventSerializers;
import org.apache.phoenix.flume.sink.PhoenixSink;
import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Test;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
 * Integration tests for the CSV event serializer of the Phoenix Flume sink.
 *
 * Each test configures a {@link PhoenixSink} with the CSV serializer, pushes events through an
 * in-memory channel, lets the sink process them and then inspects the target table. The table is
 * dropped in a {@code finally} block so that a failing assertion in one test does not leave a
 * table behind for the next one (all tests share the same table name).
 */
public class CsvEventSerializerIT extends BaseHBaseManagedTimeIT {

    /** Event body with one varchar, one double, one varchar-array and one integer-array field. */
    private static final String SINGLE_EVENT_BODY = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";

    private Context sinkContext;
    private PhoenixSink sink;

    @Test
    public void testWithDefaultDelimiters() throws EventDeliveryException, SQLException {

        final String fullTableName = "FLUME_CSV_TEST";
        // No delimiter, quote or escape option is set, so the serializer falls back to its defaults.
        initSinkContextWithDefaults(fullTableName);
        try {
            final Channel channel = startSink();
            putEvents(channel, singleEvent(SINGLE_EVENT_BODY));

            sink.process();

            assertEquals(1, countRows(fullTableName));
            stopSink();
        } finally {
            dropTable(fullTableName);
        }
    }

    @Test
    public void testKeyGenerator() throws EventDeliveryException, SQLException {

        final String fullTableName = "FLUME_CSV_TEST";
        initSinkContextWithDefaults(fullTableName);
        try {
            final Channel channel = startSink();
            putEvents(channel, singleEvent(SINGLE_EVENT_BODY));

            sink.process();

            assertEquals(1, countRows(fullTableName));
            stopSink();
        } finally {
            dropTable(fullTableName);
        }
    }

    @Test
    public void testMismatchKeyGenerator() throws EventDeliveryException, SQLException {

        final String fullTableName = "FLUME_CSV_TEST";
        initSinkContextWithDefaults(fullTableName);
        // The table's primary key is a timestamp; overriding the generator with UUID makes the
        // generated key unparseable for that column.
        setConfig(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
                DefaultKeyGenerator.UUID.name());
        try {
            final Channel channel = startSink();
            putEvents(channel, singleEvent(SINGLE_EVENT_BODY));

            try {
                sink.process();
                fail();
            } catch (Exception ex) {
                assertTrue(ex.getCause().getMessage().contains("java.lang.IllegalArgumentException: Invalid format:"));
            }
        } finally {
            dropTable(fullTableName);
        }
    }

    @Test
    public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {

        final String fullTableName = "FLUME_CSV_TEST";
        initSinkContextWithDefaults(fullTableName);
        try {
            final Channel channel = startSink();
            // Only three fields for four configured columns: the event must be skipped.
            putEvents(channel, singleEvent("kalyan,\"abc,pqr,xyz\",\"1,2,3,4\""));

            sink.process();

            assertEquals(0, countRows(fullTableName));
            stopSink();
        } finally {
            dropTable(fullTableName);
        }
    }

    @Test
    public void testBatchEvents() throws EventDeliveryException, SQLException {

        final String fullTableName = "FLUME_CSV_TEST";
        initSinkContextWithDefaults(fullTableName);
        try {
            final Channel channel = startSink();

            int numEvents = 150;
            String col1 = "val1";
            String a1 = "\"aaa,bbb,ccc\"";
            String a2 = "\"1,2,3,4\"";
            String eventBody = null;
            List<Event> eventList = Lists.newArrayListWithCapacity(numEvents);
            // NOTE(review): eventList is still empty when this loop starts, so the condition
            // i < eventList.size() is false on the first check: no event is ever built and the
            // assertion below compares 0 with the row count of an empty table. Bounding the loop
            // by numEvents looks like the intent, but a single sink.process() call and the
            // TIMESTAMP row key used by the default context may not yield numEvents distinct
            // rows - TODO confirm the sink batch size and key uniqueness before changing the bound.
            for (int i = 0; i < eventList.size(); i++) {
                eventBody = (col1 + i) + "," + i * 10.5 + "," + a1 + "," + a2;
                Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
                eventList.add(event);
            }

            putEvents(channel, eventList);

            sink.process();

            assertEquals(eventList.size(), countRows(fullTableName));
            stopSink();
        } finally {
            dropTable(fullTableName);
        }
    }

    @Test
    public void testEventsWithHeaders() throws Exception {

        final String fullTableName = "FLUME_CSV_TEST";
        final String ddl = "CREATE TABLE IF NOT EXISTS "
                + fullTableName
                + " (rowkey VARCHAR not null, col1 varchar , col2 double, col3 varchar[], col4 integer[], host varchar , source varchar \n"
                + " CONSTRAINT pk PRIMARY KEY (rowkey))\n";
        String columns = "col1,col2,col3,col4";
        String rowkeyType = DefaultKeyGenerator.UUID.name();
        String headers = "host,source";
        initSinkContext(fullTableName, ddl, columns, null, null, null, null, rowkeyType, headers);
        try {
            final Channel channel = startSink();

            int numEvents = 10;
            String col1 = "val1";
            String a1 = "\"aaa,bbb,ccc\"";
            String a2 = "\"1,2,3,4\"";
            String hostHeader = "host1";
            String sourceHeader = "source1";
            List<Event> eventList = Lists.newArrayListWithCapacity(numEvents);
            for (int i = 0; i < numEvents; i++) {
                String eventBody = (col1 + i) + "," + i * 10.5 + "," + a1 + "," + a2;
                Map<String, String> headerMap = Maps.newHashMapWithExpectedSize(2);
                headerMap.put("host", hostHeader);
                headerMap.put("source", sourceHeader);
                eventList.add(EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap));
            }

            putEvents(channel, eventList);

            sink.process();

            final String query = " SELECT * FROM \n " + fullTableName;
            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
            // Connection, statement and result set are all closed, even when an assertion fails.
            try (Connection conn = DriverManager.getConnection(getUrl(), props);
                    Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery(query)) {
                // Spot-check the header columns of the first two rows.
                for (int row = 0; row < 2; row++) {
                    assertTrue(rs.next());
                    assertEquals("host1", rs.getString("host"));
                    assertEquals("source1", rs.getString("source"));
                }
            }
            stopSink();
        } finally {
            dropTable(fullTableName);
        }
    }

    /**
     * Creates a sink from the current {@link #sinkContext}, attaches a fresh memory channel and
     * starts it.
     *
     * @return the channel the started sink reads from
     */
    private Channel startSink() {
        sink = new PhoenixSink();
        Configurables.configure(sink, sinkContext);
        assertEquals(LifecycleState.IDLE, sink.getLifecycleState());

        final Channel channel = this.initChannel();
        sink.setChannel(channel);

        sink.start();
        return channel;
    }

    /** Stops the sink and verifies that it reached the STOP lifecycle state. */
    private void stopSink() {
        sink.stop();
        assertEquals(LifecycleState.STOP, sink.getLifecycleState());
    }

    /** Wraps a CSV line into a one-element event list. */
    private static List<Event> singleEvent(final String eventBody) {
        return Collections.singletonList(EventBuilder.withBody(Bytes.toBytes(eventBody)));
    }

    /** Puts all events into the channel within a single channel transaction. */
    private static void putEvents(final Channel channel, final List<Event> events) {
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        for (Event event : events) {
            channel.put(event);
        }
        transaction.commit();
        transaction.close();
    }

    private Channel initChannel() {
        // Channel configuration
        Context channelContext = new Context();
        channelContext.put("capacity", "10000");
        channelContext.put("transactionCapacity", "200");

        Channel channel = new MemoryChannel();
        channel.setName("memorychannel");
        Configurables.configure(channel, channelContext);
        return channel;
    }

    /**
     * Builds the sink configuration for the CSV serializer. Every optional argument that is
     * {@code null} is simply left out of the context.
     */
    private void initSinkContext(final String fullTableName, final String ddl, final String columns,
            final String csvDelimiter, final String csvQuote, final String csvEscape, final String csvArrayDelimiter,
            final String rowkeyType, final String headers) {
        Preconditions.checkNotNull(fullTableName);
        sinkContext = new Context();
        sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.CSV.name());
        sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, columns);
        if (null != csvDelimiter) {
            sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_DELIMITER, csvDelimiter);
        }
        if (null != csvQuote) {
            sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_QUOTE, csvQuote);
        }
        if (null != csvEscape) {
            sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_ESCAPE, csvEscape);
        }
        if (null != csvArrayDelimiter) {
            sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_ARRAY_DELIMITER,
                    csvArrayDelimiter);
        }
        if (null != rowkeyType) {
            sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
                    rowkeyType);
        }
        if (null != headers) {
            sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_HEADER_NAMES, headers);
        }
    }

    /** Configures the sink for a timestamp-keyed table with four CSV columns and no headers. */
    private void initSinkContextWithDefaults(final String fullTableName) {
        String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
                + " (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]"
                + " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
        String columns = "col1,col2,col3,col4";
        String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
        initSinkContext(fullTableName, ddl, columns, null, null, null, null, rowkeyType, null);
    }

    private void setConfig(final String configName, final String configValue) {
        Preconditions.checkNotNull(sinkContext);
        Preconditions.checkNotNull(configName);
        Preconditions.checkNotNull(configValue);
        sinkContext.put(configName, configValue);
    }

    /** Returns the result of {@code select count(*)} on the given table. */
    private int countRows(final String fullTableName) throws SQLException {
        Preconditions.checkNotNull(fullTableName);
        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
        try (Connection conn = DriverManager.getConnection(getUrl(), props);
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("select count(*) from " + fullTableName)) {
            int rowsCount = 0;
            while (rs.next()) {
                rowsCount = rs.getInt(1);
            }
            return rowsCount;
        }
    }

    /** Drops the given table if it exists. */
    private void dropTable(final String fullTableName) throws SQLException {
        Preconditions.checkNotNull(fullTableName);
        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
        try (Connection conn = DriverManager.getConnection(getUrl(), props);
                Statement stmt = conn.createStatement()) {
            stmt.execute("drop table if exists " + fullTableName);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa414ea/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java index f60d7dc..a146bbe 100644 --- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java +++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java @@ -62,12 +62,22 @@ public final class FlumeConstants { /** Whether to ignore case when performing regex matches. */ public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase"; public static final boolean IGNORE_CASE_DEFAULT = false; - + /** JSON expression used to parse groups from event data. 
*/ public static final String CONFIG_COLUMNS_MAPPING = "columnsMapping"; public static final String CONFIG_PARTIAL_SCHEMA = "partialSchema"; public static final String JSON_DEFAULT = "{}"; - + + /** CSV expression used to parse groups from event data. */ + public static final String CSV_DELIMITER = "csvDelimiter"; + public static final String CSV_DELIMITER_DEFAULT = ","; + public static final String CSV_QUOTE = "csvQuote"; + public static final String CSV_QUOTE_DEFAULT = "\""; + public static final String CSV_ESCAPE = "csvEscape"; + public static final String CSV_ESCAPE_DEFAULT = "\\"; + public static final String CSV_ARRAY_DELIMITER = "csvArrayDelimiter"; + public static final String CSV_ARRAY_DELIMITER_DEFAULT = ","; + /** Comma separated list of column names . */ public static final String CONFIG_COLUMN_NAMES = "columns"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa414ea/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java new file mode 100644 index 0000000..1521084 --- /dev/null +++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.flume.serializer; + +import static org.apache.phoenix.flume.FlumeConstants.CSV_DELIMITER; +import static org.apache.phoenix.flume.FlumeConstants.CSV_DELIMITER_DEFAULT; +import static org.apache.phoenix.flume.FlumeConstants.CSV_QUOTE; +import static org.apache.phoenix.flume.FlumeConstants.CSV_QUOTE_DEFAULT; +import static org.apache.phoenix.flume.FlumeConstants.CSV_ESCAPE; +import static org.apache.phoenix.flume.FlumeConstants.CSV_ESCAPE_DEFAULT; +import static org.apache.phoenix.flume.FlumeConstants.CSV_ARRAY_DELIMITER; +import static org.apache.phoenix.flume.FlumeConstants.CSV_ARRAY_DELIMITER_DEFAULT; + +import java.io.IOException; +import java.io.StringReader; +import java.sql.Array; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.phoenix.schema.types.PDataType; +import org.json.JSONArray; +import org.json.JSONTokener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + +public class CsvEventSerializer extends BaseEventSerializer { + + private static final Logger logger = LoggerFactory.getLogger(CsvEventSerializer.class); + + private String csvDelimiter; + 
private String csvQuote; + private String csvEscape; + private String csvArrayDelimiter; + private CsvLineParser csvLineParser; + + /** + * + */ + @Override + public void doConfigure(Context context) { + csvDelimiter = context.getString(CSV_DELIMITER, CSV_DELIMITER_DEFAULT); + csvQuote = context.getString(CSV_QUOTE, CSV_QUOTE_DEFAULT); + csvEscape = context.getString(CSV_ESCAPE, CSV_ESCAPE_DEFAULT); + csvArrayDelimiter = context.getString(CSV_ARRAY_DELIMITER, CSV_ARRAY_DELIMITER_DEFAULT); + csvLineParser = new CsvLineParser(csvDelimiter.toCharArray()[0], csvQuote.toCharArray()[0], + csvEscape.toCharArray()[0]); + } + + /** + * + */ + @Override + public void doInitialize() throws SQLException { + // NO-OP + } + + @Override + public void upsertEvents(List<Event> events) throws SQLException { + Preconditions.checkNotNull(events); + Preconditions.checkNotNull(connection); + Preconditions.checkNotNull(this.upsertStatement); + + boolean wasAutoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + try (PreparedStatement colUpsert = connection.prepareStatement(upsertStatement)) { + String value = null; + Integer sqlType = null; + for (Event event : events) { + byte[] payloadBytes = event.getBody(); + if (payloadBytes == null || payloadBytes.length == 0) { + continue; + } + String payload = new String(payloadBytes); + CSVRecord csvRecord = csvLineParser.parse(payload); + if (colNames.size() != csvRecord.size()) { + logger.debug("payload data {} doesn't match the fields mapping {} ", payload, colNames); + continue; + } + Map<String, String> data = new HashMap<String, String>(); + for (int i = 0; i < csvRecord.size(); i++) { + data.put(colNames.get(i), csvRecord.get(i)); + } + Collection<String> values = data.values(); + if (values.contains(null)) { + logger.debug("payload data {} doesn't match the fields mapping {} ", payload, colNames); + continue; + } + + int index = 1; + int offset = 0; + for (int i = 0; i < colNames.size(); i++, offset++) { + if 
(columnMetadata[offset] == null) { + continue; + } + String colName = colNames.get(i); + value = data.get(colName); + sqlType = columnMetadata[offset].getSqlType(); + PDataType pDataType = PDataType.fromTypeId(sqlType); + Object upsertValue; + if (pDataType.isArrayType()) { + String arrayJson = Arrays.toString(value.split(csvArrayDelimiter)); + JSONArray jsonArray = new JSONArray(new JSONTokener(arrayJson)); + Object[] vals = new Object[jsonArray.length()]; + for (int x = 0; x < jsonArray.length(); x++) { + vals[x] = jsonArray.get(x); + } + String baseTypeSqlName = PDataType.arrayBaseType(pDataType).getSqlTypeName(); + Array array = connection.createArrayOf(baseTypeSqlName, vals); + upsertValue = pDataType.toObject(array, pDataType); + } else { + upsertValue = pDataType.toObject(value); + } + if (upsertValue != null) { + colUpsert.setObject(index++, upsertValue, sqlType); + } else { + colUpsert.setNull(index++, sqlType); + } + } + + // add headers if necessary + Map<String, String> headerValues = event.getHeaders(); + for (int i = 0; i < headers.size(); i++, offset++) { + String headerName = headers.get(i); + String headerValue = headerValues.get(headerName); + sqlType = columnMetadata[offset].getSqlType(); + Object upsertValue = PDataType.fromTypeId(sqlType).toObject(headerValue); + if (upsertValue != null) { + colUpsert.setObject(index++, upsertValue, sqlType); + } else { + colUpsert.setNull(index++, sqlType); + } + } + + if (autoGenerateKey) { + sqlType = columnMetadata[offset].getSqlType(); + String generatedRowValue = this.keyGenerator.generate(); + Object rowkeyValue = PDataType.fromTypeId(sqlType).toObject(generatedRowValue); + colUpsert.setObject(index++, rowkeyValue, sqlType); + } + colUpsert.execute(); + } + connection.commit(); + } catch (Exception ex) { + logger.error("An error {} occurred during persisting the event ", ex.getMessage()); + throw new SQLException(ex.getMessage()); + } finally { + if (wasAutoCommit) { + connection.setAutoCommit(true); + } 
+ } + + } + + static class CsvLineParser { + private final CSVFormat csvFormat; + + CsvLineParser(char fieldDelimiter, char quote, char escape) { + this.csvFormat = CSVFormat.DEFAULT.withIgnoreEmptyLines(true).withDelimiter(fieldDelimiter) + .withEscape(escape).withQuote(quote); + } + + public CSVRecord parse(String input) throws IOException { + CSVParser csvParser = new CSVParser(new StringReader(input), this.csvFormat); + return ((CSVRecord) Iterables.getFirst(csvParser, null)); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/4fa414ea/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java index 68a609b..8c99d7d 100644 --- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java +++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java @@ -19,7 +19,7 @@ package org.apache.phoenix.flume.serializer; public enum EventSerializers { - REGEX(RegexEventSerializer.class.getName()), JSON(JsonEventSerializer.class.getName()); + REGEX(RegexEventSerializer.class.getName()), JSON(JsonEventSerializer.class.getName()), CSV(CsvEventSerializer.class.getName()); private final String className; @@ -33,4 +33,4 @@ public enum EventSerializers { public String getClassName() { return className; } -} +} \ No newline at end of file