Repository: flume
Updated Branches:
  refs/heads/trunk 84c526fed -> 8328bccd4

FLUME-2126. Problem in elasticsearch sink when the event body is a complex field

(Ashish Paliwal via Hari)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8328bccd
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8328bccd
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8328bccd

Branch: refs/heads/trunk
Commit: 8328bccd41077d457cab064541127fc993e97619
Parents: 84c526f
Author: Hari Shreedharan <[email protected]>
Authored: Tue Oct 14 17:28:25 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Oct 14 17:28:25 2014 -0700

----------------------------------------------------------------------
 .../sink/elasticsearch/ContentBuilderUtil.java | 2 +-
 .../elasticsearch/TestElasticSearchSink.java | 21 ++++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/8328bccd/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
index bf7c57c..70d0b86 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
@@ -61,7 +61,7 @@ public class ContentBuilderUtil {
       parser = XContentFactory.xContent(contentType).createParser(data);
       parser.nextToken();
       tmp.copyCurrentStructure(parser);
-      builder.field(fieldName, tmp);
+      builder.field(fieldName, tmp.string());
     } catch (JsonParseException ex) {
       // If we get an exception here the most likely cause is nested JSON that
       // can't be figured out in the body. At this point just push it through

http://git-wip-us.apache.org/repos/asf/flume/blob/8328bccd/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
index 15546c1..3e11726 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
@@ -94,6 +94,27 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
   }
 
   @Test
+  public void shouldIndexComplexJsonEvent() throws Exception {
+    Configurables.configure(fixture, new Context(parameters));
+    Channel channel = bindAndStartChannel(fixture);
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody("{\"event\":\"json content\"}".getBytes());
+    channel.put(event);
+    tx.commit();
+    tx.close();
+
+    fixture.process();
+    fixture.stop();
+    client.admin().indices()
+        .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
+
+    assertMatchAllQuery(1, event);
+    assertBodyQuery(1, event);
+  }
+
+  @Test
   public void shouldIndexFiveEvents() throws Exception {
     // Make it so we only need to call process once
     parameters.put(BATCH_SIZE, "5");
