[ https://issues.apache.org/jira/browse/FLUME-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317634#comment-14317634 ]

Xuri Nagarin commented on FLUME-2126:
-------------------------------------

[~ejsarge] How would you like the Flume event to be captured/posted? In my
case, I have JSON coming in via the Kafka source. Right now, I am using
morphlines to extract the Kafka message and populate all the key/value pairs in
the Flume event header so that the ES sink can properly populate the ES index.
It would be much nicer if the ES sink or the LogStash serializer could (1)
detect the message type and (2), if the message (the Flume body) is structured,
populate the ES index with the key/value pairs from both the Flume header and
the body content. If the Flume event body isn't recognized as a structured
message, just shove it in the "message" field (the current behaviour).
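The proposal boils down to a two-way branch on the event body. Here is a minimal stdlib-only sketch built on a hypothetical `routeBody` helper, which is not Flume or Elasticsearch API. It treats a body as structured when its first non-whitespace character is `{`. That check is a cheap JSON-object heuristic and is not the sink's actual detection logic. Structured bodies go to a caller-supplied parser, because Java has no built-in JSON parser. Every other body, including one that looks like JSON but fails to parse, keeps the current behaviour of a single `message` field.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class BodyRouter {

    // Hypothetical heuristic: the body "looks structured" if its first
    // non-whitespace character opens a JSON object.
    static boolean looksLikeJsonObject(String body) {
        for (int i = 0; i < body.length(); i++) {
            char c = body.charAt(i);
            if (!Character.isWhitespace(c)) {
                return c == '{';
            }
        }
        return false; // empty or whitespace-only body
    }

    // Structured bodies are expanded into key/value pairs by the supplied
    // parser; anything else keeps the current single "message" field.
    static Map<String, String> routeBody(byte[] body,
            Function<String, Map<String, String>> parser) {
        String text = new String(body, StandardCharsets.UTF_8);
        Map<String, String> fields = new LinkedHashMap<>();
        if (looksLikeJsonObject(text)) {
            try {
                fields.putAll(parser.apply(text));
                return fields;
            } catch (RuntimeException e) {
                // Looked like JSON but did not parse: fall back to "message".
            }
        }
        fields.put("message", text);
        return fields;
    }

    public static void main(String[] args) {
        // Toy parser for the demo only: it does not actually parse JSON.
        Function<String, Map<String, String>> toyParser = s -> {
            Map<String, String> m = new LinkedHashMap<>();
            m.put("raw_json", s.trim());
            return m;
        };
        System.out.println(routeBody("plain log line".getBytes(StandardCharsets.UTF_8), toyParser));
        // prints {message=plain log line}
        System.out.println(routeBody("{\"user\":\"alice\"}".getBytes(StandardCharsets.UTF_8), toyParser));
        // prints {raw_json={"user":"alice"}}
    }
}
```

With a real JSON library behind `parser`, the returned map is exactly the set of key/value pairs the comment wants in the index.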

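I believe the relevant code is:
{code}
    // Each Flume header becomes one key/value pair under "@fields";
    // the event body is not inspected here.
    builder.startObject("@fields");
    for (String key : headers.keySet()) {
      byte[] val = headers.get(key).getBytes(charset);
      ContentBuilderUtil.appendField(builder, key, val);
    }
    builder.endObject();
  } // end of the enclosing method
{code}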

https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java

Could we add iteration over the body as well, when the body is structured?
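The suggestion amounts to writing a second set of key/value pairs into the same `@fields` object. Below is a minimal sketch that uses a plain `Map` in place of `XContentBuilder`. It relies on a hypothetical `mergeFields` helper and assumes the body has already been parsed into key/value pairs. The collision policy, where the header value wins, is also an assumption. It is chosen so that values set explicitly by a morphline or interceptor are not overwritten by the body, but letting the body win would be an equally reasonable choice.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FieldsMerger {

    // Builds the "@fields" content as a plain map: headers first, as the
    // current serializer does, then the key/value pairs of a structured body.
    // Assumption: on a key collision the header value is kept.
    static Map<String, String> mergeFields(Map<String, String> headers,
                                           Map<String, String> parsedBody) {
        Map<String, String> fields = new LinkedHashMap<>(headers); // copy; inputs untouched
        for (Map.Entry<String, String> e : parsedBody.entrySet()) {
            fields.putIfAbsent(e.getKey(), e.getValue());
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("host", "web01");
        headers.put("level", "INFO");
        Map<String, String> body = new LinkedHashMap<>();
        body.put("level", "ERROR");
        body.put("user", "alice");
        System.out.println(mergeFields(headers, body));
        // prints {host=web01, level=INFO, user=alice}
    }
}
```

In the serializer itself, the same idea would be a second loop over the parsed body inside the `@fields` object, placed after the header loop.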

> Problem in elasticsearch sink when the event body is a complex field
> --------------------------------------------------------------------
>
>                 Key: FLUME-2126
>                 URL: https://issues.apache.org/jira/browse/FLUME-2126
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>         Environment: 1.3.1 and 1.4
>            Reporter: Massimo Paladin
>            Assignee: Ashish Paliwal
>         Attachments: FLUME-2126-0.patch
>
>
> I have found a bug in the elasticsearch sink. The problem is in the
> {{ContentBuilderUtil.addComplexField}} method: when it calls
> {{builder.field(fieldName, tmp);}}, the {{tmp}} object is treated as an {{Object}}
> and is therefore serialized with its {{toString}} method in the
> {{XContentBuilder}}. In the end you get the object reference as content.
> The following change works around the problem for me. The downside is that it
> has to parse the content twice; I guess there is a better way to solve the
> problem, but I am not an elasticsearch API expert.
> {code}
> --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
> +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
> @@ -61,7 +61,12 @@ public class ContentBuilderUtil {
>        parser = XContentFactory.xContent(contentType).createParser(data);
>        parser.nextToken();
>        tmp.copyCurrentStructure(parser);
> -      builder.field(fieldName, tmp);
> +
> +      // if it is a valid structure then we include it
> +      parser = XContentFactory.xContent(contentType).createParser(data);
> +      parser.nextToken();
> +      builder.field(fieldName);
> +      builder.copyCurrentStructure(parser);
>      } catch (JsonParseException ex) {
>        // If we get an exception here the most likely cause is nested JSON that
>        // can't be figured out in the body. At this point just push it through
> {code}
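Both the reported bug and the reporter's guess at "a better way" can be illustrated without Elasticsearch. This self-contained sketch uses hypothetical stand-ins (`Nested`, `fieldViaToString`, `appendField` and a toy brace-counting copier), none of which are Elasticsearch or Flume APIs. The first part shows the failure mode. A value that reaches the serializer only as an `Object` is rendered with `Object.toString()`, and for a class that does not override it, that prints `ClassName@hashcode`. The second part shows one way to avoid parsing twice, assuming the parser can write into a scratch buffer. It parses once into the buffer, appends the buffer only if parsing succeeded, and otherwise falls back to a plain string field. Whether `XContentBuilder` offers an equivalent way to append buffered content depends on the Elasticsearch version in use.

```java
import java.util.function.BiConsumer;

class NestedFieldSketch {

    // Stand-in for a nested builder holding already-serialized JSON.
    // Like most classes, it does NOT override toString().
    static final class Nested {
        final String json;
        Nested(String json) { this.json = json; }
    }

    // Stand-in for the bug: a generic (name, Object) field method that
    // renders the value through string concatenation, i.e. toString().
    static String fieldViaToString(String name, Object value) {
        return "\"" + name + "\":\"" + value + "\"";
    }

    // Single-parse alternative: the copier writes into a scratch buffer that
    // is appended only if the copy completed. On failure, any partial output
    // is discarded and the data is stored as an escaped plain string.
    static void appendField(StringBuilder out, String name, String data,
                            BiConsumer<String, StringBuilder> copier) {
        StringBuilder scratch = new StringBuilder();
        try {
            copier.accept(data, scratch); // parse exactly once
        } catch (IllegalArgumentException e) {
            out.append('"').append(name).append("\":\"").append(escape(data)).append('"');
            return;
        }
        out.append('"').append(name).append("\":").append(scratch);
    }

    // Minimal JSON string escaping for the plain-string fallback.
    static String escape(String s) {
        StringBuilder b = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                b.append('\\').append(c);
            } else if (c < 0x20) {
                b.append(String.format("\\u%04x", (int) c));
            } else {
                b.append(c);
            }
        }
        return b.toString();
    }

    public static void main(String[] args) {
        System.out.println(fieldViaToString("body", new Nested("{\"a\":1}")));
        // prints something like "body":"NestedFieldSketch$Nested@1b6d3586"

        // Toy copier: echoes characters and rejects unbalanced braces. It is
        // NOT a JSON parser (for instance, it ignores braces inside strings).
        BiConsumer<String, StringBuilder> toy = (data, sink) -> {
            int depth = 0;
            for (char c : data.toCharArray()) {
                sink.append(c);
                if (c == '{') depth++;
                else if (c == '}') depth--;
                if (depth < 0) throw new IllegalArgumentException("unbalanced");
            }
            if (depth != 0) throw new IllegalArgumentException("unbalanced");
        };
        StringBuilder ok = new StringBuilder();
        appendField(ok, "body", "{\"a\":1}", toy);
        System.out.println(ok);  // prints "body":{"a":1}
        StringBuilder bad = new StringBuilder();
        appendField(bad, "body", "{\"a\":1", toy);
        System.out.println(bad); // prints "body":"{\"a\":1"
    }
}
```

Buffering costs one extra in-memory copy but needs only one parse. Discarding the scratch buffer on failure is what keeps a half-copied structure out of the document.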



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
