[ https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691378#comment-16691378 ]
ASF GitHub Bot commented on FLINK-7386: --------------------------------------- zentol closed pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client URL: https://github.com/apache/flink/pull/4675 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md index 3fba7f01fea..46d84695f6d 100644 --- a/docs/dev/connectors/elasticsearch.md +++ b/docs/dev/connectors/elasticsearch.md @@ -53,7 +53,12 @@ of the Elasticsearch installation: <tr> <td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td> <td>1.3.0</td> - <td>5.x</td> + <td>5.2 and previous versions</td> + </tr> + <tr> + <td>flink-connector-elasticsearch5.3{{ site.scala_version_suffix }}</td> + <td>1.3.0</td> + <td>5.3 and later versions</td> </tr> </tbody> </table> diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java index ce98dfba1b9..0fac543e446 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java @@ -26,6 +26,7 @@ import java.io.Serializable; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; /** * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls across different 
versions. @@ -36,7 +37,7 @@ * is allowed, the call bridge will hold a reference to the created embedded node. Each instance of the sink will hold * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed. */ -public interface ElasticsearchApiCallBridge extends Serializable { +public abstract class ElasticsearchApiCallBridge implements Serializable { /** * Creates an Elasticsearch {@link Client}. @@ -44,7 +45,7 @@ * @param clientConfig The configuration to use when constructing the client. * @return The created client. */ - Client createClient(Map<String, String> clientConfig); + public abstract Client createClient(Map<String, String> clientConfig); /** * Extracts the cause of failure of a bulk item action. @@ -52,7 +53,7 @@ * @param bulkItemResponse the bulk item response to extract cause of failure * @return the extracted {@link Throwable} from the response ({@code null} if the response is successful). */ - @Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse); + public abstract @Nullable Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse); /** * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}. @@ -61,13 +62,30 @@ * @param builder the {@link BulkProcessor.Builder} to configure. * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries). */ - void configureBulkProcessorBackoff( + public abstract void configureBulkProcessorBackoff( BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy); + /** + * Creates a RequestIndexer instance. + * + * @param bulkProcessor The instance of BulkProcessor + * @param flushOnCheckpoint If true, the producer will wait until all outstanding action requests have been sent to Elasticsearch. 
+ * @param numPendingRequests Number of pending action requests not yet acknowledged by Elasticsearch. + * @return The created RequestIndexer. + */ + public RequestIndexer createRequestIndex( + BulkProcessor bulkProcessor, + boolean flushOnCheckpoint, + AtomicLong numPendingRequests) { + return new BulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests); + } + /** * Perform any necessary state cleanup. */ - void cleanup(); + public void cleanup() { + // nothing to cleanup + } } diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java index c49d726454a..1afbad2bbb6 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java @@ -152,7 +152,7 @@ public void setDelayMillis(long delayMillis) { private boolean flushOnCheckpoint = true; /** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. 
*/ - private transient BulkProcessorIndexer requestIndexer; + private transient RequestIndexer requestIndexer; // ------------------------------------------------------------------------ // Internals for the Flink Elasticsearch Sink @@ -280,7 +280,7 @@ public void disableFlushOnCheckpoint() { public void open(Configuration parameters) throws Exception { client = callBridge.createClient(userConfig); bulkProcessor = buildBulkProcessor(new BulkProcessorListener()); - requestIndexer = new BulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests); + requestIndexer = callBridge.createRequestIndex(bulkProcessor, flushOnCheckpoint, numPendingRequests); } @Override diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java index 5e5978569cb..bb7d322c7f8 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java @@ -510,7 +510,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { } } - private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge { + private static class DummyElasticsearchApiCallBridge extends ElasticsearchApiCallBridge { private static final long serialVersionUID = -4272760730959041699L; diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java 
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java index 5659ee651e8..e4a35f4627e 100644 --- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java @@ -40,7 +40,7 @@ /** * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x. */ -public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge { +public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge { private static final long serialVersionUID = -2632363720584123682L; diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java index 66b676c4fca..3a54f5c0387 100644 --- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java @@ -42,7 +42,7 @@ /** * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x. 
*/ -public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge { +public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge { private static final long serialVersionUID = 2638252694744361079L; diff --git a/flink-connectors/flink-connector-elasticsearch5.3/pom.xml b/flink-connectors/flink-connector-elasticsearch5.3/pom.xml new file mode 100644 index 00000000000..3f0a0ca31db --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5.3/pom.xml @@ -0,0 +1,162 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.4-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-elasticsearch5.3_${scala.binary.version}</artifactId> + <name>flink-connector-elasticsearch5.3</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <elasticsearch.version>5.3.0</elasticsearch.version> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- Elasticsearch Java Client has been moved to a different module in 5.x --> + <exclusion> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Dependency for Elasticsearch 5.x Java Client --> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>transport</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + + <!-- + Elasticsearch 5.x uses Log4j2 and no longer detects logging implementations, making + Log4j2 a strict dependency. The following is added so that the Log4j2 API in + Elasticsearch 5.x is routed to SLF4J. This way, user projects can remain flexible + in the logging implementation preferred. 
+ --> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-to-slf4j</artifactId> + <version>2.7</version> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <!-- + Including Log4j2 dependencies for tests is required for the + embedded Elasticsearch nodes used in tests to run correctly. + --> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>2.7</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.7</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <!-- + For the tests, we need to exclude the Log4j2 to slf4j adapter dependency + and let Elasticsearch directly use Log4j2, otherwise the embedded Elasticsearch node + used in tests will fail to work. + + In other words, the connector jar is routing Elasticsearch 5.x's Log4j2 API's to SLF4J, + but for the test builds, we still stick to directly using Log4j2. 
+ --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.12.2</version> + <configuration> + <classpathDependencyExcludes> + <classpathDependencyExclude>org.apache.logging.log4j:log4j-to-slf4j</classpathDependencyExclude> + </classpathDependencyExcludes> + </configuration> + </plugin> + </plugins> + </build> + +</project> diff --git a/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java new file mode 100644 index 00000000000..e0fd699af55 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch53; + +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkProcessor; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}. + * {@link ActionRequest ActionRequests} will be converted to {@link DocWriteRequest} + * and will be buffered before sending a bulk request to the Elasticsearch cluster. + */ +public class BulkProcessorIndexer implements RequestIndexer { + + private final BulkProcessor bulkProcessor; + private final boolean flushOnCheckpoint; + private final AtomicLong numPendingRequestsRef; + + public BulkProcessorIndexer(BulkProcessor bulkProcessor, + boolean flushOnCheckpoint, + AtomicLong numPendingRequests) { + this.bulkProcessor = bulkProcessor; + this.flushOnCheckpoint = flushOnCheckpoint; + this.numPendingRequestsRef = numPendingRequests; + } + + @Override + public void add(ActionRequest... 
actionRequests) { + for (ActionRequest actionRequest : actionRequests) { + if (flushOnCheckpoint) { + numPendingRequestsRef.getAndIncrement(); + } + this.bulkProcessor.add((DocWriteRequest) actionRequest); + } + } +} diff --git a/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/Elasticsearch53ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/Elasticsearch53ApiCallBridge.java new file mode 100644 index 00000000000..10c742e9353 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/Elasticsearch53ApiCallBridge.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch53; + +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils; +import org.apache.flink.util.Preconditions; + +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.transport.Netty3Plugin; +import org.elasticsearch.transport.client.PreBuiltTransportClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.3 and later versions. + */ +public class Elasticsearch53ApiCallBridge extends ElasticsearchApiCallBridge { + + private static final long serialVersionUID = -5222683870097809633L; + + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch53ApiCallBridge.class); + + /** + * User-provided transport addresses. + * + * <p>We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 5.x. 
+ */ + private final List<InetSocketAddress> transportAddresses; + + Elasticsearch53ApiCallBridge(List<InetSocketAddress> transportAddresses) { + Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty()); + this.transportAddresses = transportAddresses; + } + + @Override + public Client createClient(Map<String, String> clientConfig) { + Settings settings = Settings.builder().put(clientConfig) + .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME) + .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME) + .build(); + + TransportClient transportClient = new PreBuiltTransportClient(settings); + for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) { + transportClient.addTransportAddress(transport); + } + + // verify that we actually are connected to a cluster + if (transportClient.connectedNodes().isEmpty()) { + throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!"); + } + + if (LOG.isInfoEnabled()) { + LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes()); + } + + return transportClient; + } + + @Override + public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) { + if (!bulkItemResponse.isFailed()) { + return null; + } else { + return bulkItemResponse.getFailure().getCause(); + } + } + + @Override + public void configureBulkProcessorBackoff( + BulkProcessor.Builder builder, + @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) { + + BackoffPolicy backoffPolicy; + if (flushBackoffPolicy != null) { + switch (flushBackoffPolicy.getBackoffType()) { + case CONSTANT: + backoffPolicy = BackoffPolicy.constantBackoff( + new TimeValue(flushBackoffPolicy.getDelayMillis()), + flushBackoffPolicy.getMaxRetryCount()); + break; + case EXPONENTIAL: + default: + backoffPolicy = BackoffPolicy.exponentialBackoff( + new 
TimeValue(flushBackoffPolicy.getDelayMillis()), + flushBackoffPolicy.getMaxRetryCount()); + } + } else { + backoffPolicy = BackoffPolicy.noBackoff(); + } + + builder.setBackoffPolicy(backoffPolicy); + } + + public RequestIndexer createRequestIndex( + BulkProcessor bulkProcessor, + boolean flushOnCheckpoint, + AtomicLong numPendingRequests) { + return new BulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/ElasticsearchSink.java new file mode 100644 index 00000000000..4e7aa99b382 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/ElasticsearchSink.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch53; + +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.client.transport.TransportClient; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +/** + * Elasticsearch 5.x sink that requests multiple {@link ActionRequest ActionRequests} + * against a cluster for each incoming element. + * + * <p>The sink internally uses a {@link TransportClient} to communicate with an Elasticsearch cluster. + * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor. + * + * <p>The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found + * in the <a href="https://www.elastic.co">Elasticsearch documentation</a>. An important setting is {@code cluster.name}, + * which should be set to the name of the cluster that the sink should emit to. + * + * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}. + * This will buffer elements before sending a request to the cluster. 
The behaviour of the + * {@code BulkProcessor} can be configured using these config keys: + * <ul> + * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer + * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer + * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two + * settings in milliseconds + * </ul> + * + * <p>Note that the Elasticsearch 5.3 and later versions will convert {@link ActionRequest ActionRequest} to + * {@link DocWriteRequest DocWriteRequest} in {@link BulkProcessorIndexer}. + * + * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple + * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of + * {@link ElasticsearchSinkFunction} for an example. + * + * @param <T> Type of the elements handled by this sink + */ +public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { + + private static final long serialVersionUID = 1L; + + /** + * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}. + * + * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor} + * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient} + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element + */ + public ElasticsearchSink( + Map<String, String> userConfig, + List<InetSocketAddress> transportAddresses, + ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + + this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler()); + } + + /** + * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}. 
+ * + * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor} + * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient} + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element + * @param failureHandler This is used to handle failed {@link ActionRequest} + */ + public ElasticsearchSink( + Map<String, String> userConfig, + List<InetSocketAddress> transportAddresses, + ElasticsearchSinkFunction<T> elasticsearchSinkFunction, + ActionRequestFailureHandler failureHandler) { + + super(new Elasticsearch53ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java new file mode 100644 index 00000000000..5a30ef81d2e --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.streaming.connectors.elasticsearch53.ElasticsearchSinkITCase; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.InternalSettingsPreparer; +import org.elasticsearch.node.Node; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.Netty3Plugin; + +import java.io.File; +import java.util.Collections; + +/** + * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 5.3. + * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests. 
+ */ +public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment { + + private Node node; + + @Override + public void start(File tmpDataFolder, String clusterName) throws Exception { + if (node == null) { + Settings settings = Settings.builder() + .put("cluster.name", clusterName) + .put("http.enabled", false) + .put("path.home", tmpDataFolder.getParent()) + .put("path.data", tmpDataFolder.getAbsolutePath()) + .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME) + .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME) + .build(); + + node = new PluginNode(settings); + node.start(); + } + } + + @Override + public void close() throws Exception { + if (node != null && !node.isClosed()) { + node.close(); + node = null; + } + } + + @Override + public Client getClient() { + if (node != null && !node.isClosed()) { + return node.client(); + } else { + return null; + } + } + + private static class PluginNode extends Node { + public PluginNode(Settings settings) { + super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? extends Plugin>>singletonList(Netty3Plugin.class)); + } + } + +} diff --git a/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch53/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch53/ElasticsearchSinkITCase.java new file mode 100644 index 00000000000..a92e5f1223d --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch53/ElasticsearchSinkITCase.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch53; + +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; + +import org.junit.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * IT cases for the {@link ElasticsearchSink}. 
+ */ +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase { + + @Test + public void testTransportClient() throws Exception { + runTransportClientTest(); + } + + @Test + public void testNullTransportClient() throws Exception { + runNullTransportClientTest(); + } + + @Test + public void testEmptyTransportClient() throws Exception { + runEmptyTransportClientTest(); + } + + @Test + public void testTransportClientFails() throws Exception { + runTransportClientFailsTest(); + } + + @Override + protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, + List<InetSocketAddress> transportAddresses, + ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction); + } + + @Override + protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode( + Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception { + + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction); + } + +} diff --git a/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch53/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch53/examples/ElasticsearchSinkExample.java new file mode 100644 index 00000000000..5169e20b436 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5.3/src/test/java/org/apache/flink/streaming/connectors/elasticsearch53/examples/ElasticsearchSinkExample.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch53.examples; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch53.ElasticsearchSink; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that + * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. 
+ */ +public class ElasticsearchSinkExample { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); + + Map<String, String> userConfig = new HashMap<>(); + userConfig.put("cluster.name", "elasticsearch"); + // This instructs the sink to emit after every element, otherwise they would be buffered + userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() { + @Override + public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { + indexer.add(createIndexRequest(element)); + } + })); + + env.execute("Elasticsearch Sink Example"); + } + + private static IndexRequest createIndexRequest(String element) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .id(element) + .source(json); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch5.3/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch5.3/src/test/resources/log4j-test.properties new file mode 100644 index 00000000000..20551848eea --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch5.3/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target=System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java index ffb572df839..e1f37d519dd 100644 --- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java @@ -45,7 +45,7 @@ /** * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x. 
*/ -public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge { +public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge { private static final long serialVersionUID = -5222683870097809633L; diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index bc3f82f686c..901117759b6 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -49,6 +49,7 @@ under the License. <module>flink-connector-elasticsearch</module> <module>flink-connector-elasticsearch2</module> <module>flink-connector-elasticsearch5</module> + <module>flink-connector-elasticsearch5.3</module> <module>flink-connector-rabbitmq</module> <module>flink-connector-twitter</module> <module>flink-connector-nifi</module> diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh index a3798456221..f3f12bd7f9c 100755 --- a/tools/travis_mvn_watchdog.sh +++ b/tools/travis_mvn_watchdog.sh @@ -80,6 +80,7 @@ flink-connectors/flink-connector-cassandra,\ flink-connectors/flink-connector-elasticsearch,\ flink-connectors/flink-connector-elasticsearch2,\ flink-connectors/flink-connector-elasticsearch5,\ +flink-connectors/flink-connector-elasticsearch5.3,\ flink-connectors/flink-connector-elasticsearch-base,\ flink-connectors/flink-connector-filesystem,\ flink-connectors/flink-connector-kafka-0.8,\ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ > client > -------------------------------------------------------------------------------- > > Key: FLINK-7386 > URL: https://issues.apache.org/jira/browse/FLINK-7386 > Project: Flink > Issue Type: Improvement > Components: ElasticSearch Connector > Reporter: Dawid Wysakowicz > Assignee: Fang Yong > Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0 > > > In the Elasticsearch 5.2.0 client, the class {{BulkProcessor}} was refactored and > no longer has the method {{add(ActionRequest)}}. > For more info see: https://github.com/elastic/elasticsearch/pull/20109 -- This message was sent by Atlassian JIRA (v7.6.3#76005)