This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 50f2fb9  [FLINK-18583][elasticsearch] Fix Elasticsearch6 sink uses 
index as document id
50f2fb9 is described below

commit 50f2fb9fbcfc162f5933a9f06519742de020001e
Author: Yangze Guo <[email protected]>
AuthorDate: Tue Jul 14 19:31:05 2020 +0800

    [FLINK-18583][elasticsearch] Fix Elasticsearch6 sink uses index as document 
id
    
    This closes #12894
---
 .../table/Elasticsearch6DynamicSink.java           |  2 +-
 .../table/Elasticsearch6DynamicSinkITCase.java     | 50 ++++++++++++++++------
 .../table/Elasticsearch7DynamicSinkITCase.java     | 50 ++++++++++++++++------
 3 files changed, 73 insertions(+), 29 deletions(-)

diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
index b7fd44d..33ddfef 100644
--- 
a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -220,7 +220,7 @@ final class Elasticsearch6DynamicSink implements 
DynamicTableSink {
                                String key,
                                XContentType contentType,
                                byte[] document) {
-                       return new IndexRequest(index, docType, index)
+                       return new IndexRequest(index, docType, key)
                                .source(document, contentType);
                }
 
diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
index b306b34..8a7c180 100644
--- 
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -47,6 +47,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import static 
org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
@@ -208,7 +209,15 @@ public class Elasticsearch6DynamicSinkITCase {
                                12.12f,
                                (byte) 2,
                                LocalDate.ofEpochDay(12345),
-                               LocalDateTime.parse("2012-12-12T12:12:12"))
+                               LocalDateTime.parse("2012-12-12T12:12:12")),
+                       row(
+                               2L,
+                               LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                               "FGHIJK",
+                               13.13f,
+                               (byte) 4,
+                               LocalDate.ofEpochDay(12345),
+                               LocalDateTime.parse("2013-12-12T13:13:13"))
                ).executeInsert("esTable")
                        .getJobClient()
                        .get()
@@ -226,25 +235,38 @@ public class Elasticsearch6DynamicSinkITCase {
                                .execute()
                                .actionGet()
                                .getHits();
-                       if (hits.getTotalHits() < 1) {
+                       if (hits.getTotalHits() < 2) {
                                Thread.sleep(200);
                        }
-               } while (hits.getTotalHits() < 1 && deadline.hasTimeLeft());
+               } while (hits.getTotalHits() < 2 && deadline.hasTimeLeft());
 
-               if (hits.getTotalHits() < 1) {
+               if (hits.getTotalHits() < 2) {
                        throw new AssertionError("Could not retrieve results 
from Elasticsearch.");
                }
 
-               Map<String, Object> result = hits.getAt(0).getSourceAsMap();
-               Map<Object, Object> expectedMap = new HashMap<>();
-               expectedMap.put("a", 1);
-               expectedMap.put("b", "00:00:12");
-               expectedMap.put("c", "ABCDE");
-               expectedMap.put("d", 12.12d);
-               expectedMap.put("e", 2);
-               expectedMap.put("f", "2003-10-20");
-               expectedMap.put("g", "2012-12-12 12:12:12");
-               assertThat(result, equalTo(expectedMap));
+               HashSet<Map<String, Object>> resultSet = new HashSet<>();
+               resultSet.add(hits.getAt(0).getSourceAsMap());
+               resultSet.add(hits.getAt(1).getSourceAsMap());
+               Map<Object, Object> expectedMap1 = new HashMap<>();
+               expectedMap1.put("a", 1);
+               expectedMap1.put("b", "00:00:12");
+               expectedMap1.put("c", "ABCDE");
+               expectedMap1.put("d", 12.12d);
+               expectedMap1.put("e", 2);
+               expectedMap1.put("f", "2003-10-20");
+               expectedMap1.put("g", "2012-12-12 12:12:12");
+               Map<Object, Object> expectedMap2 = new HashMap<>();
+               expectedMap2.put("a", 2);
+               expectedMap2.put("b", "00:00:12");
+               expectedMap2.put("c", "FGHIJK");
+               expectedMap2.put("d", 13.13d);
+               expectedMap2.put("e", 4);
+               expectedMap2.put("f", "2003-10-20");
+               expectedMap2.put("g", "2013-12-12 13:13:13");
+               HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
+               expectedSet.add(expectedMap1);
+               expectedSet.add(expectedMap2);
+               assertThat(resultSet, equalTo(expectedSet));
        }
 
        @Test
diff --git 
a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
index 483a783..b011e5b 100644
--- 
a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -47,6 +47,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import static 
org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
@@ -200,7 +201,15 @@ public class Elasticsearch7DynamicSinkITCase {
                                12.12f,
                                (byte) 2,
                                LocalDate.ofEpochDay(12345),
-                               LocalDateTime.parse("2012-12-12T12:12:12"))
+                               LocalDateTime.parse("2012-12-12T12:12:12")),
+                       row(
+                               2L,
+                               LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                               "FGHIJK",
+                               13.13f,
+                               (byte) 4,
+                               LocalDate.ofEpochDay(12345),
+                               LocalDateTime.parse("2013-12-12T13:13:13"))
                ).executeInsert("esTable")
                        .getJobClient()
                        .get()
