[ 
https://issues.apache.org/jira/browse/EDGENT-368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15832504#comment-15832504
 ] 

Dale LaBossiere commented on EDGENT-368:
----------------------------------------

It seems pretty straightforward to use the Elasticsearch RestClient API 
from an Edgent application.
At least it's not much harder than whatever you'd have to write to use the 
RestClient from any other application.

Is just sample code / recipes needed?

Here's a sample that adds some JsonObject TStream tuples to ES.
The class {{ESPutTuples}} implements {{Function<JsonObject,Response>}} to be 
used in a {{TStream.map()}}.
Its {{Response apply(JsonObject tuple)}} is called for each tuple. The impl uses
{{RestClient.performRequest()}} to send it and returns the Response object, 
which Edgent adds to the response stream.

{code}
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.edgent.function.Function;
import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.json.JsonFunctions;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

import com.google.gson.JsonObject;

public class ESPutSample {

  public static void main(String[] args) {
    
    DirectProvider dp = new DirectProvider();
    Topology top = dp.newTopology();

    TStream<String> json = top.of(
          "{ \"user\": \"dlaboss\", \"message\": \"Tuple 1 message\" }",
          "{ \"user\": \"dlaboss\", \"message\": \"Tuple 2 message\" }"
        );
    TStream<JsonObject> tuples = json.map(JsonFunctions.fromString());
    
    TStream<Response> responses = tuples.map(new ESPutTuples());
    
    responses.print();
    
    dp.submit(top);

    // prints
    // apply() sending endpoint=/sample/tuple/1 tuple={"user":"dlaboss","message":"Tuple 1 message"}
    // Response{requestLine=PUT /sample/tuple/1 HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
    // apply() sending endpoint=/sample/tuple/2 tuple={"user":"dlaboss","message":"Tuple 2 message"}
    // Response{requestLine=PUT /sample/tuple/2 HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
    //
    // Use curl to query elasticsearch to see them:
    // curl -XGET 'http://localhost:9200/sample/tuple/1?pretty=true'
    // curl -XGET 'http://localhost:9200/sample/tuple/2?pretty=true'
  }
  
  static class ESPutTuples implements Function<JsonObject,Response>, AutoCloseable {
    private static final long serialVersionUID = 1L;
    private final RestClient client;
    private final Map<String,String> params = new HashMap<>();
    private long tupCnt;
    
    public ESPutTuples() {
      client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();
    }

    @Override
    public void close() throws Exception {
      System.out.println("close()");
      if (client != null) {
        client.close();
      }
    }

    @Override
    public Response apply(JsonObject tuple) {
      try {
        String endpoint = "/sample/tuple/" + ++tupCnt;
        HttpEntity entity = new StringEntity(tuple.toString(), ContentType.APPLICATION_JSON);
        System.out.println("apply() sending endpoint="+endpoint+" tuple="+tuple);
        return client.performRequest("PUT", endpoint, params, entity);
      }
      catch (IOException e) {
        System.err.println(e);
        throw new RuntimeException(e);
      }
    }
    
  }
  
}
{code}
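
The sample above sends one request per tuple, while the issue asks for the bulk API. Here is a rough sketch of only the request-body formatting for a batch, assuming the standard newline-delimited {{_bulk}} format (an action line followed by the document source, each terminated by a newline). The names {{BulkBodySketch}} and {{buildBulkBody}} are made up for illustration, it uses only the JDK, and it has not been run against a live cluster. How tuples get batched in the topology is left out.

```java
import java.util.Arrays;
import java.util.List;

public class BulkBodySketch {

  /**
   * Builds a newline-delimited body for the Elasticsearch _bulk endpoint.
   * Hypothetical helper: each doc must already be a single-line JSON string.
   * Ids are assigned sequentially starting at firstId.
   */
  public static String buildBulkBody(String index, String type, long firstId,
                                     List<String> docs) {
    StringBuilder sb = new StringBuilder();
    long id = firstId;
    for (String doc : docs) {
      // Action line: index this document under index/type/id.
      sb.append("{\"index\":{\"_index\":\"").append(index)
        .append("\",\"_type\":\"").append(type)
        .append("\",\"_id\":\"").append(id++).append("\"}}\n");
      // Source line: the document itself, followed by the required newline.
      sb.append(doc).append('\n');
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    List<String> docs = Arrays.asList(
        "{\"user\":\"dlaboss\",\"message\":\"Tuple 1 message\"}",
        "{\"user\":\"dlaboss\",\"message\":\"Tuple 2 message\"}");
    System.out.print(buildBulkBody("sample", "tuple", 1, docs));
  }
}
```

The resulting string would be the entity of a single POST to {{/_bulk}} in place of the per-tuple PUT, and the bulk response would then need to be checked for per-item failures.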

> Implement Elasticsearch output connector
> ----------------------------------------
>
>                 Key: EDGENT-368
>                 URL: https://issues.apache.org/jira/browse/EDGENT-368
>             Project: Edgent
>          Issue Type: New Feature
>          Components: Connectors
>            Reporter: Otis Gospodnetic
>              Labels: connector, elasticsearch
>             Fix For: Apache Edgent 1.1.0
>
>
> It would be great to be able to write data to Elasticsearch (bulk) API.
> This connector should use the ES HTTP client, not TransportClient (TC 
> has issues with things like different client vs. server library versions, so 
> it should be avoided; the ES HTTP client, written in Java, was written to 
> avoid these TC issues).
> Once such a connector exists, one could use it not only for writing to their 
> own ES cluster, but also to services that expose the ES API, like Logsene 
> (http://sematext.com/logsene) and others.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
