This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ccd2d531d1cb577113d5021efd6277031eeef9d1 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Fri May 15 20:06:46 2020 +0200 [FLINK-17027] Introduce a new Elasticsearch 6 connector with new property keys This closes #12184 --- .../flink-connector-elasticsearch6/pom.xml | 22 ++ .../table/Elasticsearch6Configuration.java | 80 +++++++ .../table/Elasticsearch6DynamicSink.java | 251 ++++++++++++++++++++ .../table/Elasticsearch6DynamicSinkFactory.java | 155 ++++++++++++ .../org.apache.flink.table.factories.Factory | 16 ++ .../Elasticsearch6DynamicSinkFactoryTest.java | 207 ++++++++++++++++ .../table/Elasticsearch6DynamicSinkITCase.java | 262 +++++++++++++++++++++ .../table/Elasticsearch6DynamicSinkTest.java | 195 +++++++++++++++ 8 files changed, 1188 insertions(+) diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml index 25a9f5a..272f9af 100644 --- a/flink-connectors/flink-connector-elasticsearch6/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml @@ -145,6 +145,14 @@ under the License. <scope>test</scope> </dependency> + <!-- Table API integration tests --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <!-- Elasticsearch table sink factory testing --> <dependency> <groupId>org.apache.flink</groupId> @@ -154,4 +162,18 @@ under the License. 
</dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <!-- Enforce single fork execution because of spawning + Elasticsearch cluster multiple times --> + <forkCount>1</forkCount> + </configuration> + </plugin> + </plugins> + </build> </project> diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java new file mode 100644 index 0000000..c06898e --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; + +import org.apache.http.HttpHost; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION; + +/** + * Elasticsearch 6 specific configuration. + */ +@Internal +final class Elasticsearch6Configuration extends ElasticsearchConfiguration { + Elasticsearch6Configuration(ReadableConfig config, ClassLoader classLoader) { + super(config, classLoader); + } + + public List<HttpHost> getHosts() { + return config.get(HOSTS_OPTION).stream() + .map(Elasticsearch6Configuration::validateAndParseHostsString) + .collect(Collectors.toList()); + } + + /** + * Validates and parses a single host string into an {@link HttpHost}. + * + * <p>The hosts option is expected in the following format: + * + * <pre> + * hosts = http://host_name:9092;http://host_name:9093 + * </pre> + */ + private static HttpHost validateAndParseHostsString(String host) { + try { + HttpHost httpHost = HttpHost.create(host); + if (httpHost.getPort() < 0) { + throw new ValidationException(String.format( + "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.", + host, + HOSTS_OPTION.key())); + } + + if (httpHost.getSchemeName() == null) { + throw new ValidationException(String.format( + "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.", + host, + HOSTS_OPTION.key())); + } + return httpHost; + } catch (Exception e) { + throw new ValidationException(String.format( + "Could not parse host '%s' in option '%s'. 
It should follow the format 'http://host_name:port'.", + host, + HOSTS_OPTION.key()), e); + } + } +} diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java new file mode 100644 index 0000000..eadf659 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; +import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.SinkFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.common.xcontent.XContentType; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; + +/** + * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a logical + * description. 
+ */ +@PublicEvolving +final class Elasticsearch6DynamicSink implements DynamicTableSink { + @VisibleForTesting + static final Elasticsearch7RequestFactory REQUEST_FACTORY = new Elasticsearch7RequestFactory(); + + private final SinkFormat<SerializationSchema<RowData>> format; + private final TableSchema schema; + private final Elasticsearch6Configuration config; + + public Elasticsearch6DynamicSink( + SinkFormat<SerializationSchema<RowData>> format, + Elasticsearch6Configuration config, + TableSchema schema) { + this(format, config, schema, (ElasticsearchSink.Builder::new)); + } + + //-------------------------------------------------------------- + // Hack to make configuration testing possible. + // + // The code in this block should never be used outside of tests. + // Having a way to inject a builder we can assert the builder in + // the test. We can not assert everything though, e.g. it is not + // possible to assert flushing on checkpoint, as it is configured + // on the sink itself. 
+ //-------------------------------------------------------------- + + private final ElasticSearchBuilderProvider builderProvider; + + @FunctionalInterface + interface ElasticSearchBuilderProvider { + ElasticsearchSink.Builder<RowData> createBuilder( + List<HttpHost> httpHosts, + RowElasticsearchSinkFunction upsertSinkFunction); + } + + Elasticsearch6DynamicSink( + SinkFormat<SerializationSchema<RowData>> format, + Elasticsearch6Configuration config, + TableSchema schema, + ElasticSearchBuilderProvider builderProvider) { + this.format = format; + this.schema = schema; + this.config = config; + this.builderProvider = builderProvider; + } + + //-------------------------------------------------------------- + // End of hack to make configuration testing possible + //-------------------------------------------------------------- + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + ChangelogMode.Builder builder = ChangelogMode.newBuilder(); + for (RowKind kind : requestedMode.getContainedKinds()) { + if (kind != RowKind.UPDATE_BEFORE) { + builder.addContainedKind(kind); + } + } + return builder.build(); + } + + @Override + public SinkFunctionProvider getSinkRuntimeProvider(Context context) { + return () -> { + SerializationSchema<RowData> format = this.format.createSinkFormat(context, schema.toRowDataType()); + + final RowElasticsearchSinkFunction upsertFunction = + new RowElasticsearchSinkFunction( + IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema), + config.getDocumentType(), + format, + XContentType.JSON, + REQUEST_FACTORY, + KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()) + ); + + final ElasticsearchSink.Builder<RowData> builder = builderProvider.createBuilder( + config.getHosts(), + upsertFunction); + + builder.setFailureHandler(config.getFailureHandler()); + config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions); + 
config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb); + config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval); + builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled()); + config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType); + config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries); + config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay); + + config.getPathPrefix() + .ifPresent(pathPrefix -> builder.setRestClientFactory(new DefaultRestClientFactory(pathPrefix))); + + final ElasticsearchSink<RowData> sink = builder.build(); + + if (config.isDisableFlushOnCheckpoint()) { + sink.disableFlushOnCheckpoint(); + } + + return sink; + }; + } + + @Override + public DynamicTableSink copy() { + return this; + } + + @Override + public String asSummaryString() { + return "Elasticsearch6"; + } + + /** + * Serializable {@link RestClientFactory} used by the sink. + */ + @VisibleForTesting + static class DefaultRestClientFactory implements RestClientFactory { + + private final String pathPrefix; + + public DefaultRestClientFactory(@Nullable String pathPrefix) { + this.pathPrefix = pathPrefix; + } + + @Override + public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { + if (pathPrefix != null) { + restClientBuilder.setPathPrefix(pathPrefix); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultRestClientFactory that = (DefaultRestClientFactory) o; + return Objects.equals(pathPrefix, that.pathPrefix); + } + + @Override + public int hashCode() { + return Objects.hash(pathPrefix); + } + } + + /** + * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the sink. 
+ */ + private static class Elasticsearch7RequestFactory implements RequestFactory { + @Override + public UpdateRequest createUpdateRequest( + String index, + String docType, + String key, + XContentType contentType, + byte[] document) { + return new UpdateRequest(index, docType, key) + .doc(document, contentType) + .upsert(document, contentType); + } + + @Override + public IndexRequest createIndexRequest( + String index, + String docType, + String key, + XContentType contentType, + byte[] document) { + return new IndexRequest(index, docType, key) + .source(document, contentType); + } + + @Override + public DeleteRequest createDeleteRequest(String index, String docType, String key) { + return new DeleteRequest(index, docType, key); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Elasticsearch6DynamicSink that = (Elasticsearch6DynamicSink) o; + return Objects.equals(format, that.format) && + Objects.equals(schema, that.schema) && + Objects.equals(config, that.config) && + Objects.equals(builderProvider, that.builderProvider); + } + + @Override + public int hashCode() { + return Objects.hash(format, schema, config, builderProvider); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java new file mode 100644 index 0000000..65c90b5 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.format.SinkFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.utils.TableSchemaUtils; + +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION; +import static 
org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION; + +/** + * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}. 
+ */ +@Internal +public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory { + private static final Set<ConfigOption<?>> requiredOptions = Stream.of( + HOSTS_OPTION, + INDEX_OPTION, + DOCUMENT_TYPE_OPTION + ).collect(Collectors.toSet()); + private static final Set<ConfigOption<?>> optionalOptions = Stream.of( + KEY_DELIMITER_OPTION, + FAILURE_HANDLER_OPTION, + FLUSH_ON_CHECKPOINT_OPTION, + BULK_FLASH_MAX_SIZE_OPTION, + BULK_FLUSH_MAX_ACTIONS_OPTION, + BULK_FLUSH_INTERVAL_OPTION, + BULK_FLUSH_BACKOFF_TYPE_OPTION, + BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION, + BULK_FLUSH_BACKOFF_DELAY_OPTION, + CONNECTION_MAX_RETRY_TIMEOUT_OPTION, + CONNECTION_PATH_PREFIX, + FORMAT_OPTION + ).collect(Collectors.toSet()); + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + TableSchema tableSchema = context.getCatalogTable().getSchema(); + ElasticsearchValidationUtils.validatePrimaryKey(tableSchema); + final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + + final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat( + SerializationFormatFactory.class, + FORMAT_OPTION); + + helper.validate(); + Configuration configuration = new Configuration(); + context.getCatalogTable() + .getOptions() + .forEach(configuration::setString); + Elasticsearch6Configuration config = new Elasticsearch6Configuration(configuration, context.getClassLoader()); + + validate(config, configuration); + + return new Elasticsearch6DynamicSink( + format, + config, + TableSchemaUtils.getPhysicalSchema(tableSchema)); + } + + private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) { + config.getFailureHandler(); // checks if we can instantiate the custom failure handler + config.getHosts(); // validate hosts + validate( + config.getIndex().length() >= 1, + () -> String.format("'%s' must not be empty", INDEX_OPTION.key())); + validate( + 
config.getBulkFlushMaxActions().map(maxActions -> maxActions >= 1).orElse(true), + () -> String.format( + "'%s' must be at least 1 character. Got: %s", + BULK_FLUSH_MAX_ACTIONS_OPTION.key(), + config.getBulkFlushMaxActions().get()) + ); + validate( + config.getBulkFlushMaxSize().map(maxSize -> maxSize >= 1024 * 1024).orElse(true), + () -> String.format( + "'%s' must be at least 1mb character. Got: %s", + BULK_FLASH_MAX_SIZE_OPTION.key(), + originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString()) + ); + validate( + config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true), + () -> String.format( + "'%s' must be at least 1. Got: %s", + BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), + config.getBulkFlushBackoffRetries().get()) + ); + } + + private static void validate(boolean condition, Supplier<String> message) { + if (!condition) { + throw new ValidationException(message.get()); + } + } + + @Override + public String factoryIdentifier() { + return "elasticsearch-6"; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return requiredOptions; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return optionalOptions; + } +} diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000..29a8593 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java new file mode 100644 index 0000000..f1be1b2 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context; + +/** + * Tests for validation in {@link Elasticsearch6DynamicSinkFactory}. + */ +public class Elasticsearch6DynamicSinkFactoryTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void validateEmptyConfiguration() { + Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "One or more required options are missing.\n" + + "\n" + + "Missing required options are:\n" + + "\n" + + "document-type\n" + + "hosts\n" + + "index"); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .build() + ); + } + + @Test + public void validateWrongIndex() { + Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "'index' must not be empty"); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .withOption("index", "") + .withOption("document-type", "MyType") + .withOption("hosts", "http://localhost:12345") + .build() + ); + } + + @Test + public void validateWrongHosts() { + Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + 
thrown.expectMessage( + "Could not parse host 'wrong-host' in option 'hosts'. It should follow the format 'http://host_name:port'."); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .withOption("index", "MyIndex") + .withOption("document-type", "MyType") + .withOption("hosts", "wrong-host") + .build() + ); + } + + @Test + public void validateWrongFlushSize() { + Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "'sink.bulk-flush.max-size' must be at least 1mb character. Got: 1024 bytes"); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex") + .withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType") + .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234") + .withOption(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1kb") + .build() + ); + } + + @Test + public void validateWrongRetries() { + Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "'sink.bulk-flush.back-off.max-retries' must be at least 1. 
Got: 0"); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex") + .withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType") + .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234") + .withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "0") + .build() + ); + } + + @Test + public void validateWrongMaxActions() { + Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "'sink.bulk-flush.max-actions' must be at least 1 character. Got: 0"); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex") + .withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType") + .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234") + .withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "0") + .build() + ); + } + + @Test + public void validateWrongBackoffDelay() { + Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "Invalid value for option 'sink.bulk-flush.back-off.delay'."); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex") + .withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType") + .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234") + .withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "-1s") + .build() + ); + } + + @Test + public void validatePrimaryKeyOnIllegalColumn() { + 
Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "The table has a primary key on columns of illegal types: " + + "[ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY].\n" + + " Elasticsearch sink does not support primary keys on columns of types: " + + "[ARRAY, MAP, MULTISET, STRUCTURED_TYPE, ROW, RAW, BINARY, VARBINARY]."); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.BIGINT().notNull()) + .field("b", DataTypes.ARRAY(DataTypes.BIGINT().notNull()).notNull()) + .field("c", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING()).notNull()) + .field("d", DataTypes.MULTISET(DataTypes.BIGINT().notNull()).notNull()) + .field("e", DataTypes.ROW(DataTypes.FIELD("a", DataTypes.BIGINT())).notNull()) + .field("f", DataTypes.RAW(Types.BIG_INT).notNull()) + .field("g", DataTypes.BYTES().notNull()) + .primaryKey("a", "b", "c", "d", "e", "f", "g") + .build()) + .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex") + .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234") + .withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "1s") + .build() + ); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java new file mode 100644 index 0000000..3c09653 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.testutils.ElasticsearchResource; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.search.SearchHits; +import org.junit.ClassRule; +import org.junit.Test; + +import java.time.Duration; +import java.time.LocalDate; 
+import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context; +import static org.apache.flink.table.api.Expressions.row; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * IT tests for {@link Elasticsearch6DynamicSink}. + */ +public class Elasticsearch6DynamicSinkITCase { + + @ClassRule + public static ElasticsearchResource elasticsearchResource = new ElasticsearchResource("es-6-dynamic-sink-tests"); + + @Test + public void testWritingDocuments() throws Exception { + TableSchema schema = TableSchema.builder() + .field("a", DataTypes.BIGINT().notNull()) + .field("b", DataTypes.TIME()) + .field("c", DataTypes.STRING().notNull()) + .field("d", DataTypes.FLOAT()) + .field("e", DataTypes.TINYINT().notNull()) + .field("f", DataTypes.DATE()) + .field("g", DataTypes.TIMESTAMP().notNull()) + .primaryKey("a", "g") + .build(); + GenericRowData rowData = GenericRowData.of( + 1L, + 12345, + StringData.fromString("ABCDE"), + 12.12f, + (byte) 2, + 12345, + TimestampData.fromLocalDateTime(LocalDateTime.parse("2012-12-12T12:12:12"))); + + String index = "writing-documents"; + String myType = "MyType"; + Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory(); + + SinkFunctionProvider sinkRuntimeProvider = (SinkFunctionProvider) sinkFactory.createDynamicTableSink( + context() + .withSchema(schema) + .withOption(ElasticsearchOptions.INDEX_OPTION.key(), index) + .withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType) + .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") + .withOption(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") + .build() + ).getSinkRuntimeProvider(new MockContext()); + + SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction(); + StreamExecutionEnvironment 
environment = StreamExecutionEnvironment.getExecutionEnvironment(); + rowData.setRowKind(RowKind.UPDATE_AFTER); + environment.<RowData>fromElements(rowData).addSink(sinkFunction); + environment.execute(); + + Client client = elasticsearchResource.getClient(); + Map<String, Object> response = client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12")).actionGet().getSource(); + Map<Object, Object> expectedMap = new HashMap<>(); + expectedMap.put("a", 1); + expectedMap.put("b", "00:00:12Z"); + expectedMap.put("c", "ABCDE"); + expectedMap.put("d", 12.12d); + expectedMap.put("e", 2); + expectedMap.put("f", "2003-10-20"); + expectedMap.put("g", "2012-12-12T12:12:12Z"); + assertThat(response, equalTo(expectedMap)); + } + + @Test + public void testWritingDocumentsFromTableApi() throws Exception { + TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance() + .useBlinkPlanner() + .inStreamingMode() + .build()); + + String index = "table-api"; + String myType = "MyType"; + tableEnvironment.executeSql("CREATE TABLE esTable (" + + "a BIGINT NOT NULL,\n" + + "b TIME,\n" + + "c STRING NOT NULL,\n" + + "d FLOAT,\n" + + "e TINYINT NOT NULL,\n" + + "f DATE,\n" + + "g TIMESTAMP NOT NULL,\n" + + "h as a + 2,\n" + + "PRIMARY KEY (a, g) NOT ENFORCED\n" + + ")\n" + + "WITH (\n" + + String.format("'%s'='%s',\n", "connector", "elasticsearch-6") + + String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) + + String.format("'%s'='%s',\n", ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType) + + String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") + + String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") + + ")"); + + tableEnvironment.fromValues( + row( + 1L, + LocalTime.ofNanoOfDay(12345L * 1_000_000L), + "ABCDE", + 12.12f, + (byte) 2, + LocalDate.ofEpochDay(12345), + LocalDateTime.parse("2012-12-12T12:12:12")) + ).executeInsert("esTable") + 
.getJobClient() + .get() + .getJobExecutionResult(this.getClass().getClassLoader()) + .get(); + + Client client = elasticsearchResource.getClient(); + Map<String, Object> response = client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12")) + .actionGet() + .getSource(); + Map<Object, Object> expectedMap = new HashMap<>(); + expectedMap.put("a", 1); + expectedMap.put("b", "00:00:12Z"); + expectedMap.put("c", "ABCDE"); + expectedMap.put("d", 12.12d); + expectedMap.put("e", 2); + expectedMap.put("f", "2003-10-20"); + expectedMap.put("g", "2012-12-12T12:12:12Z"); + assertThat(response, equalTo(expectedMap)); + } + + @Test + public void testWritingDocumentsNoPrimaryKey() throws Exception { + TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance() + .useBlinkPlanner() + .inStreamingMode() + .build()); + + String index = "no-primary-key"; + String myType = "MyType"; + tableEnvironment.executeSql("CREATE TABLE esTable (" + + "a BIGINT NOT NULL,\n" + + "b TIME,\n" + + "c STRING NOT NULL,\n" + + "d FLOAT,\n" + + "e TINYINT NOT NULL,\n" + + "f DATE,\n" + + "g TIMESTAMP NOT NULL\n" + + ")\n" + + "WITH (\n" + + String.format("'%s'='%s',\n", "connector", "elasticsearch-6") + + String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) + + String.format("'%s'='%s',\n", ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType) + + String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") + + String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") + + ")"); + + tableEnvironment.fromValues( + row( + 1L, + LocalTime.ofNanoOfDay(12345L * 1_000_000L), + "ABCDE", + 12.12f, + (byte) 2, + LocalDate.ofEpochDay(12345), + LocalDateTime.parse("2012-12-12T12:12:12")) + ).executeInsert("esTable") + .getJobClient() + .get() + .getJobExecutionResult(this.getClass().getClassLoader()) + .get(); + + Client client = elasticsearchResource.getClient(); + + // 
search API does not return documents that were not indexed, we might need to query + // the index a few times + Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1)); + SearchHits hits; + do { + hits = client.prepareSearch(index) + .execute() + .actionGet() + .getHits(); + if (hits.getTotalHits() == 0) { + Thread.sleep(100); + } + } while (hits.getTotalHits() == 0 && deadline.hasTimeLeft()); + + Map<String, Object> result = hits.getAt(0).getSourceAsMap(); + Map<Object, Object> expectedMap = new HashMap<>(); + expectedMap.put("a", 1); + expectedMap.put("b", "00:00:12Z"); + expectedMap.put("c", "ABCDE"); + expectedMap.put("d", 12.12d); + expectedMap.put("e", 2); + expectedMap.put("f", "2003-10-20"); + expectedMap.put("g", "2012-12-12T12:12:12Z"); + assertThat(result, equalTo(expectedMap)); + } + + private static class MockContext implements DynamicTableSink.Context { + @Override + public boolean isBounded() { + return false; + } + + @Override + public TypeInformation<?> createTypeInformation(DataType consumedDataType) { + return null; + } + + @Override + public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) { + return null; + } + } +} diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java new file mode 100644 index 0000000..df54147 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.SinkFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.ActionRequest; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.List; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; + +/** + * Tests for {@link Elasticsearch6DynamicSink} parameters. 
+ */ +public class Elasticsearch6DynamicSinkTest { + + private static final String FIELD_KEY = "key"; + private static final String FIELD_FRUIT_NAME = "fruit_name"; + private static final String FIELD_COUNT = "count"; + private static final String FIELD_TS = "ts"; + + private static final String HOSTNAME = "host1"; + private static final int PORT = 1234; + private static final String SCHEMA = "https"; + private static final String INDEX = "MyIndex"; + private static final String DOC_TYPE = "MyType"; + + @Test + public void testBuilder() { + final TableSchema schema = createTestSchema(); + + BuilderProvider provider = new BuilderProvider(); + final Elasticsearch6DynamicSink testSink = new Elasticsearch6DynamicSink( + new DummySinkFormat(), + new Elasticsearch6Configuration(getConfig(), this.getClass().getClassLoader()), + schema, + provider + ); + + testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction(); + + verify(provider.builderSpy).setFailureHandler(new DummyFailureHandler()); + verify(provider.builderSpy).setBulkFlushBackoff(true); + verify(provider.builderSpy).setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL); + verify(provider.builderSpy).setBulkFlushBackoffDelay(123); + verify(provider.builderSpy).setBulkFlushBackoffRetries(3); + verify(provider.builderSpy).setBulkFlushInterval(100); + verify(provider.builderSpy).setBulkFlushMaxActions(1000); + verify(provider.builderSpy).setBulkFlushMaxSizeMb(1); + verify(provider.builderSpy).setRestClientFactory(new Elasticsearch6DynamicSink.DefaultRestClientFactory("/myapp")); + verify(provider.sinkSpy).disableFlushOnCheckpoint(); + } + + private Configuration getConfig() { + Configuration configuration = new Configuration(); + configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX); + configuration.setString(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE); + configuration.setString(ElasticsearchOptions.HOSTS_OPTION.key(), SCHEMA + "://" + HOSTNAME 
+ ":" + PORT); + configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION.key(), "exponential"); + configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "123"); + configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "3"); + configuration.setString(ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION.key(), "100"); + configuration.setString(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "1000"); + configuration.setString(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1mb"); + configuration.setString(ElasticsearchOptions.CONNECTION_PATH_PREFIX.key(), "/myapp"); + configuration.setString(ElasticsearchOptions.FAILURE_HANDLER_OPTION.key(), DummyFailureHandler.class.getName()); + configuration.setString(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false"); + return configuration; + } + + private static class BuilderProvider implements Elasticsearch6DynamicSink.ElasticSearchBuilderProvider { + public ElasticsearchSink.Builder<RowData> builderSpy; + public ElasticsearchSink<RowData> sinkSpy; + + @Override + public ElasticsearchSink.Builder<RowData> createBuilder( + List<HttpHost> httpHosts, + RowElasticsearchSinkFunction upsertSinkFunction) { + builderSpy = Mockito.spy(new ElasticsearchSink.Builder<>(httpHosts, upsertSinkFunction)); + doAnswer( + invocation -> { + sinkSpy = Mockito.spy((ElasticsearchSink<RowData>) invocation.callRealMethod()); + return sinkSpy; + } + ).when(builderSpy).build(); + + return builderSpy; + } + } + + private TableSchema createTestSchema() { + return TableSchema.builder() + .field(FIELD_KEY, DataTypes.BIGINT()) + .field(FIELD_FRUIT_NAME, DataTypes.STRING()) + .field(FIELD_COUNT, DataTypes.DECIMAL(10, 4)) + .field(FIELD_TS, DataTypes.TIMESTAMP(3)) + .build(); + } + + private static class DummySerializationSchema implements SerializationSchema<RowData> { + + private static final DummySerializationSchema INSTANCE = new 
DummySerializationSchema(); + + @Override + public byte[] serialize(RowData element) { + return new byte[0]; + } + } + + private static class DummySinkFormat implements SinkFormat<SerializationSchema<RowData>> { + @Override + public SerializationSchema<RowData> createSinkFormat( + DynamicTableSink.Context context, + DataType consumedDataType) { + return DummySerializationSchema.INSTANCE; + } + + @Override + public ChangelogMode getChangelogMode() { + return null; + } + } + + private static class MockSinkContext implements DynamicTableSink.Context { + @Override + public boolean isBounded() { + return false; + } + + @Override + public TypeInformation<?> createTypeInformation(DataType consumedDataType) { + return null; + } + + @Override + public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) { + return null; + } + } + + /** + * Custom failure handler for testing. + */ + public static class DummyFailureHandler implements ActionRequestFailureHandler { + + @Override + public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) { + // do nothing + } + + @Override + public boolean equals(Object o) { + return o instanceof DummyFailureHandler; + } + + @Override + public int hashCode() { + return DummyFailureHandler.class.hashCode(); + } + } +}