This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a165bda1d03 [fix][io] ElasticSearch sink: align null fields behaviour (#18577)
a165bda1d03 is described below
commit a165bda1d03e370f5efe1173134fb94e12b584b5
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed Nov 23 14:04:37 2022 +0100
[fix][io] ElasticSearch sink: align null fields behaviour (#18577)
---
pom.xml | 4 +--
.../elastic/ElasticSearchJavaRestClient.java | 12 ++++----
.../io/elasticsearch/ElasticSearchClientTests.java | 35 +++++++++++++++++++---
.../io/elasticsearch/ElasticSearchTestBase.java | 4 +--
.../io/sinks/ElasticSearch7SinkTester.java | 7 ++++-
.../io/sinks/ElasticSearch8SinkTester.java | 7 ++++-
.../integration/io/sinks/OpenSearchSinkTester.java | 6 +++-
7 files changed, 59 insertions(+), 16 deletions(-)
diff --git a/pom.xml b/pom.xml
index f67a61f8958..d02329a841d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -176,7 +176,7 @@ flexible messaging model and an intuitive client
API.</description>
<hdfs-offload-version3>3.3.3</hdfs-offload-version3>
<json-smart.version>2.4.7</json-smart.version>
<opensearch.version>1.2.4</opensearch.version>
- <elasticsearch-java.version>8.1.0</elasticsearch-java.version>
+ <elasticsearch-java.version>8.5.2</elasticsearch-java.version>
<trino.version>363</trino.version>
<scala.binary.version>2.13</scala.binary.version>
<debezium.version>1.9.7.Final</debezium.version>
@@ -236,7 +236,7 @@ flexible messaging model and an intuitive client
API.</description>
<netty-reactive-streams.version>2.0.6</netty-reactive-streams.version>
<!-- test dependencies -->
- <testcontainers.version>1.17.2</testcontainers.version>
+ <testcontainers.version>1.17.6</testcontainers.version>
<hamcrest.version>2.2</hamcrest.version>
<!-- Set docker-java.version to the version of docker-java used in
Testcontainers -->
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index 50876704ff2..4749ea2e2d3 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -36,7 +36,9 @@ import
co.elastic.clients.elasticsearch.indices.RefreshRequest;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Map;
@@ -53,8 +55,9 @@ import org.elasticsearch.client.RestClientBuilder;
public class ElasticSearchJavaRestClient extends RestClient {
private final ElasticsearchClient client;
- private final ObjectMapper objectMapper = new ObjectMapper();
-
+ private final ObjectMapper objectMapper = new ObjectMapper()
+ .configure(SerializationFeature.INDENT_OUTPUT, false)
+ .setSerializationInclusion(JsonInclude.Include.ALWAYS);
private BulkProcessor bulkProcessor;
private ElasticsearchTransport transport;
@@ -87,8 +90,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
log.warn("Node host={} failed", node.getHost());
}
});
- transport = new RestClientTransport(builder.build(),
- new JacksonJsonpMapper());
+ transport = new RestClientTransport(builder.build(), new
JacksonJsonpMapper(objectMapper));
client = new ElasticsearchClient(transport);
if (elasticSearchConfig.isBulkEnabled()) {
bulkProcessor = new ElasticBulkProcessor(elasticSearchConfig,
client, bulkProcessorListener);
@@ -117,7 +119,7 @@ public class ElasticSearchJavaRestClient extends RestClient
{
.build();
try {
final CreateIndexResponse createIndexResponse =
client.indices().create(createIndexRequest);
- if ((createIndexResponse.acknowledged() != null &&
createIndexResponse.acknowledged())
+ if ((createIndexResponse.acknowledged())
&& createIndexResponse.shardsAcknowledged()) {
return true;
}
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index dc3ca8c34e7..6d9928c0426 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import java.io.IOException;
+import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;
@@ -193,7 +194,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
@Test
public void testTopicToIndexName() throws IOException {
try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
- .setElasticSearchUrl("http://" +
container.getHttpHostAddress())); ) {
+ .setElasticSearchUrl("http://" +
container.getHttpHostAddress()));) {
assertEquals(client.topicToIndexName("data-ks1.table1"),
"data-ks1.table1");
assertEquals(client.topicToIndexName("persistent://public/default/testesjson"),
"testesjson");
assertEquals(client.topicToIndexName("default/testesjson"),
"testesjson");
@@ -211,7 +212,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
public void testMalformedDocFails() throws Exception {
String index = "indexmalformed-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
- .setElasticSearchUrl("http://"+container.getHttpHostAddress())
+ .setElasticSearchUrl("http://" +
container.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setBulkFlushIntervalInMs(-1L)
@@ -235,7 +236,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
public void testMalformedDocIgnore() throws Exception {
String index = "indexmalformed2-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
- .setElasticSearchUrl("http://"+container.getHttpHostAddress())
+ .setElasticSearchUrl("http://" +
container.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setBulkFlushIntervalInMs(-1)
@@ -366,7 +367,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
public void testBulkIndexAndDelete() throws Exception {
final String index = "indexbulktest-" + UUID.randomUUID();
ElasticSearchConfig config = new ElasticSearchConfig()
- .setElasticSearchUrl("http://"+container.getHttpHostAddress())
+ .setElasticSearchUrl("http://" +
container.getHttpHostAddress())
.setIndexName(index)
.setBulkEnabled(true)
.setBulkActions(10)
@@ -389,4 +390,30 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
}
}
+ @Test
+ public void testIndexKeepNulls() throws Exception {
+ final String index = "indexnulls";
+ ElasticSearchConfig config = new ElasticSearchConfig()
+ .setElasticSearchUrl("http://" +
container.getHttpHostAddress())
+ .setIndexName(index);
+
+ try (ElasticSearchClient client = new ElasticSearchClient(config)) {
+ MockRecord<GenericObject> mockRecord = new MockRecord<>();
+ client.indexDocument(mockRecord, Pair.of("key0",
"{\"a\":1,\"b\":null}"));
+ final Map<String, Object> sourceAsMap;
+ if (elasticImageName.equals(ELASTICSEARCH_8)) {
+ final ElasticSearchJavaRestClient restClient =
(ElasticSearchJavaRestClient) client.getRestClient();
+ sourceAsMap =
+ restClient.search(index,
"*:*").hits().hits().get(0).source();
+ } else {
+ final OpenSearchHighLevelRestClient restClient =
(OpenSearchHighLevelRestClient) client.getRestClient();
+ sourceAsMap =
+ restClient.search(index,
"*:*").getHits().getHits()[0].getSourceAsMap();
+ }
+ assertEquals(sourceAsMap.get("a"), 1);
+ assertTrue(sourceAsMap.containsKey("b"));
+ assertNull(sourceAsMap.get("b"));
+ }
+ }
+
}
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
index 5f2bb1e5d75..4c6fd020fa3 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
@@ -39,10 +39,10 @@ import org.testcontainers.utility.DockerImageName;
public abstract class ElasticSearchTestBase {
public static final String ELASTICSEARCH_8 =
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
- .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.1.0");
+ .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
public static final String ELASTICSEARCH_7 =
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
-
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
+ .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
public static final String OPENSEARCH =
Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
.orElse("opensearchproject/opensearch:1.2.4");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
index 17c17e7f496..65b38c677bf 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
@@ -18,18 +18,23 @@
*/
package org.apache.pulsar.tests.integration.io.sinks;
+import java.util.Optional;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
public class ElasticSearch7SinkTester extends ElasticSearchSinkTester {
+ public static final String ELASTICSEARCH_7 =
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
+ .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
+
+
public ElasticSearch7SinkTester(boolean schemaEnable) {
super(schemaEnable);
}
@Override
protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
- return new
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64")
+ return new ElasticsearchContainer(ELASTICSEARCH_7)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
index 02990ae4533..bb52c4ff03f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
@@ -18,18 +18,23 @@
*/
package org.apache.pulsar.tests.integration.io.sinks;
+import java.util.Optional;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
public class ElasticSearch8SinkTester extends ElasticSearchSinkTester {
+ public static final String ELASTICSEARCH_8 =
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
+ .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
+
+
public ElasticSearch8SinkTester(boolean schemaEnable) {
super(schemaEnable);
}
@Override
protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
- return new
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:8.1.0")
+ return new ElasticsearchContainer(ELASTICSEARCH_8)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("xpack.security.enabled", "false")
.withEnv("xpack.security.http.ssl.enabled", "false");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
index 12caddb52e0..1e10cc4189c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.tests.integration.io.sinks;
+import java.util.Optional;
import org.apache.http.HttpHost;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
@@ -36,6 +37,9 @@ import static org.testng.Assert.assertTrue;
public class OpenSearchSinkTester extends ElasticSearchSinkTester {
+ public static final String OPENSEARCH =
Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
+ .orElse("opensearchproject/opensearch:1.2.4");
+
private RestHighLevelClient elasticClient;
@@ -45,7 +49,7 @@ public class OpenSearchSinkTester extends
ElasticSearchSinkTester {
@Override
protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
- DockerImageName dockerImageName =
DockerImageName.parse("opensearchproject/opensearch:1.2.4")
+ DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
return new ElasticsearchContainer(dockerImageName)
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")