PHOENIX-2084 Support loading JSON data using phoenix-flume (Kalyan Hadoop)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d9d53bc1 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d9d53bc1 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d9d53bc1 Branch: refs/heads/calcite Commit: d9d53bc1498a85ce73e89ba63d0bfaed96a186e3 Parents: 1a6a689 Author: Josh Mahonin <jmaho...@gmail.com> Authored: Thu Dec 15 14:38:15 2016 -0500 Committer: Josh Mahonin <jmaho...@gmail.com> Committed: Thu Dec 15 14:38:15 2016 -0500 ---------------------------------------------------------------------- phoenix-flume/pom.xml | 12 + .../phoenix/flume/JsonEventSerializerIT.java | 541 +++++++++++++++++++ .../apache/phoenix/flume/FlumeConstants.java | 7 +- .../flume/serializer/BaseEventSerializer.java | 4 +- .../flume/serializer/EventSerializers.java | 2 +- .../flume/serializer/JsonEventSerializer.java | 226 ++++++++ 6 files changed, 789 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d9d53bc1/phoenix-flume/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml index ae8bb2c..5c4f197 100644 --- a/phoenix-flume/pom.xml +++ b/phoenix-flume/pom.xml @@ -167,6 +167,18 @@ <scope>test</scope> </dependency> + <!-- to work with json data using flume --> + <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + <version>20160212</version> + </dependency> + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + <version>2.2.0</version> + </dependency> + <!-- Main dependency on flume. 
The last to avoid using old commons-io in IT --> <dependency> <groupId>org.apache.flume</groupId> http://git-wip-us.apache.org/repos/asf/phoenix/blob/d9d53bc1/phoenix-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java b/phoenix-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java new file mode 100644 index 0000000..0210bad --- /dev/null +++ b/phoenix-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.flume; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.apache.flume.lifecycle.LifecycleState; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.flume.serializer.EventSerializers; +import org.apache.phoenix.flume.sink.PhoenixSink; +import org.apache.phoenix.util.PropertiesUtil; +import org.junit.Test; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class JsonEventSerializerIT extends BaseHBaseManagedTimeIT { + + private Context sinkContext; + private PhoenixSink sink; + + @Test + public void testWithOutColumnsMapping() throws EventDeliveryException, SQLException { + + final String fullTableName = "FLUME_JSON_TEST"; + + String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName + + " (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]" + + " CONSTRAINT pk PRIMARY KEY (flume_time))\n"; + String columns = "col1,col2,col3,col4"; + String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name(); + initSinkContext(fullTableName, ddl, columns, null, rowkeyType, null); + + sink = new PhoenixSink(); + Configurables.configure(sink, sinkContext); + + 
assertEquals(LifecycleState.IDLE, sink.getLifecycleState()); + + final Channel channel = this.initChannel(); + sink.setChannel(channel); + + sink.start(); + final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" : 10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}"; + final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody)); + // put event in channel + Transaction transaction = channel.getTransaction(); + transaction.begin(); + channel.put(event); + transaction.commit(); + transaction.close(); + + sink.process(); + + int rowsInDb = countRows(fullTableName); + assertEquals(1, rowsInDb); + + sink.stop(); + assertEquals(LifecycleState.STOP, sink.getLifecycleState()); + + dropTable(fullTableName); + } + + @Test + public void testDifferentColumnNames() throws EventDeliveryException, SQLException { + + final String fullTableName = "FLUME_JSON_TEST"; + + String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName + + " (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]" + + " CONSTRAINT pk PRIMARY KEY (flume_time))\n"; + String columns = "col1,col2,col3,col4"; + String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name(); + String columnsMapping = "{\"col1\":\"col1\",\"col2\":\"f2\",\"col3\":\"f3\",\"col4\":\"col4\"}"; + + initSinkContext(fullTableName, ddl, columns, columnsMapping, rowkeyType, null); + + sink = new PhoenixSink(); + Configurables.configure(sink, sinkContext); + + assertEquals(LifecycleState.IDLE, sink.getLifecycleState()); + + final Channel channel = this.initChannel(); + sink.setChannel(channel); + + sink.start(); + final String eventBody = "{\"col1\" : \"kalyan\", \"f2\" : 10.5, \"f3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}"; + final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody)); + // put event in channel + Transaction transaction = channel.getTransaction(); + transaction.begin(); + channel.put(event); + transaction.commit(); + transaction.close(); + + 
sink.process(); + + int rowsInDb = countRows(fullTableName); + assertEquals(1, rowsInDb); + + sink.stop(); + assertEquals(LifecycleState.STOP, sink.getLifecycleState()); + + dropTable(fullTableName); + } + + @Test + public void testInnerColumns() throws EventDeliveryException, SQLException { + + final String fullTableName = "FLUME_JSON_TEST"; + + String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName + + " (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]" + + " CONSTRAINT pk PRIMARY KEY (flume_time))\n"; + String columns = "col1,col2,col3,col4"; + String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name(); + String columnsMapping = "{\"col1\":\"col1\",\"col2\":\"x.y\",\"col3\":\"a.b1.c\",\"col4\":\"col4\"}"; + + initSinkContext(fullTableName, ddl, columns, columnsMapping, rowkeyType, null); + + sink = new PhoenixSink(); + Configurables.configure(sink, sinkContext); + + assertEquals(LifecycleState.IDLE, sink.getLifecycleState()); + + final Channel channel = this.initChannel(); + sink.setChannel(channel); + + sink.start(); + final String eventBody = "{\"col1\" : \"kalyan\", \"x\" : {\"y\" : 10.5}, \"a\" : {\"b1\" : {\"c\" : [\"abc\",\"pqr\",\"xyz\"] }, \"b2\" : 111}, \"col4\" : [1,2,3,4]}"; + final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody)); + // put event in channel + Transaction transaction = channel.getTransaction(); + transaction.begin(); + channel.put(event); + transaction.commit(); + transaction.close(); + + sink.process(); + + int rowsInDb = countRows(fullTableName); + assertEquals(1, rowsInDb); + + sink.stop(); + assertEquals(LifecycleState.STOP, sink.getLifecycleState()); + + dropTable(fullTableName); + } + + @Test + public void testInnerColumnsWithArrayMapping() throws EventDeliveryException, SQLException { + + final String fullTableName = "FLUME_JSON_TEST"; + + String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName + + " (flume_time timestamp not null, col1 varchar , col2 double, col3 
varchar[], col4 integer[]" + + " CONSTRAINT pk PRIMARY KEY (flume_time))\n"; + String columns = "col1,col2,col3,col4"; + String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name(); + String columnsMapping = "{\"col1\":\"col1\",\"col2\":\"x.y\",\"col3\":\"a.b[*].c\",\"col4\":\"col4\"}"; + + initSinkContext(fullTableName, ddl, columns, columnsMapping, rowkeyType, null); + + sink = new PhoenixSink(); + Configurables.configure(sink, sinkContext); + + assertEquals(LifecycleState.IDLE, sink.getLifecycleState()); + + final Channel channel = this.initChannel(); + sink.setChannel(channel); + + sink.start(); + final String eventBody = "{\"col1\" : \"kalyan\", \"x\" : {\"y\" : 10.5}, \"a\" : {\"b\" : [{\"c\" : \"abc\"}, {\"c\" : \"pqr\"}, {\"c\" : \"xyz\"}] , \"b2\" : 111}, \"col4\" : [1,2,3,4]}"; + final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody)); + // put event in channel + Transaction transaction = channel.getTransaction(); + transaction.begin(); + channel.put(event); + transaction.commit(); + transaction.close(); + + sink.process(); + + int rowsInDb = countRows(fullTableName); + assertEquals(1, rowsInDb); + + sink.stop(); + assertEquals(LifecycleState.STOP, sink.getLifecycleState()); + + dropTable(fullTableName); + } + + @Test + public void testKeyGenerator() throws EventDeliveryException, SQLException { + + final String fullTableName = "FLUME_JSON_TEST"; + initSinkContextWithDefaults(fullTableName); + + sink = new PhoenixSink(); + Configurables.configure(sink, sinkContext); + + assertEquals(LifecycleState.IDLE, sink.getLifecycleState()); + + final Channel channel = this.initChannel(); + sink.setChannel(channel); + + sink.start(); + final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" : 10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}"; + final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody)); + // put event in channel + Transaction transaction = channel.getTransaction(); + transaction.begin(); + channel.put(event); + 
transaction.commit(); + transaction.close(); + + sink.process(); + + int rowsInDb = countRows(fullTableName); + assertEquals(1, rowsInDb); + + sink.stop(); + assertEquals(LifecycleState.STOP, sink.getLifecycleState()); + + dropTable(fullTableName); + } + + @Test + public void testMismatchKeyGenerator() throws EventDeliveryException, SQLException { + + final String fullTableName = "FLUME_JSON_TEST"; + initSinkContextWithDefaults(fullTableName); + setConfig(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, + DefaultKeyGenerator.UUID.name()); + + sink = new PhoenixSink(); + Configurables.configure(sink, sinkContext); + assertEquals(LifecycleState.IDLE, sink.getLifecycleState()); + + final Channel channel = this.initChannel(); + sink.setChannel(channel); + + sink.start(); + final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" : 10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}"; + final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody)); + // put event in channel + Transaction transaction = channel.getTransaction(); + transaction.begin(); + channel.put(event); + transaction.commit(); + transaction.close(); + + try { + sink.process(); + fail(); + } catch (Exception ex) { + assertTrue(ex.getCause().getMessage().contains("java.lang.IllegalArgumentException: Invalid format:")); + } + + dropTable(fullTableName); + } + + @Test + public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException { + + final String fullTableName = "FLUME_JSON_TEST"; + initSinkContextWithDefaults(fullTableName); + + sink = new PhoenixSink(); + Configurables.configure(sink, sinkContext); + assertEquals(LifecycleState.IDLE, sink.getLifecycleState()); + + final Channel channel = this.initChannel(); + sink.setChannel(channel); + + sink.start(); + final String eventBody = "{\"col1\" : \"kalyan\", \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}"; + final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody)); + // put event in channel + Transaction transaction = channel.getTransaction(); + transaction.begin(); + channel.put(event); + transaction.commit(); + transaction.close(); + + sink.process(); + + int rowsInDb = countRows(fullTableName); + assertEquals(0, rowsInDb); + + sink.stop(); + assertEquals(LifecycleState.STOP, sink.getLifecycleState()); + + dropTable(fullTableName); + } + + /** + * Puts {@code numEvents} JSON events on the channel in a single transaction and checks that each of + * them is persisted as a row. + */ + @Test + public void testBatchEvents() throws EventDeliveryException, SQLException { + + final String fullTableName = "FLUME_JSON_TEST"; + initSinkContextWithDefaults(fullTableName); + + sink = new PhoenixSink(); + Configurables.configure(sink, sinkContext); + assertEquals(LifecycleState.IDLE, sink.getLifecycleState()); + + final Channel channel = this.initChannel(); + sink.setChannel(channel); + + sink.start(); + int numEvents = 150; + String col1 = "val1"; + String a1 = "[aaa,bbb,ccc]"; + String a2 = "[1,2,3,4]"; + String eventBody = null; + List<Event> eventList = Lists.newArrayListWithCapacity(numEvents); + // loop on numEvents: the list is only pre-sized, so eventList.size() is still 0 at this point + for (int i = 0; i < numEvents; i++) { + eventBody = "{\"col1\" : \"" + (col1 + i) + "\", \"col2\" : " + i * 10.5 + " , \"col3\" : " + a1 + + " , \"col4\" : " + a2 + "}"; + Event event = EventBuilder.withBody(Bytes.toBytes(eventBody)); + eventList.add(event); + } + + // put event in channel + Transaction transaction = channel.getTransaction(); + transaction.begin(); + for (Event event : eventList) { + channel.put(event); + } + transaction.commit(); + transaction.close(); + + // NOTE(review): a single process() call presumably delivers at most one sink batch (batch size is + // not configured in this test) - keep processing, bounded by numEvents attempts, until all rows land + int rowsInDb = 0; + for (int attempt = 0; attempt < numEvents && rowsInDb < numEvents; attempt++) { + sink.process(); + rowsInDb = countRows(fullTableName); + } + assertEquals(numEvents, rowsInDb); + + sink.stop(); + assertEquals(LifecycleState.STOP, sink.getLifecycleState()); + + dropTable(fullTableName); + } + + @Test + public void testEventsWithHeaders() throws Exception { + + sinkContext = new Context(); + final String fullTableName = "FLUME_JSON_TEST"; + final String ddl = "CREATE TABLE IF NOT EXISTS " + + fullTableName + + " (rowkey VARCHAR 
not null, col1 varchar , col2 double, col3 varchar[], col4 integer[], host varchar , source varchar \n" + + " CONSTRAINT pk PRIMARY KEY (rowkey))\n"; + String columns = "col1,col2,col3,col4"; + String columnsMapping = "{\"col1\":\"col1\",\"col2\":\"col2\",\"col3\":\"col3\",\"col4\":\"col4\"}"; + String rowkeyType = DefaultKeyGenerator.UUID.name(); + String headers = "host,source"; + initSinkContext(fullTableName, ddl, columns, columnsMapping, rowkeyType, headers); + + sink = new PhoenixSink(); + Configurables.configure(sink, sinkContext); + assertEquals(LifecycleState.IDLE, sink.getLifecycleState()); + + final Channel channel = this.initChannel(); + sink.setChannel(channel); + + sink.start(); + + int numEvents = 10; + String col1 = "val1"; + String a1 = "[aaa,bbb,ccc]"; + String a2 = "[1,2,3,4]"; + String hostHeader = "host1"; + String sourceHeader = "source1"; + String eventBody = null; + List<Event> eventList = Lists.newArrayListWithCapacity(numEvents); + for (int i = 0; i < numEvents; i++) { + eventBody = "{\"col1\" : \"" + (col1 + i) + "\", \"col2\" : " + i * 10.5 + " , \"col3\" : " + a1 + + " , \"col4\" : " + a2 + "}"; + Map<String, String> headerMap = Maps.newHashMapWithExpectedSize(2); + headerMap.put("host", hostHeader); + headerMap.put("source", sourceHeader); + Event event = EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap); + eventList.add(event); + } + + // put event in channel + Transaction transaction = channel.getTransaction(); + transaction.begin(); + for (Event event : eventList) { + channel.put(event); + } + transaction.commit(); + transaction.close(); + + sink.process(); + + final String query = " SELECT * FROM \n " + fullTableName; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + final ResultSet rs; + final Connection conn = DriverManager.getConnection(getUrl(), props); + try { + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("host1", rs.getString("host")); + 
assertEquals("source1", rs.getString("source")); + + assertTrue(rs.next()); + assertEquals("host1", rs.getString("host")); + assertEquals("source1", rs.getString("source")); + } finally { + if (conn != null) { + conn.close(); + } + } + sink.stop(); + assertEquals(LifecycleState.STOP, sink.getLifecycleState()); + + dropTable(fullTableName); + } + + private Channel initChannel() { + // Channel configuration + Context channelContext = new Context(); + channelContext.put("capacity", "10000"); + channelContext.put("transactionCapacity", "200"); + + Channel channel = new MemoryChannel(); + channel.setName("memorychannel"); + Configurables.configure(channel, channelContext); + return channel; + } + + private void initSinkContext(final String fullTableName, final String ddl, final String columns, + final String columnsMapping, final String rowkeyType, final String headers) { + Preconditions.checkNotNull(fullTableName); + sinkContext = new Context(); + sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName); + sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl()); + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.JSON.name()); + sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl); + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, columns); + if (null != columnsMapping) + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMNS_MAPPING, + columnsMapping); + if (null != rowkeyType) + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, + rowkeyType); + if (null != headers) + sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_HEADER_NAMES, headers); + } + + private void initSinkContextWithDefaults(final String fullTableName) { + String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName + + " (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]" + + " 
CONSTRAINT pk PRIMARY KEY (flume_time))\n"; + String columns = "col1,col2,col3,col4"; + String columnsMapping = "{\"col1\":\"col1\",\"col2\":\"col2\",\"col3\":\"col3\",\"col4\":\"col4\"}"; + String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name(); + initSinkContext(fullTableName, ddl, columns, columnsMapping, rowkeyType, null); + } + + private void setConfig(final String configName, final String configValue) { + Preconditions.checkNotNull(sinkContext); + Preconditions.checkNotNull(configName); + Preconditions.checkNotNull(configValue); + sinkContext.put(configName, configValue); + } + + private int countRows(final String fullTableName) throws SQLException { + Preconditions.checkNotNull(fullTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + final Connection conn = DriverManager.getConnection(getUrl(), props); + ResultSet rs = null; + try { + rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName); + int rowsCount = 0; + while (rs.next()) { + rowsCount = rs.getInt(1); + } + return rowsCount; + + } finally { + if (rs != null) { + rs.close(); + } + if (conn != null) { + conn.close(); + } + } + + } + + private void dropTable(final String fullTableName) throws SQLException { + Preconditions.checkNotNull(fullTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + final Connection conn = DriverManager.getConnection(getUrl(), props); + try { + conn.createStatement().execute("drop table if exists " + fullTableName); + } finally { + if (conn != null) { + conn.close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d9d53bc1/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java index cf43a3b..f60d7dc 100644 --- 
a/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java +++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java @@ -62,7 +62,12 @@ public final class FlumeConstants { /** Whether to ignore case when performing regex matches. */ public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase"; public static final boolean IGNORE_CASE_DEFAULT = false; - + + /** JSON expression used to parse groups from event data. */ + public static final String CONFIG_COLUMNS_MAPPING = "columnsMapping"; + public static final String CONFIG_PARTIAL_SCHEMA = "partialSchema"; + public static final String JSON_DEFAULT = "{}"; + /** Comma separated list of column names . */ public static final String CONFIG_COLUMN_NAMES = "columns"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/d9d53bc1/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java index fddcba5..24527e3 100644 --- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java +++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java @@ -157,7 +157,9 @@ public abstract class BaseEventSerializer implements EventSerializer { while (rs.next()) { cf = rs.getString(QueryUtil.COLUMN_FAMILY_POSITION); cq = rs.getString(QueryUtil.COLUMN_NAME_POSITION); - dt = rs.getInt(QueryUtil.DATA_TYPE_POSITION); + // TODO: Fix this .. 
change `DATA_TYPE_POSITION` value 5 to 26 + // dt = rs.getInt(QueryUtil.DATA_TYPE_POSITION); + dt = rs.getInt(26); if(Strings.isNullOrEmpty(cf)) { rowkey = cq; // this is required only when row key is auto generated } else { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d9d53bc1/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java index eb0078d..68a609b 100644 --- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java +++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java @@ -19,7 +19,7 @@ package org.apache.phoenix.flume.serializer; public enum EventSerializers { - REGEX(RegexEventSerializer.class.getName()); + REGEX(RegexEventSerializer.class.getName()), JSON(JsonEventSerializer.class.getName()); private final String className; http://git-wip-us.apache.org/repos/asf/phoenix/blob/d9d53bc1/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java new file mode 100644 index 0000000..9226017 --- /dev/null +++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.flume.serializer; + +import static org.apache.phoenix.flume.FlumeConstants.JSON_DEFAULT; +import static org.apache.phoenix.flume.FlumeConstants.CONFIG_COLUMNS_MAPPING; +import static org.apache.phoenix.flume.FlumeConstants.CONFIG_PARTIAL_SCHEMA; + +import java.sql.Array; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.phoenix.schema.types.PDataType; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.spi.json.JsonOrgJsonProvider; +import com.jayway.jsonpath.spi.mapper.JsonOrgMappingProvider; + +public class JsonEventSerializer extends BaseEventSerializer { + + private static final Logger logger = LoggerFactory.getLogger(JsonEventSerializer.class); + + private JSONObject jsonSchema; + private boolean isProperMapping; + private boolean partialSchema; + + /** + * + 
*/ + @Override + public void doConfigure(Context context) { + final String jsonData = context.getString(CONFIG_COLUMNS_MAPPING, JSON_DEFAULT); + try { + jsonSchema = new JSONObject(jsonData); + if (jsonSchema.length() == 0) { + for (String colName : colNames) { + jsonSchema.put(colName, colName); + } + isProperMapping = true; + } else { + Iterator<String> keys = jsonSchema.keys(); + List<String> keylist = new ArrayList<String>(); + while (keys.hasNext()) { + keylist.add(keys.next()); + } + isProperMapping = CollectionUtils.isEqualCollection(keylist, colNames); + } + } catch (JSONException e) { + e.printStackTrace(); + logger.debug("json mapping not proper, verify the data {} ", jsonData); + } + partialSchema = context.getBoolean(CONFIG_PARTIAL_SCHEMA, false); + } + + /** + * + */ + @Override + public void doInitialize() throws SQLException { + // NO-OP + } + + @Override + public void upsertEvents(List<Event> events) throws SQLException { + Preconditions.checkNotNull(events); + Preconditions.checkNotNull(connection); + Preconditions.checkNotNull(this.upsertStatement); + Preconditions.checkArgument(isProperMapping, "Please verify fields mapping is not properly done.."); + + boolean wasAutoCommit = connection.getAutoCommit(); + connection.setAutoCommit(false); + try (PreparedStatement colUpsert = connection.prepareStatement(upsertStatement)) { + String value = null; + Integer sqlType = null; + JSONObject inputJson = new JSONObject(); + for (Event event : events) { + byte[] payloadBytes = event.getBody(); + if (payloadBytes == null || payloadBytes.length == 0) { + continue; + } + String payload = new String(payloadBytes); + + try { + inputJson = new JSONObject(payload); + } catch (Exception e) { + logger.debug("payload is not proper json"); + continue; + } + + Map<String, String> data = new HashMap<String, String>(); + for (String colName : colNames) { + String pattern = colName; + if (jsonSchema.has(colName)) { + Object obj = jsonSchema.opt(colName); + if (null != 
obj) { + pattern = obj.toString(); + } + } + pattern = "$." + pattern; + value = getPatternData(inputJson, pattern); + + // if field mapping data is null then look for column data + if (null == value && partialSchema) { + pattern = "$." + colName; + value = getPatternData(inputJson, pattern); + } + + data.put(colName, value); + } + + Collection<String> values = data.values(); + if (values.contains(null)) { + logger.debug("payload data {} doesn't match the fields mapping {} ", inputJson, jsonSchema); + continue; + } + + int index = 1; + int offset = 0; + for (int i = 0; i < colNames.size(); i++, offset++) { + if (columnMetadata[offset] == null) { + continue; + } + String colName = colNames.get(i); + value = data.get(colName); + sqlType = columnMetadata[offset].getSqlType(); + PDataType pDataType = PDataType.fromTypeId(sqlType); + Object upsertValue; + if (pDataType.isArrayType()) { + JSONArray jsonArray = new JSONArray(new JSONTokener(value)); + Object[] vals = new Object[jsonArray.length()]; + for (int x = 0; x < jsonArray.length(); x++) { + vals[x] = jsonArray.get(x); + } + String baseTypeSqlName = PDataType.arrayBaseType(pDataType).getSqlTypeName(); + Array array = connection.createArrayOf(baseTypeSqlName, vals); + upsertValue = pDataType.toObject(array, pDataType); + } else { + upsertValue = pDataType.toObject(value); + } + if (upsertValue != null) { + colUpsert.setObject(index++, upsertValue, sqlType); + } else { + colUpsert.setNull(index++, sqlType); + } + } + + // add headers if necessary + Map<String, String> headerValues = event.getHeaders(); + for (int i = 0; i < headers.size(); i++, offset++) { + String headerName = headers.get(i); + String headerValue = headerValues.get(headerName); + sqlType = columnMetadata[offset].getSqlType(); + Object upsertValue = PDataType.fromTypeId(sqlType).toObject(headerValue); + if (upsertValue != null) { + colUpsert.setObject(index++, upsertValue, sqlType); + } else { + colUpsert.setNull(index++, sqlType); + } + } + + if 
(autoGenerateKey) { + sqlType = columnMetadata[offset].getSqlType(); + String generatedRowValue = this.keyGenerator.generate(); + Object rowkeyValue = PDataType.fromTypeId(sqlType).toObject(generatedRowValue); + colUpsert.setObject(index++, rowkeyValue, sqlType); + } + colUpsert.execute(); + } + connection.commit(); + } catch (Exception ex) { + logger.error("An error {} occurred during persisting the event ", ex.getMessage()); + throw new SQLException(ex.getMessage()); + } finally { + if (wasAutoCommit) { + connection.setAutoCommit(true); + } + } + + } + + private String getPatternData(JSONObject json, String pattern) { + Configuration JSON_ORG_CONFIGURATION = Configuration.builder().mappingProvider(new JsonOrgMappingProvider()) + .jsonProvider(new JsonOrgJsonProvider()).build(); + String value; + try { + Object object = JsonPath.using(JSON_ORG_CONFIGURATION).parse(json).read(pattern); + value = object.toString(); + } catch (Exception e) { + value = null; + } + return value; + } + +} \ No newline at end of file