Hi,

I'm using Flink and Elasticsearch and I want to recieve in elasticsearch a
json object ({"id":1, "name":"X"} ect...), I already have a string with
this information, but I don't want to save it as string.

I recieve this:

{
  "_index": "logs",
  "_type": "object",
  "_id": "AVpcARfkfYWqSubr0ZvK",
  "_score": 1,
  "_source": {
    "data": "{\"id\":6,\"name\":\"A green
door\",\"price\":12.5,\"tags\":[\"home\",\"green\"]}"
  }
}

And I want to recieve this:

{
"_index": "logs",
"_type": "external",
"_id": "AVpcARfkfYWqSubr0ZvK",
"_score": 1,
"_source": {
"data": {
"id":6,
"name":"A green door",
"price":12.5,
"tags":
["home","green"]
}
}
}

my java code:

try {
            ArrayList<InetSocketAddress> transports = new ArrayList<>();
            transports.add(new InetSocketAddress("127.0.0.1", 9300));

            ElasticsearchSinkFunction<String> indexLog = new
ElasticsearchSinkFunction<String>() {

private static final long serialVersionUID = 8802869701292023100L;

public IndexRequest createIndexRequest(String element){

                    HashMap<String, Object> esJson = new HashMap<>();

                    esJson.put("data", element);



                    return Requests
                            .indexRequest()
                            .index("logs")
                            .type("object")
                            .source(esJson);
                }
@Override
                public void process(String element, RuntimeContext ctx,
RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            };

            ElasticsearchSink<String> esSink = new
ElasticsearchSink<String>(config, transports, indexLog);
            input.addSink(esSink);
        }
        catch (Exception e) {
            System.out.println(e);
        }


Do I need to treat every entry as a map? Can I just send a object with key
value?

Thanks.

Reply via email to