This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git
commit a9328dfcd7793b73cddc428d4c4a0aa827d220ca Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Tue Sep 13 12:42:47 2022 +0200 [FLINK-28410][tests] Sync E2E tests --- .github/workflows/ci.yml | 13 +- .../pom.xml | 77 +++++++++ .../flink/streaming/tests/ElasticsearchClient.java | 59 +++++++ .../streaming/tests/ElasticsearchDataReader.java | 57 +++++++ .../tests/ElasticsearchSinkE2ECaseBase.java | 101 ++++++++++++ .../ElasticsearchSinkExternalContextBase.java | 123 +++++++++++++++ ...lasticsearchSinkExternalContextFactoryBase.java | 59 +++++++ .../streaming/tests/ElasticsearchTestEmitter.java | 51 ++++++ .../org/apache/flink/streaming/tests/KeyValue.java | 92 +++++++++++ .../apache/flink/streaming/tests/QueryParams.java | 174 +++++++++++++++++++++ .../streaming/tests/UpdateRequestFactory.java | 43 +++++ .../flink/test/parameters/ParameterProperty.java | 58 +++++++ .../org/apache/flink/tests/util/TestUtils.java | 85 ++++++++++ .../pom.xml | 120 ++++++++++++++ .../streaming/tests/Elasticsearch6Client.java | 149 ++++++++++++++++++ .../streaming/tests/UpdateRequest6Factory.java | 48 ++++++ .../streaming/tests/Elasticsearch6SinkE2ECase.java | 60 +++++++ .../tests/Elasticsearch6SinkExternalContext.java | 68 ++++++++ .../Elasticsearch6SinkExternalContextFactory.java | 48 ++++++ .../src/test/resources/log4j2-test.properties | 34 ++++ .../pom.xml | 121 ++++++++++++++ .../streaming/tests/Elasticsearch7Client.java | 147 +++++++++++++++++ .../streaming/tests/UpdateRequest7Factory.java | 46 ++++++ .../streaming/tests/Elasticsearch7SinkE2ECase.java | 60 +++++++ .../tests/Elasticsearch7SinkExternalContext.java | 68 ++++++++ .../Elasticsearch7SinkExternalContextFactory.java | 48 ++++++ .../src/test/resources/log4j2-test.properties | 35 +++++ flink-connector-elasticsearch-e2e-tests/pom.xml | 128 +++++++++++++++ 28 files changed, 2171 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c23d82c..9aff109 100644 --- 
a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,6 +26,7 @@ jobs: jdk: [8, 11] env: MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 + FLINK_URL: https://s3.amazonaws.com/flink-nightly/flink-1.16-SNAPSHOT-bin-scala_2.12.tgz steps: - run: echo "Running CI pipeline for JDK version ${{ matrix.jdk }}" @@ -45,4 +46,14 @@ jobs: maven-version: 3.8.6 - name: Compile and test flink-connector-elasticsearch - run: mvn clean install -Dscala-2.12 -Dflink.convergence.phase=install -Pcheck-convergence -U -B ${{ env.MVN_CONNECTION_OPTIONS }} -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties + run: | + pushd .. \ + && wget -q -c ${{ env.FLINK_URL }} -O - | tar -xz \ + && popd + + mvn clean install -U -B \ + -Dscala-2.12 \ + -Prun-end-to-end-tests -DdistDir=$(pwd)/../flink-1.16-SNAPSHOT \ + -Dflink.convergence.phase=install -Pcheck-convergence \ + ${{ env.MVN_CONNECTION_OPTIONS }} \ + -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml new file mode 100644 index 0000000..b7b6e1d --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml @@ -0,0 +1,77 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-e2e-tests</artifactId> + <version>3.0-SNAPSHOT</version> + </parent> + + <artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId> + <name>Flink : Connectors : Elasticsearch : E2E tests : Common</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>elasticsearch</artifactId> + <version>${testcontainers.version}</version> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>compile</scope> + 
</dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> + +</project> diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java new file mode 100644 index 0000000..ad97915 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchClient.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import java.util.List; + +/** The version-agnostic Elasticsearch client interface. */ +public interface ElasticsearchClient { + + /** + * Delete the index. + * + * @param indexName The index name. + */ + void deleteIndex(String indexName); + + /** + * Refresh the index. + * + * @param indexName The index name. 
+ */ + void refreshIndex(String indexName); + + /** + * Create index if it does not exist. + * + * @param indexName The index name. + * @param shards The number of shards. + * @param replicas The number of replicas. + */ + void createIndexIfDoesNotExist(String indexName, int shards, int replicas); + + /** Close the client. @throws Exception The exception. */ + void close() throws Exception; + + /** + * Fetch all results from the index. + * + * @param params The parameters of the query. + * @return All documents from the index. + */ + List<KeyValue<Integer, String>> fetchAll(QueryParams params); +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java new file mode 100644 index 0000000..165ca6d --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchDataReader.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.connector.testframe.external.ExternalSystemDataReader; + +import java.time.Duration; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Elasticsearch data reader. */ +public class ElasticsearchDataReader + implements ExternalSystemDataReader<KeyValue<Integer, String>> { + private final ElasticsearchClient client; + private final String indexName; + private final int pageLength; + + public ElasticsearchDataReader(ElasticsearchClient client, String indexName, int pageLength) { + this.client = checkNotNull(client); + this.indexName = checkNotNull(indexName); + this.pageLength = pageLength; + } + + @Override + public List<KeyValue<Integer, String>> poll(Duration timeout) { + client.refreshIndex(indexName); + QueryParams params = + QueryParams.newBuilder(indexName) + .sortField("key") + .pageLength(pageLength) + .trackTotalHits(true) + .build(); + return client.fetchAll(params); + } + + @Override + public void close() throws Exception { + client.close(); + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java new file mode 100644 index 0000000..b6f4a9f --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment; +import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem; +import org.apache.flink.connector.testframe.external.ExternalSystemDataReader; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase; +import org.apache.flink.streaming.api.CheckpointingMode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat; +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition; + +/** Base class for end-to-end ElasticsearchSink tests based on the connector testing framework. 
*/ +@SuppressWarnings("unused") +public abstract class ElasticsearchSinkE2ECaseBase<T extends Comparable<T>> + extends SinkTestSuiteBase<T> { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkE2ECaseBase.class); + private static final int READER_RETRY_ATTEMPTS = 10; + private static final int READER_TIMEOUT = -1; // Not used + + protected static final String ELASTICSEARCH_HOSTNAME = "elasticsearch"; + + @TestSemantics + CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE}; + + // Defines TestEnvironment + @TestEnv + protected FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6); + + // Defines ConnectorExternalSystem + @TestExternalSystem + DefaultContainerizedExternalSystem<ElasticsearchContainer> elasticsearch = + DefaultContainerizedExternalSystem.builder() + .fromContainer( + new ElasticsearchContainer( + DockerImageName.parse(getElasticsearchContainerName())) + .withEnv( + "cluster.routing.allocation.disk.threshold_enabled", + "false") + .withNetworkAliases(ELASTICSEARCH_HOSTNAME)) + .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager()) + .build(); + + @Override + protected void checkResultWithSemantic( + ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic) + throws Exception { + waitUntilCondition( + () -> { + try { + List<T> result = reader.poll(Duration.ofMillis(READER_TIMEOUT)); + assertThat(sort(result).iterator()) + .matchesRecordsFromSource( + Collections.singletonList(sort(testData)), semantic); + return true; + } catch (Throwable t) { + LOG.warn("Polled results not as expected", t); + return false; + } + }, + 5000, + READER_RETRY_ATTEMPTS); + } + + private List<T> sort(List<T> list) { + return list.stream().sorted().collect(Collectors.toList()); + } + + abstract String getElasticsearchContainerName(); +} diff --git 
a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java new file mode 100644 index 0000000..598cf43 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.testframe.external.ExternalSystemDataReader; +import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext; +import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings; + +import org.apache.commons.lang3.RandomStringUtils; + +import java.net.URL; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The base class for Elasticsearch sink context. */ +abstract class ElasticsearchSinkExternalContextBase + implements DataStreamSinkV2ExternalContext<KeyValue<Integer, String>> { + /** The constant INDEX_NAME_PREFIX. */ + protected static final String INDEX_NAME_PREFIX = "es-index"; + + private static final int RANDOM_STRING_MAX_LENGTH = 50; + private static final int NUM_RECORDS_UPPER_BOUND = 500; + private static final int NUM_RECORDS_LOWER_BOUND = 100; + protected static final int BULK_BUFFER = 100; + protected static final int PAGE_LENGTH = NUM_RECORDS_UPPER_BOUND + 1; + /** The index name. */ + protected final String indexName; + + /** The address reachable from Flink (internal to the testing environment). */ + protected final String addressInternal; + + /** The connector jar paths. */ + protected final List<URL> connectorJarPaths; + + /** The client. */ + protected final ElasticsearchClient client; + + /** + * Instantiates a new Elasticsearch sink context base. + * + * @param addressInternal The address to access Elasticsearch from within Flink. 
When running in + * a containerized environment, should correspond to the network alias that resolves within + * the environment's network together with the exposed port. + * @param connectorJarPaths The connector jar paths. + * @param client The Elasticsearch client. + */ + ElasticsearchSinkExternalContextBase( + String addressInternal, List<URL> connectorJarPaths, ElasticsearchClient client) { + this.addressInternal = checkNotNull(addressInternal); + this.connectorJarPaths = checkNotNull(connectorJarPaths); + this.client = checkNotNull(client); + this.indexName = + INDEX_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + } + + @Override + public List<KeyValue<Integer, String>> generateTestData( + TestingSinkSettings sinkSettings, long seed) { + Random random = new Random(seed); + int recordNum = + random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND) + + NUM_RECORDS_LOWER_BOUND; + + return IntStream.range(0, recordNum) + .boxed() + .map( + i -> { + int valueLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1; + String value = RandomStringUtils.random(valueLength, true, true); + return KeyValue.of(i, value); + }) + .collect(Collectors.toList()); + } + + @Override + public void close() { + client.deleteIndex(indexName); + } + + @Override + public List<URL> getConnectorJarPaths() { + return connectorJarPaths; + } + + @Override + public TypeInformation<KeyValue<Integer, String>> getProducedType() { + return TypeInformation.of(new TypeHint<KeyValue<Integer, String>>() {}); + } + + @Override + public abstract Sink<KeyValue<Integer, String>> createSink(TestingSinkSettings sinkSettings); + + @Override + public abstract ExternalSystemDataReader<KeyValue<Integer, String>> createSinkDataReader( + TestingSinkSettings sinkSettings); + + @Override + public abstract String toString(); +} diff --git 
a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java new file mode 100644 index 0000000..0da9b2b --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextFactoryBase.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.connector.testframe.external.ExternalContext; +import org.apache.flink.connector.testframe.external.ExternalContextFactory; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.elasticsearch.ElasticsearchContainer; + +import java.net.URL; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The base class for Elasticsearch sink context factories. 
*/ +public abstract class ElasticsearchSinkExternalContextFactoryBase<T extends ExternalContext> + implements ExternalContextFactory<T> { + + /** The Elasticsearch container. */ + protected final ElasticsearchContainer elasticsearchContainer; + + /** The connector jars. */ + protected final List<URL> connectorJars; + + /** + * Instantiates a new Elasticsearch sink context factory. + * + * @param elasticsearchContainer The Elasticsearch container. + * @param connectorJars The connector jars. + */ + ElasticsearchSinkExternalContextFactoryBase( + ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) { + this.elasticsearchContainer = checkNotNull(elasticsearchContainer); + this.connectorJars = checkNotNull(connectorJars); + } + + protected static String formatInternalAddress( + GenericContainer<ElasticsearchContainer> container) { + return String.format( + "%s:%d", container.getNetworkAliases().get(0), container.getExposedPorts().get(0)); + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java new file mode 100644 index 0000000..7a5c36c --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter; +import org.apache.flink.connector.elasticsearch.sink.RequestIndexer; + +import org.elasticsearch.action.update.UpdateRequest; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Test emitter for performing ElasticSearch indexing requests. */ +public class ElasticsearchTestEmitter implements ElasticsearchEmitter<KeyValue<Integer, String>> { + + private static final long serialVersionUID = 1L; + + private final UpdateRequestFactory factory; + + /** + * Instantiates a new Elasticsearch test emitter. + * + * @param factory The factory for creating {@link UpdateRequest}s. 
+ */ + public ElasticsearchTestEmitter(UpdateRequestFactory factory) { + this.factory = checkNotNull(factory); + } + + @Override + public void emit( + KeyValue<Integer, String> element, SinkWriter.Context context, RequestIndexer indexer) { + UpdateRequest updateRequest = factory.createUpdateRequest(element); + indexer.add(updateRequest); + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java new file mode 100644 index 0000000..43db0c4 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/KeyValue.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.util.StringUtils; + +import java.io.Serializable; +import java.util.Objects; + +/** A {@link Comparable} holder for key-value pairs. */ +public class KeyValue<K extends Comparable<? super K>, V extends Comparable<? 
super V>> + implements Comparable<KeyValue<K, V>>, Serializable { + + private static final long serialVersionUID = 1L; + + /** The key of the key-value pair. */ + public K key; + /** The value of the key-value pair. */ + public V value; + + /** Creates a new key-value pair where all fields are null. */ + public KeyValue() {} + + private KeyValue(K key, V value) { + this.key = key; + this.value = value; + } + + @Override + public int compareTo(KeyValue<K, V> other) { + int d = this.key.compareTo(other.key); + if (d == 0) { + return this.value.compareTo(other.value); + } + return d; + } + + /** Creates a new key-value pair. */ + public static <K extends Comparable<? super K>, T1 extends Comparable<? super T1>> + KeyValue<K, T1> of(K key, T1 value) { + return new KeyValue<>(key, value); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof KeyValue)) { + return false; + } + @SuppressWarnings("rawtypes") + KeyValue keyValue = (KeyValue) o; + if (key != null ? !key.equals(keyValue.key) : keyValue.key != null) { + return false; + } + if (value != null ? 
!value.equals(keyValue.value) : keyValue.value != null) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } + + @Override + public String toString() { + return "(" + + StringUtils.arrayAwareToString(this.key) + + "," + + StringUtils.arrayAwareToString(this.value) + + ")"; + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java new file mode 100644 index 0000000..22cf7cf --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/QueryParams.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Holder class for Elasticsearch query parameters. 
*/ +public class QueryParams { + private final String indexName; + private final String sortField; + private final int from; + private final int pageLength; + private final boolean trackTotalHits; + + private QueryParams(Builder builder) { + indexName = builder.indexName; + sortField = builder.sortField; + from = builder.from; + pageLength = builder.pageLength; + trackTotalHits = builder.trackTotalHits; + } + + /** + * New {@code QueryParams} builder. + * + * @param indexName The index name. This parameter is mandatory. + * @return The builder. + */ + public static Builder newBuilder(String indexName) { + return new Builder(indexName); + } + + /** {@code QueryParams} builder static inner class. */ + public static final class Builder { + private String sortField; + private int from; + private int pageLength; + private boolean trackTotalHits; + private String indexName; + + private Builder(String indexName) { + this.indexName = checkNotNull(indexName); + } + + /** + * Sets the {@code sortField} and returns a reference to this Builder enabling method + * chaining. + * + * @param sortField The {@code sortField} to set. + * @return A reference to this Builder. + */ + public Builder sortField(String sortField) { + this.sortField = checkNotNull(sortField); + return this; + } + + /** + * Sets the {@code from} and returns a reference to this Builder enabling method chaining. + * + * @param from The {@code from} to set. + * @return A reference to this Builder. + */ + public Builder from(int from) { + this.from = from; + return this; + } + + /** + * Sets the {@code pageLength} and returns a reference to this Builder enabling method + * chaining. + * + * @param pageLength The {@code pageLength} to set. + * @return A reference to this Builder. + */ + public Builder pageLength(int pageLength) { + this.pageLength = pageLength; + return this; + } + + /** + * Sets the {@code trackTotalHits} and returns a reference to this Builder enabling method + * chaining. 
+ * + * @param trackTotalHits The {@code trackTotalHits} to set. + * @return A reference to this Builder. + */ + public Builder trackTotalHits(boolean trackTotalHits) { + this.trackTotalHits = trackTotalHits; + return this; + } + + /** + * Returns a {@code QueryParams} built from the parameters previously set. + * + * @return A {@code QueryParams} built with parameters of this {@code QueryParams.Builder} + */ + public QueryParams build() { + return new QueryParams(this); + } + + /** + * Sets the {@code indexName} and returns a reference to this Builder enabling method + * chaining. + * + * @param indexName The {@code indexName} to set. + * @return A reference to this Builder. + */ + public Builder indexName(String indexName) { + this.indexName = checkNotNull(indexName); + return this; + } + } + + /** + * Sort field string. + * + * @return The string. + */ + public String sortField() { + return sortField; + } + + /** + * From index to start the search from. Defaults to {@code 0}. + * + * @return The int. + */ + public int from() { + return from; + } + + /** + * Page length int. + * + * @return The int. + */ + public int pageLength() { + return pageLength; + } + + /** + * Track total hits boolean. + * + * @return The boolean. + */ + public boolean trackTotalHits() { + return trackTotalHits; + } + + /** + * Index name string. + * + * @return The string. 
+ */ + public String indexName() { + return indexName; + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java new file mode 100644 index 0000000..d7b8bb7 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/UpdateRequestFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.elasticsearch.action.update.UpdateRequest; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** Factory for creating UpdateRequests. */ +public interface UpdateRequestFactory extends Serializable { + UpdateRequest createUpdateRequest(KeyValue<Integer, String> element); + + /** + * Utility to convert {@link KeyValue} elements into Elasticsearch-compatible format. + * + * @param element The element to be converted. 
+ * @return The map with the element's fields. + */ + static Map<String, Object> prepareDoc(KeyValue<Integer, String> element) { + Map<String, Object> json = new HashMap<>(); + json.put("key", element.key); + json.put("value", element.value); + return json; + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java new file mode 100644 index 0000000..a2bcfdf --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/test/parameters/ParameterProperty.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.parameters; + +import java.util.Optional; +import java.util.function.Function; + +/** System-property based parameters for tests and resources. 
*/ +public class ParameterProperty<V> { + + private final String propertyName; + private final Function<String, V> converter; + + /** + * Creates a parameter that is backed by the system property with the given name. + * + * @param propertyName name of the system property to read + * @param converter function applied to the raw property string to produce the value + */ + public ParameterProperty(final String propertyName, final Function<String, V> converter) { + this.propertyName = propertyName; + this.converter = converter; + } + + /** + * Returns the name of the system property backing this parameter. + * + * @return the system property name + */ + public String getPropertyName() { + return propertyName; + } + + /** + * Retrieves the value of this property. + * + * @return Optional containing the converted value of this property, or an empty Optional if the + * system property is not set + */ + public Optional<V> get() { + final String value = System.getProperty(propertyName); + return value == null ? Optional.empty() : Optional.of(converter.apply(value)); + } + + /** + * Retrieves the value of this property, or the given default if no value was set. + * + * @param defaultValue value to return if the system property is not set + * @return the value of this property, or the given default if no value was set + */ + public V get(final V defaultValue) { + final String value = System.getProperty(propertyName); + return value == null ? defaultValue : converter.apply(value); + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java new file mode 100644 index 0000000..980aaa9 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/tests/util/TestUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util; + +import org.apache.flink.test.parameters.ParameterProperty; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** General test utilities. */ +public enum TestUtils { + ; + + private static final ParameterProperty<Path> MODULE_DIRECTORY = + new ParameterProperty<>("moduleDir", Paths::get); + + /** + * Searches for a resource file matching the given regex in the given directory. This method is + * primarily intended to be used for the initialization of static {@link Path} fields for + * resource file(i.e. jar, config file) that reside in the modules {@code target} directory. 
+ * + * @param resourceNameRegex regex pattern to match against + * @return Path pointing to the matching resource + * @throws RuntimeException if none or multiple resource files could be found + */ + public static Path getResource(final String resourceNameRegex) { + // if the property is not set then we are most likely running in the IDE, where the working + // directory is the module of the test that is currently running, which is exactly what we + // want + Path moduleDirectory = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath()); + // compile the pattern once instead of once per visited path + final Pattern resourcePattern = Pattern.compile(resourceNameRegex); + + try (Stream<Path> dependencyResources = Files.walk(moduleDirectory)) { + final List<Path> matchingResources = + dependencyResources + .filter( + jar -> + resourcePattern + .matcher(jar.toAbsolutePath().toString()) + .find()) + .collect(Collectors.toList()); + switch (matchingResources.size()) { + case 0: + throw new RuntimeException( + new FileNotFoundException( + String.format( + "No resource file could be found that matches the pattern %s. " + + "This could mean that the test module must be rebuilt via maven.", + resourceNameRegex))); + case 1: + return matchingResources.get(0); + default: + throw new RuntimeException( + new IOException( + String.format( + "Multiple resource files were found matching the pattern %s. Matches=%s", + resourceNameRegex, matchingResources))); + } + } catch (final IOException ioe) { + throw new RuntimeException("Could not search for resource files.", ioe); + } + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml new file mode 100644 index 0000000..c7d706f --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/pom.xml @@ -0,0 +1,120 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements.
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-e2e-tests</artifactId> + <version>3.0-SNAPSHOT</version> + </parent> + + <artifactId>flink-connector-elasticsearch6-e2e-tests</artifactId> + <name>Flink : Connectors : Elasticsearch 6 : E2E tests</name> + <packaging>jar</packaging> + + <properties> + <elasticsearch.version>6.8.20</elasticsearch.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch6</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + 
<artifactId>log4j-api</artifactId> + <version>${log4j.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>elasticsearch6-end-to-end-test</finalName> + <outputDirectory>dependencies</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy</id> + <phase>pre-integration-test</phase> + <goals> + <goal>copy</goal> + </goals> + </execution> + </executions> + <configuration> + <artifactItems> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId> + <version>${project.version}</version> + <destFileName>flink-connector-elasticsearch-test-utils.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies + </outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${flink.version}</version> + <destFileName>flink-connector-test-utils.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies + </outputDirectory> + </artifactItem> + </artifactItems> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java new file mode 100644 index 0000000..b2c80fc --- /dev/null +++ 
b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.http.HttpHost; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import 
org.elasticsearch.search.sort.SortOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The type Elasticsearch 6 client. */ +public class Elasticsearch6Client implements ElasticsearchClient { + + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6Client.class); + + private final RestHighLevelClient restClient; + + /** + * Instantiates a new Elasticsearch 6 client. + * + * @param addressExternal The address to access Elasticsearch from the host machine (outside of + * the containerized environment). + */ + public Elasticsearch6Client(String addressExternal) { + checkNotNull(addressExternal); + HttpHost httpHost = HttpHost.create(addressExternal); + RestClientBuilder restClientBuilder = RestClient.builder(httpHost); + this.restClient = new RestHighLevelClient(restClientBuilder); + checkNotNull(restClient); + } + + @Override + public void deleteIndex(String indexName) { + DeleteIndexRequest request = new DeleteIndexRequest(indexName); + try { + restClient.indices().delete(request, RequestOptions.DEFAULT); + } catch (IOException e) { + LOG.error("Cannot delete index {}", indexName, e); + } + // This is needed to avoid race conditions between tests that reuse the same index + refreshIndex(indexName); + } + + @Override + public void refreshIndex(String indexName) { + RefreshRequest refresh = new RefreshRequest(indexName); + refresh.indicesOptions(IndicesOptions.strictSingleIndexNoExpandForbidClosed()); + try { + restClient.indices().refresh(refresh, RequestOptions.DEFAULT); + } catch (IOException e) { + LOG.error("Cannot refresh index {}", indexName, e); + } catch (ElasticsearchException e) { + if (e.status() == RestStatus.NOT_FOUND) { + LOG.info("Index {} not found", indexName); + } + } + } + + @Override + public void 
createIndexIfDoesNotExist(String indexName, int shards, int replicas) { + GetIndexRequest request = new GetIndexRequest(indexName); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); + createIndexRequest.settings( + Settings.builder() + .put("index.number_of_shards", shards) + .put("index.number_of_replicas", replicas)); + try { + boolean exists = restClient.indices().exists(request, RequestOptions.DEFAULT); + if (!exists) { + restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT); + } else { + LOG.info("Index already exists {}", indexName); + } + } catch (IOException e) { + LOG.error("Cannot create index {}", indexName, e); + } + } + + @Override + public void close() throws Exception { + restClient.close(); + } + + @Override + public List<KeyValue<Integer, String>> fetchAll(QueryParams params) { + try { + SearchResponse response = + restClient.search( + new SearchRequest(params.indexName()) + .source( + new SearchSourceBuilder() + .sort(params.sortField(), SortOrder.ASC) + .from(params.from()) + .size(params.pageLength()) + .trackTotalHits(params.trackTotalHits())), + RequestOptions.DEFAULT); + SearchHit[] searchHits = response.getHits().getHits(); + return Arrays.stream(searchHits) + .map( + searchHit -> + KeyValue.of( + Integer.valueOf(searchHit.getId()), + searchHit.getSourceAsMap().get("value").toString())) + .collect(Collectors.toList()); + } catch (IOException e) { + LOG.error("Fetching records failed", e); + return Collections.emptyList(); + } + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java new file mode 100644 index 0000000..9659913 --- /dev/null +++ 
b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest6Factory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.elasticsearch.action.update.UpdateRequest; + +import java.util.Map; + +/** Factory for creating UpdateRequests of Elasticsearch6. */ +public class UpdateRequest6Factory implements UpdateRequestFactory { + + private static final long serialVersionUID = 1L; + + private final String indexName; + + /** + * Instantiates a new update request factory for Elasticsearch6. + * + * @param indexName The name of the index that the created update requests target.
+ */ + public UpdateRequest6Factory(String indexName) { + this.indexName = indexName; + } + + /** + * Creates an update request for the given element. The element's key is used as the document + * id, and the same document is passed as both the update document and the upsert document. + * + * @param element The element to write. + * @return The update request for the configured index, using the document type "doc". + */ + @Override + public UpdateRequest createUpdateRequest(KeyValue<Integer, String> element) { + Map<String, Object> json = UpdateRequestFactory.prepareDoc(element); + return new UpdateRequest(indexName, "doc", String.valueOf(element.key)) + .doc(json) + .upsert(json); + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java new file mode 100644 index 0000000..cc95275 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkE2ECase.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.util.DockerImageVersions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; + +/** End to end test for Elasticsearch6Sink based on connector testing framework. */ +@SuppressWarnings("unused") +public class Elasticsearch6SinkE2ECase + extends ElasticsearchSinkE2ECaseBase<KeyValue<Integer, String>> { + + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6SinkE2ECase.class); + + public Elasticsearch6SinkE2ECase() throws Exception {} + + String getElasticsearchContainerName() { + return DockerImageVersions.ELASTICSEARCH_6; + } + + @TestContext + Elasticsearch6SinkExternalContextFactory contextFactory = + new Elasticsearch6SinkExternalContextFactory( + elasticsearch.getContainer(), + Arrays.asList( + TestUtils.getResource("dependencies/elasticsearch6-end-to-end-test.jar") + .toAbsolutePath() + .toUri() + .toURL(), + TestUtils.getResource("dependencies/flink-connector-test-utils.jar") + .toAbsolutePath() + .toUri() + .toURL(), + TestUtils.getResource( + "dependencies/flink-connector-elasticsearch-test-utils.jar") + .toAbsolutePath() + .toUri() + .toURL())); +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java new file mode 100644 index 0000000..1ccbcd2 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContext.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder; +import org.apache.flink.connector.testframe.external.ExternalSystemDataReader; +import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings; + +import org.apache.http.HttpHost; + +import java.net.URL; +import java.util.List; + +/** Elasticsearch 6 sink external context. */ +class Elasticsearch6SinkExternalContext extends ElasticsearchSinkExternalContextBase { + + /** + * Instantiates a new Elasticsearch 6 sink external context. + * + * @param addressExternal The address to access Elasticsearch from the host machine (outside of + * the containerized environment). + * @param addressInternal The address to access Elasticsearch from Flink. When running in a + * containerized environment, should correspond to the network alias that resolves within + * the environment's network together with the exposed port. + * @param connectorJarPaths The connector jar paths.
+ */ + Elasticsearch6SinkExternalContext( + String addressExternal, String addressInternal, List<URL> connectorJarPaths) { + super(addressInternal, connectorJarPaths, new Elasticsearch6Client(addressExternal)); + } + + @Override + public Sink<KeyValue<Integer, String>> createSink(TestingSinkSettings sinkSettings) { + // create the target index if needed (1 shard, 0 replicas) before building the sink + client.createIndexIfDoesNotExist(indexName, 1, 0); + return new Elasticsearch6SinkBuilder<KeyValue<Integer, String>>() + .setHosts(HttpHost.create(this.addressInternal)) + .setEmitter(new ElasticsearchTestEmitter(new UpdateRequest6Factory(indexName))) + .setBulkFlushMaxActions(BULK_BUFFER) + .build(); + } + + @Override + public ExternalSystemDataReader<KeyValue<Integer, String>> createSinkDataReader( + TestingSinkSettings sinkSettings) { + return new ElasticsearchDataReader(client, indexName, PAGE_LENGTH); + } + + @Override + public String toString() { + return "Elasticsearch 6 sink context."; + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java new file mode 100644 index 0000000..690fbd5 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExternalContextFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.testcontainers.elasticsearch.ElasticsearchContainer; + +import java.net.URL; +import java.util.List; + +/** Elasticsearch sink external context factory. */ +class Elasticsearch6SinkExternalContextFactory + extends ElasticsearchSinkExternalContextFactoryBase<Elasticsearch6SinkExternalContext> { + + /** + * Instantiates a new Elasticsearch 6 sink external context factory. + * + * @param elasticsearchContainer The Elasticsearch container. + * @param connectorJars The connector jars. + */ + Elasticsearch6SinkExternalContextFactory( + ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) { + super(elasticsearchContainer, connectorJars); + } + + @Override + public Elasticsearch6SinkExternalContext createExternalContext(String testName) { + return new Elasticsearch6SinkExternalContext( + elasticsearchContainer.getHttpHostAddress(), + formatInternalAddress(elasticsearchContainer), + connectorJars); + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000..4da8bba --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch6-e2e-tests/src/test/resources/log4j2-test.properties @@ -0,0 +1,34 @@ +################################################################################ +# Licensed to the Apache Software 
Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=OFF +rootLogger.appenderRef.test.ref=TestLogger +appender.testlogger.name=TestLogger +appender.testlogger.type=CONSOLE +appender.testlogger.target=SYSTEM_ERR +appender.testlogger.layout.type=PatternLayout +appender.testlogger.layout.pattern=DOCKER> %m%n +# It is recommended to uncomment these lines when enabling the logger. 
The below package used +# by testcontainers is quite verbose +logger.yarn.name=org.testcontainers.shaded.com.github.dockerjava.core +logger.yarn.level=WARN +logger.yarn.appenderRef.console.ref=TestLogger +logger.testutils.name=org.apache.flink.runtime.testutils.CommonTestUtils +logger.testutils.level=WARN +logger.testutils.appenderRef.console.ref=TestLogger diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml new file mode 100644 index 0000000..9e020f1 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/pom.xml @@ -0,0 +1,121 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-e2e-tests</artifactId> + <version>3.0-SNAPSHOT</version> + </parent> + + <artifactId>flink-connector-elasticsearch7-e2e-tests</artifactId> + <name>Flink : Connectors : Elasticsearch 7 : E2E Tests</name> + <packaging>jar</packaging> + + <properties> + <elasticsearch.version>7.10.2</elasticsearch.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <!-- The connector under test. It is the only dependency declared without the provided scope. --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch7</artifactId> + <version>${project.version}</version> + </dependency> + <!-- The common E2E helpers are declared as provided here; the same artifact is copied as a separate jar by the maven-dependency-plugin below. --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>${log4j.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Builds dependencies/elasticsearch7-end-to-end-test.jar during the package phase. Elasticsearch7SinkE2ECase looks the jar up under exactly this name, so keep finalName and the test in sync. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>elasticsearch7-end-to-end-test</finalName> + <outputDirectory>dependencies</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + <!-- Copies the two helper jars under the fixed file names that Elasticsearch7SinkE2ECase resolves (flink-connector-elasticsearch-test-utils.jar and flink-connector-test-utils.jar). NOTE(review): the shade plugin above writes to the relative directory "dependencies" while these copies go to ${project.build.directory}/dependencies; presumably the test lookup finds both locations, confirm. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + 
<id>copy</id> + <phase>pre-integration-test</phase> + <goals> + <goal>copy</goal> + </goals> + </execution> + </executions> + <configuration> + <artifactItems> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-e2e-tests-common</artifactId> + <version>${project.version}</version> + <destFileName>flink-connector-elasticsearch-test-utils.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies + </outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${flink.version}</version> + <destFileName>flink-connector-test-utils.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies + </outputDirectory> + </artifactItem> + </artifactItems> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java new file mode 100644 index 0000000..7faba25 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.http.HttpHost; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The type Elasticsearch 7 client. */ +public class Elasticsearch7Client implements ElasticsearchClient { + + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7Client.class); + + private final RestHighLevelClient restClient; + + /** + * Instantiates a new Elasticsearch 7 client. 
+ * + * @param addressExternal The address to access Elasticsearch from the host machine (outside of + * the containerized environment). + */ + public Elasticsearch7Client(String addressExternal) { + checkNotNull(addressExternal); + HttpHost httpHost = HttpHost.create(addressExternal); + RestClientBuilder restClientBuilder = RestClient.builder(httpHost); + this.restClient = new RestHighLevelClient(restClientBuilder); + checkNotNull(restClient); + } + + @Override + public void deleteIndex(String indexName) { + DeleteIndexRequest request = new DeleteIndexRequest(indexName); + try { + restClient.indices().delete(request, RequestOptions.DEFAULT); + } catch (IOException e) { + LOG.error("Cannot delete index {}", indexName, e); + } + // This is needed to avoid race conditions between tests that reuse the same index + refreshIndex(indexName); + } + + @Override + public void refreshIndex(String indexName) { + RefreshRequest refresh = new RefreshRequest(indexName); + try { + restClient.indices().refresh(refresh, RequestOptions.DEFAULT); + } catch (IOException e) { + LOG.error("Cannot delete index {}", indexName, e); + } catch (ElasticsearchException e) { + if (e.status() == RestStatus.NOT_FOUND) { + LOG.info("Index {} not found", indexName); + } + } + } + + @Override + public void createIndexIfDoesNotExist(String indexName, int shards, int replicas) { + GetIndexRequest request = new GetIndexRequest(indexName); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); + createIndexRequest.settings( + Settings.builder() + .put("index.number_of_shards", shards) + .put("index.number_of_replicas", replicas)); + try { + boolean exists = restClient.indices().exists(request, RequestOptions.DEFAULT); + if (!exists) { + restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT); + } else { + LOG.info("Index already exists {}", indexName); + } + } catch (IOException e) { + LOG.error("Cannot create index {}", indexName, e); + } + } + + @Override + public 
void close() throws Exception { + restClient.close(); + } + + @Override + public List<KeyValue<Integer, String>> fetchAll(QueryParams params) { + try { + SearchResponse response = + restClient.search( + new SearchRequest(params.indexName()) + .source( + new SearchSourceBuilder() + .sort(params.sortField(), SortOrder.ASC) + .from(params.from()) + .size(params.pageLength()) + .trackTotalHits(params.trackTotalHits())), + RequestOptions.DEFAULT); + SearchHit[] searchHits = response.getHits().getHits(); + return Arrays.stream(searchHits) + .map( + searchHit -> + KeyValue.of( + Integer.valueOf(searchHit.getId()), + searchHit.getSourceAsMap().get("value").toString())) + .collect(Collectors.toList()); + } catch (IOException e) { + LOG.error("Fetching records failed", e); + return Collections.emptyList(); + } + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java new file mode 100644 index 0000000..875d09a --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/main/java/org/apache/flink/streaming/tests/UpdateRequest7Factory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.elasticsearch.action.update.UpdateRequest; + +import java.util.Map; + +/** {@link UpdateRequestFactory} that builds upsert requests for Elasticsearch 7. */ +public class UpdateRequest7Factory implements UpdateRequestFactory { + + private static final long serialVersionUID = 1L; + + private final String indexName; + + /** + * Instantiates a new update request factory for Elasticsearch 7. + * + * @param indexName The name of the index the created requests target. + */ + public UpdateRequest7Factory(String indexName) { + this.indexName = indexName; + } + + /** + * Builds an upsert request for the given element. The document produced by {@link + * UpdateRequestFactory#prepareDoc} serves both as the partial update and as the document to + * insert, and the string form of the element key is used as the document id. + * + * @param element The element to write. + * @return The update request targeting the configured index. + */ + @Override + public UpdateRequest createUpdateRequest(KeyValue<Integer, String> element) { + final Map<String, Object> document = UpdateRequestFactory.prepareDoc(element); + final String documentId = String.valueOf(element.key); + final UpdateRequest upsertRequest = new UpdateRequest(indexName, documentId); + upsertRequest.doc(document); + upsertRequest.upsert(document); + return upsertRequest; + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java new file mode 100644 index 0000000..59be31c --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkE2ECase.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.util.DockerImageVersions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; + +/** + * End to end test for Elasticsearch7Sink based on connector testing framework. 
+ * + * <p>This class only contributes the Elasticsearch 7 Docker image name and the external context + * factory; everything else comes from {@link ElasticsearchSinkE2ECaseBase}. + */ +@SuppressWarnings("unused") +public class Elasticsearch7SinkE2ECase + extends ElasticsearchSinkE2ECaseBase<KeyValue<Integer, String>> { + + /* NOTE(review): this logger is not referenced anywhere in this class. */ + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7SinkE2ECase.class); + + /** + * Declares {@code throws Exception} so that the {@code contextFactory} field initializer below + * may throw checked exceptions while resolving the jar URLs. + */ + public Elasticsearch7SinkE2ECase() throws Exception {} + + /** + * Returns the Docker image name used for the Elasticsearch 7 container. + * + * <p>NOTE(review): this looks like the implementation of a hook declared in the base class; + * if so, an {@code @Override} annotation would make that explicit. Confirm against + * ElasticsearchSinkE2ECaseBase. + */ + String getElasticsearchContainerName() { + return DockerImageVersions.ELASTICSEARCH_7; + } + + /** + * External context factory registered through {@link TestContext}. It is built from the + * {@code elasticsearch} container of the base class and the URLs of three jars resolved via + * {@link TestUtils#getResource}: the job jar {@code elasticsearch7-end-to-end-test.jar}, {@code + * flink-connector-test-utils.jar} and {@code flink-connector-elasticsearch-test-utils.jar}. + * The file names must match the ones produced by this module's build. + */ + @TestContext + Elasticsearch7SinkExternalContextFactory contextFactory = + new Elasticsearch7SinkExternalContextFactory( + elasticsearch.getContainer(), + Arrays.asList( + TestUtils.getResource("dependencies/elasticsearch7-end-to-end-test.jar") + .toAbsolutePath() + .toUri() + .toURL(), + TestUtils.getResource("dependencies/flink-connector-test-utils.jar") + .toAbsolutePath() + .toUri() + .toURL(), + TestUtils.getResource( + "dependencies/flink-connector-elasticsearch-test-utils.jar") + .toAbsolutePath() + .toUri() + .toURL())); +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java new file mode 100644 index 0000000..aa31ea0 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContext.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder; +import org.apache.flink.connector.testframe.external.ExternalSystemDataReader; +import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings; + +import org.apache.http.HttpHost; + +import java.net.URL; +import java.util.List; + +/** External context that wires the Elasticsearch 7 sink and client into the sink test suite. */ +class Elasticsearch7SinkExternalContext extends ElasticsearchSinkExternalContextBase { + + /** + * Instantiates a new Elasticsearch 7 sink external context. + * + * @param addressExternal The address to access Elasticsearch from the host machine (outside of + * the containerized environment). + * @param addressInternal The address to access Elasticsearch from Flink. When running in a + * containerized environment, should correspond to the network alias that resolves within + * the environment's network together with the exposed port. + * @param connectorJarPaths The connector jar paths. 
+ */ + Elasticsearch7SinkExternalContext( + String addressExternal, String addressInternal, List<URL> connectorJarPaths) { + super(addressInternal, connectorJarPaths, new Elasticsearch7Client(addressExternal)); + } + + /** + * Makes sure the test index exists (one shard, no replicas) and builds an Elasticsearch 7 sink + * that writes to the internal address with the test emitter and the configured bulk size. + */ + @Override + public Sink<KeyValue<Integer, String>> createSink(TestingSinkSettings sinkSettings) { + client.createIndexIfDoesNotExist(indexName, 1, 0); + final HttpHost sinkHost = HttpHost.create(this.addressInternal); + final ElasticsearchTestEmitter testEmitter = + new ElasticsearchTestEmitter(new UpdateRequest7Factory(indexName)); + return new Elasticsearch7SinkBuilder<KeyValue<Integer, String>>() + .setHosts(sinkHost) + .setEmitter(testEmitter) + .setBulkFlushMaxActions(BULK_BUFFER) + .build(); + } + + /** Creates a reader that pages through the test index using this context's client. */ + @Override + public ExternalSystemDataReader<KeyValue<Integer, String>> createSinkDataReader( + TestingSinkSettings sinkSettings) { + final ElasticsearchDataReader indexReader = + new ElasticsearchDataReader(client, indexName, PAGE_LENGTH); + return indexReader; + } + + /** Returns a fixed, human-readable name of this context. */ + @Override + public String toString() { + return "Elasticsearch 7 sink context."; + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java new file mode 100644 index 0000000..c0e3f41 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExternalContextFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.testcontainers.elasticsearch.ElasticsearchContainer; + +import java.net.URL; +import java.util.List; + +/** Factory that creates {@link Elasticsearch7SinkExternalContext} instances for the E2E tests. */ +class Elasticsearch7SinkExternalContextFactory + extends ElasticsearchSinkExternalContextFactoryBase<Elasticsearch7SinkExternalContext> { + + /** + * Instantiates a new Elasticsearch 7 sink external context factory. + * + * @param elasticsearchContainer The Elasticsearch container the contexts connect to. + * @param connectorJars The connector jars handed to every created context. + */ + Elasticsearch7SinkExternalContextFactory( + ElasticsearchContainer elasticsearchContainer, List<URL> connectorJars) { + super(elasticsearchContainer, connectorJars); + } + + /** + * Creates a new external context for the container of this factory. + * + * @param testName Not used by this implementation. + * @return A context using the container's HTTP host address as the external address and the + * formatted internal address as the address seen by Flink. + */ + @Override + public Elasticsearch7SinkExternalContext createExternalContext(String testName) { + final String addressExternal = elasticsearchContainer.getHttpHostAddress(); + final String addressInternal = formatInternalAddress(elasticsearchContainer); + return new Elasticsearch7SinkExternalContext( + addressExternal, addressInternal, connectorJars); + } +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000..e48d6c0 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch7-e2e-tests/src/test/resources/log4j2-test.properties @@ -0,0 +1,35 @@ +################################################################################ +# Licensed to the Apache Software 
Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=OFF +rootLogger.appenderRef.test.ref=TestLogger +appender.testlogger.name=TestLogger +appender.testlogger.type=CONSOLE +appender.testlogger.target=SYSTEM_ERR +appender.testlogger.layout.type=PatternLayout +appender.testlogger.layout.pattern=DOCKER> %m%n +# It is recommended to uncomment these lines when enabling the logger. 
The below package used +# by testcontainers is quite verbose +logger.yarn.name=org.testcontainers.shaded.com.github.dockerjava.core +logger.yarn.level=WARN +logger.yarn.appenderRef.console.ref=TestLogger +logger.testutils.name=org.apache.flink.runtime.testutils.CommonTestUtils +logger.testutils.level=WARN +logger.testutils.appenderRef.console.ref=TestLogger + diff --git a/flink-connector-elasticsearch-e2e-tests/pom.xml b/flink-connector-elasticsearch-e2e-tests/pom.xml new file mode 100644 index 0000000..87756e1 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/pom.xml @@ -0,0 +1,128 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-parent</artifactId> + <version>3.0-SNAPSHOT</version> + </parent> + + <packaging>pom</packaging> + + <artifactId>flink-connector-elasticsearch-e2e-tests</artifactId> + <name>Flink : Connectors : Elasticsearch : E2E Tests</name> + + <modules> + <module>flink-connector-elasticsearch-e2e-tests-common</module> + <module>flink-connector-elasticsearch6-e2e-tests</module> + <module>flink-connector-elasticsearch7-e2e-tests</module> + </modules> + + <!-- Opt-in profile: binds a surefire execution to the integration-test phase so the E2E cases only run when the profile is activated. --> + <profiles> + <profile> + <id>run-end-to-end-tests</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>end-to-end-tests</id> + <phase>integration-test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <!-- The pattern matches every file, so test classes are picked up regardless of their name suffix. --> + <includes> + <include>**/*.*</include> + </includes> + <!-- E2E tests must not access flink-dist concurrently. 
--> + <forkCount>1</forkCount> + <!-- Exposes the module directory to the tests as the system property moduleDir; presumably the test utilities use it to locate the built jars, confirm against TestUtils. --> + <systemPropertyVariables> + <moduleDir>${project.basedir}</moduleDir> + </systemPropertyVariables> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <build> + <plugins> + <!-- The E2E test modules are never deployed. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <!-- Unbinds the default-test and integration-tests surefire executions (phase none), which leaves the profile above as the only way to run tests in these modules. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>default-test</id> + <phase>none</phase> + </execution> + <execution> + <id>integration-tests</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + </plugins> + + <pluginManagement> + <plugins> + <!-- Shared shade settings for the child modules: keep jsr305 and slf4j-api out of the shaded jars and strip signature files from every bundled artifact. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <configuration> + <artifactSet> + <excludes combine.children="append"> + <exclude>com.google.code.findbugs:jsr305</exclude> + <exclude>org.slf4j:slf4j-api</exclude> + </excludes> + </artifactSet> + <filters combine.children="append"> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </plugin> + </plugins> + </pluginManagement> + </build> + +</project>