@@ -218,25 +227,38 @@ public class Elasticsearch7DynamicSinkITCase {
                                .execute()
                                .actionGet()
                                .getHits();
-                       if (hits.getTotalHits().value < 1) {
+                       if (hits.getTotalHits().value < 2) {
                                Thread.sleep(200);
                        }
-               } while (hits.getTotalHits().value < 1 && 
deadline.hasTimeLeft());
+               } while (hits.getTotalHits().value < 2 && 
deadline.hasTimeLeft());
 
-               if (hits.getTotalHits().value < 1) {
+               if (hits.getTotalHits().value < 2) {
                        throw new AssertionError("Could not retrieve results 
from Elasticsearch.");
                }
 
-               Map<String, Object> result = hits.getAt(0).getSourceAsMap();
-               Map<Object, Object> expectedMap = new HashMap<>();
-               expectedMap.put("a", 1);
-               expectedMap.put("b", "00:00:12");
-               expectedMap.put("c", "ABCDE");
-               expectedMap.put("d", 12.12d);
-               expectedMap.put("e", 2);
-               expectedMap.put("f", "2003-10-20");
-               expectedMap.put("g", "2012-12-12 12:12:12");
-               assertThat(result, equalTo(expectedMap));
+               HashSet<Map<String, Object>> resultSet = new HashSet<>();
+               resultSet.add(hits.getAt(0).getSourceAsMap());
+               resultSet.add(hits.getAt(1).getSourceAsMap());
+               Map<Object, Object> expectedMap1 = new HashMap<>();
+               expectedMap1.put("a", 1);
+               expectedMap1.put("b", "00:00:12");
+               expectedMap1.put("c", "ABCDE");
+               expectedMap1.put("d", 12.12d);
+               expectedMap1.put("e", 2);
+               expectedMap1.put("f", "2003-10-20");
+               expectedMap1.put("g", "2012-12-12 12:12:12");
+               Map<Object, Object> expectedMap2 = new HashMap<>();
+               expectedMap2.put("a", 2);
+               expectedMap2.put("b", "00:00:12");
+               expectedMap2.put("c", "FGHIJK");
+               expectedMap2.put("d", 13.13d);
+               expectedMap2.put("e", 4);
+               expectedMap2.put("f", "2003-10-20");
+               expectedMap2.put("g", "2013-12-12 13:13:13");
+               HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
+               expectedSet.add(expectedMap1);
+               expectedSet.add(expectedMap2);
+               assertThat(resultSet, equalTo(expectedSet));
        }
 
        @Test

Reply via email to