Repository: flume Updated Branches: refs/heads/flume-1.7 a8c0f90af -> d2fc881f5
FLUME-2649. Elasticsearch sink doesn't handle JSON fields correctly (Benjamin Fiorini via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d2fc881f Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d2fc881f Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d2fc881f Branch: refs/heads/flume-1.7 Commit: d2fc881f549568ea640fa29a96b0b37da64b225d Parents: a8c0f90 Author: Hari Shreedharan <hshreedha...@apache.org> Authored: Mon Apr 20 23:59:18 2015 -0700 Committer: Hari Shreedharan <hshreedha...@apache.org> Committed: Mon Apr 20 23:59:59 2015 -0700 ---------------------------------------------------------------------- .../sink/elasticsearch/ContentBuilderUtil.java | 25 +++++++++----- .../AbstractElasticSearchSinkTest.java | 3 +- .../elasticsearch/TestElasticSearchSink.java | 34 ++++++++++++++++++++ 3 files changed, 53 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/d2fc881f/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java index de0acf4..83c3ffd 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java @@ -55,20 +55,29 @@ public class ContentBuilderUtil { public static void addComplexField(XContentBuilder builder, String fieldName, XContentType contentType, byte[] data) throws IOException { - XContentParser parser = - XContentFactory.xContent(contentType).createParser(data); - parser.nextToken(); - // Add the field name, but not the value. - builder.field(fieldName); + XContentParser parser = null; try { + // Elasticsearch will accept JSON directly but we need to validate that + // the incoming event is JSON first. Sadly, the elasticsearch JSON parser + // is a stream parser so we need to instantiate it, parse the event to + // validate it, then instantiate it again to provide the JSON to + // elasticsearch. + // If validation fails then the incoming event is submitted to + // elasticsearch as plain text. + parser = XContentFactory.xContent(contentType).createParser(data); + while (parser.nextToken() != null) {}; + + // If the JSON is valid then include it + parser = XContentFactory.xContent(contentType).createParser(data); + // Add the field name, but not the value. + builder.field(fieldName); // This will add the whole parsed content as the value of the field. builder.copyCurrentStructure(parser); } catch (JsonParseException ex) { // If we get an exception here the most likely cause is nested JSON that // can't be figured out in the body. At this point just push it through - // as is, we have already added the field so don't do it again - builder.endObject(); - builder.field(fieldName, new String(data, charset)); + // as is + addSimpleField(builder, fieldName, data); } finally { if (parser != null) { parser.close(); http://git-wip-us.apache.org/repos/asf/flume/blob/d2fc881f/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java index 2f8fd6d..f9272fa 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java @@ -127,7 +127,8 @@ public abstract class AbstractElasticSearchSinkTest { void assertBodyQuery(int expectedHits, Event... events) { // Perform Multi Field Match assertSearch(expectedHits, - performSearch(QueryBuilders.fieldQuery("@message", "event")), null); + performSearch(QueryBuilders.fieldQuery("@message", "event")), + null, events); } SearchResponse performSearch(QueryBuilder query) { http://git-wip-us.apache.org/repos/asf/flume/blob/d2fc881f/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java index 78e1665..a58f344 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java @@ -95,6 +95,40 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest { } @Test + public void shouldIndexInvalidComplexJsonBody() throws Exception { + parameters.put(BATCH_SIZE, "3"); + Configurables.configure(fixture, new Context(parameters)); + Channel channel = bindAndStartChannel(fixture); + + Transaction tx = channel.getTransaction(); + tx.begin(); + Event event1 = EventBuilder.withBody("TEST1 {test}".getBytes()); + channel.put(event1); + Event event2 = EventBuilder.withBody("{test: TEST2 }".getBytes()); + channel.put(event2); + Event event3 = EventBuilder.withBody("{\"test\":{ TEST3 {test} }}".getBytes()); + channel.put(event3); + tx.commit(); + tx.close(); + + fixture.process(); + fixture.stop(); + client.admin().indices() + .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet(); + + assertMatchAllQuery(3); + assertSearch(1, + performSearch(QueryBuilders.fieldQuery("@message", "TEST1")), + null, event1); + assertSearch(1, + performSearch(QueryBuilders.fieldQuery("@message", "TEST2")), + null, event2); + assertSearch(1, + performSearch(QueryBuilders.fieldQuery("@message", "TEST3")), + null, event3); + } + + @Test public void shouldIndexComplexJsonEvent() throws Exception { Configurables.configure(fixture, new Context(parameters)); Channel channel = bindAndStartChannel(fixture);