Repository: nifi
Updated Branches:
  refs/heads/master 4e09a03f8 -> 66783c18b


NIFI-5427: Updating ScrollElasticsearchHttp to use POST, supporting ES6

This closes #2890

Signed-off-by: Mike Thomsen <mikerthom...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/66783c18
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/66783c18
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/66783c18

Branch: refs/heads/master
Commit: 66783c18b24b1c6b1cfd662c58ca9df1e60b866e
Parents: 4e09a03
Author: Joe Gresock <joseph.gres...@lmco.com>
Authored: Fri Jul 13 16:29:45 2018 +0000
Committer: Mike Thomsen <mikerthom...@gmail.com>
Committed: Wed Jul 18 13:50:53 2018 -0400

----------------------------------------------------------------------
 .../AbstractElasticsearchHttpProcessor.java            |  2 ++
 .../elasticsearch/ScrollElasticsearchHttp.java         | 13 +++++++++----
 2 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/66783c18/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index 0b40c75..8cb34a1 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -250,6 +250,8 @@ public abstract class AbstractElasticsearchHttpProcessor 
extends AbstractElastic
             requestBuilder = requestBuilder.get();
         } else if ("put".equalsIgnoreCase(verb)) {
             requestBuilder = requestBuilder.put(body);
+        } else if ("post".equalsIgnoreCase(verb)) {
+            requestBuilder = requestBuilder.post(body);
         } else {
             throw new IllegalArgumentException("Elasticsearch REST API verb 
not supported by this processor: " + verb);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/66783c18/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
index e90af79..01e5ae1 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -18,7 +18,9 @@ package org.apache.nifi.processors.elasticsearch;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import okhttp3.HttpUrl;
+import okhttp3.MediaType;
 import okhttp3.OkHttpClient;
+import okhttp3.RequestBody;
 import okhttp3.Response;
 import okhttp3.ResponseBody;
 import org.apache.commons.lang3.StringUtils;
@@ -86,7 +88,6 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
     private static final String FINISHED_QUERY_STATE = "finishedQuery";
     private static final String SCROLL_ID_STATE = "scrollId";
     private static final String SCROLL_QUERY_PARAM = "scroll";
-    private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -260,8 +261,13 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
                         scrollId, pageSize, scroll, context);
                 final long startNanos = System.nanoTime();
 
+                final String scrollBody = String.format("{ \"scroll\": \"%s\", 
\"scroll_id\": \"%s\" }", scroll,
+                        scrollId);
+
+                final RequestBody body = 
RequestBody.create(MediaType.parse("application/json"), scrollBody);
+
                 final Response getResponse = 
sendRequestToElasticsearch(okHttpClient, scrollurl,
-                        username, password, "GET", null);
+                        username, password, "POST", body);
                 this.getPage(getResponse, scrollurl, context, session, 
flowFile, logger, startNanos);
                 getResponse.close();
             } else {
@@ -415,7 +421,6 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
         if (!StringUtils.isEmpty(scrollId)) {
             builder.addPathSegment("_search");
             builder.addPathSegment("scroll");
-            builder.addQueryParameter(SCROLL_ID_QUERY_PARAM, scrollId);
         } else {
             builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : 
index);
             if (!StringUtils.isEmpty(type)) {
@@ -432,8 +437,8 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
                 String trimmedFields = 
Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
                 builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields);
             }
+            builder.addQueryParameter(SCROLL_QUERY_PARAM, scroll);
         }
-        builder.addQueryParameter(SCROLL_QUERY_PARAM, scroll);
 
         // Find the user-added properties and set them as query parameters on 
the URL
         for (Map.Entry<PropertyDescriptor, String> property : 
context.getProperties().entrySet()) {

Reply via email to