This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0ce2af04963b51397d9b0e210475e69e36d60f36 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Fri May 15 20:06:40 2020 +0200 [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys --- .../table/AbstractTimeIndexGenerator.java | 43 ++++ .../table/ElasticsearchConfiguration.java | 152 ++++++++++++ .../elasticsearch/table/ElasticsearchOptions.java | 134 ++++++++++ .../table/ElasticsearchValidationUtils.java | 93 +++++++ .../elasticsearch/table/IndexGenerator.java | 42 ++++ .../elasticsearch/table/IndexGeneratorBase.java | 54 ++++ .../elasticsearch/table/IndexGeneratorFactory.java | 276 +++++++++++++++++++++ .../elasticsearch/table/KeyExtractor.java | 131 ++++++++++ .../elasticsearch/table/RequestFactory.java | 67 +++++ .../table/RowElasticsearchSinkFunction.java | 142 +++++++++++ .../elasticsearch/table/StaticIndexGenerator.java | 37 +++ .../elasticsearch/ElasticsearchSinkTestBase.java | 42 +--- .../table/IndexGeneratorFactoryTest.java | 213 ++++++++++++++++ .../elasticsearch/table/KeyExtractorTest.java | 130 ++++++++++ .../elasticsearch/table/TestContext.java | 79 ++++++ .../testutils/ElasticsearchResource.java | 78 ++++++ .../flink-connector-elasticsearch7/pom.xml | 22 ++ .../table/Elasticsearch7Configuration.java | 71 ++++++ .../table/Elasticsearch7DynamicSink.java | 252 +++++++++++++++++++ .../table/Elasticsearch7DynamicSinkFactory.java | 154 ++++++++++++ .../org.apache.flink.table.factories.Factory | 16 ++ .../Elasticsearch7DynamicSinkFactoryTest.java | 200 +++++++++++++++ .../table/Elasticsearch7DynamicSinkITCase.java | 254 +++++++++++++++++++ .../table/Elasticsearch7DynamicSinkTest.java | 195 +++++++++++++++ .../apache/flink/table/utils/TableSchemaUtils.java | 3 + 25 files changed, 2841 insertions(+), 39 deletions(-) diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/AbstractTimeIndexGenerator.java 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/AbstractTimeIndexGenerator.java new file mode 100644 index 0000000..864d7fa --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/AbstractTimeIndexGenerator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; + +import java.time.format.DateTimeFormatter; + +/** + * Abstract class for time related {@link IndexGenerator}. 
+ */ +@Internal +abstract class AbstractTimeIndexGenerator extends IndexGeneratorBase { + + private final String dateTimeFormat; + protected transient DateTimeFormatter dateTimeFormatter; + + public AbstractTimeIndexGenerator(String index, String dateTimeFormat) { + super(index); + this.dateTimeFormat = dateTimeFormat; + } + + @Override + public void open() { + this.dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java new file mode 100644 index 0000000..48b848c --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.util.InstantiationUtil; + +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; + +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION; + +/** + * Accessor methods to elasticsearch options. 
+ */ +@Internal +class ElasticsearchConfiguration { + protected final ReadableConfig config; + private final ClassLoader classLoader; + + ElasticsearchConfiguration(ReadableConfig config, ClassLoader classLoader) { + this.config = config; + this.classLoader = classLoader; + } + + public ActionRequestFailureHandler getFailureHandler() { + final ActionRequestFailureHandler failureHandler; + String value = config.get(FAILURE_HANDLER_OPTION); + switch (value.toUpperCase()) { + case "FAIL": + failureHandler = new NoOpFailureHandler(); + break; + case "IGNORE": + failureHandler = new IgnoringFailureHandler(); + break; + case "RETRY-REJECTED": + failureHandler = new RetryRejectedExecutionFailureHandler(); + break; + default: + try { + Class<?> failureHandlerClass = Class.forName(value, false, classLoader); + failureHandler = (ActionRequestFailureHandler) InstantiationUtil.instantiate(failureHandlerClass); + } catch (ClassNotFoundException e) { + throw new ValidationException("Could not instantiate the failure handler class: " + value, e); + } + break; + } + return failureHandler; + } + + public String getDocumentType() { + return config.get(ElasticsearchOptions.DOCUMENT_TYPE_OPTION); + } + + public Optional<Integer> getBulkFlushMaxActions() { + return config.getOptional(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION); + } + + public Optional<Integer> getBulkFlushMaxSize() { + return config.getOptional(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).map(MemorySize::getMebiBytes); + } + + public Optional<Long> getBulkFlushInterval() { + return config.getOptional(BULK_FLUSH_INTERVAL_OPTION).map(Duration::toMillis); + } + + public boolean isBulkFlushBackoffEnabled() { + return config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION) != ElasticsearchOptions.BackOffType.DISABLED; + } + + public Optional<ElasticsearchSinkBase.FlushBackoffType> getBulkFlushBackoffType() { + switch (config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION)) { + case CONSTANT: + return 
Optional.of(ElasticsearchSinkBase.FlushBackoffType.CONSTANT); + case EXPONENTIAL: + return Optional.of(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL); + default: + return Optional.empty(); + } + } + + public Optional<Integer> getBulkFlushBackoffRetries() { + return config.getOptional(BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION); + } + + public Optional<Long> getBulkFlushBackoffDelay() { + return config.getOptional(BULK_FLUSH_BACKOFF_DELAY_OPTION).map(Duration::toMillis); + } + + public boolean isDisableFlushOnCheckpoint() { + return !config.get(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION); + } + + public String getIndex() { + return config.get(ElasticsearchOptions.INDEX_OPTION); + } + + public String getKeyDelimiter() { + return config.get(ElasticsearchOptions.KEY_DELIMITER_OPTION); + } + + public Optional<String> getPathPrefix() { + return config.getOptional(ElasticsearchOptions.CONNECTION_PATH_PREFIX); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ElasticsearchConfiguration that = (ElasticsearchConfiguration) o; + return Objects.equals(config, that.config) && + Objects.equals(classLoader, that.classLoader); + } + + @Override + public int hashCode() { + return Objects.hash(config, classLoader); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java new file mode 100644 index 0000000..c68ca68 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.description.Description; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; + +import java.time.Duration; +import java.util.List; + +import static org.apache.flink.configuration.description.TextElement.text; + +/** + * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch. + */ +public class ElasticsearchOptions { + /** + * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with + * {@code DISABLED} option. 
+ */ + public enum BackOffType { + DISABLED, + CONSTANT, + EXPONENTIAL + } + + public static final ConfigOption<List<String>> HOSTS_OPTION = + ConfigOptions.key("hosts") + .stringType() + .asList() + .noDefaultValue() + .withDescription("Elasticseatch hosts to connect to."); + public static final ConfigOption<String> INDEX_OPTION = + ConfigOptions.key("index") + .stringType() + .noDefaultValue() + .withDescription("Elasticsearch index for every record."); + public static final ConfigOption<String> DOCUMENT_TYPE_OPTION = + ConfigOptions.key("document-type") + .stringType() + .noDefaultValue() + .withDescription("Elasticsearch document type."); + public static final ConfigOption<String> KEY_DELIMITER_OPTION = + ConfigOptions.key("document-id.key-delimiter") + .stringType() + .defaultValue("_") + .withDescription("Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\"."); + public static final ConfigOption<String> FAILURE_HANDLER_OPTION = + ConfigOptions.key("failure-handler") + .stringType() + .defaultValue("fail") + .withDescription(Description.builder() + .text("Failure handling strategy in case a request to Elasticsearch fails") + .list( + text("\"fail\" (throws an exception if a request fails and thus causes a job failure),"), + text("\"ignore\" (ignores failures and drops the request),"), + text("\"retry_rejected\" (re-adds requests that have failed due to queue capacity saturation),"), + text("\"class name\" for failure handling with a ActionRequestFailureHandler subclass")) + .build()); + public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION = + ConfigOptions.key("sink.flush-on-checkpoint") + .booleanType() + .defaultValue(true) + .withDescription("Disables flushing on checkpoint"); + public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION = + ConfigOptions.key("sink.bulk-flush.max-actions") + .intType() + .noDefaultValue() + .withDescription("Maximum number of actions to buffer for each bulk request."); 
+ public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION = + ConfigOptions.key("sink.bulk-flush.max-size") + .memoryType() + .noDefaultValue() + .withDescription("Maximum size of buffered actions per bulk request"); + public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION = + ConfigOptions.key("sink.bulk-flush.interval") + .durationType() + .noDefaultValue() + .withDescription("Bulk flush interval"); + public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION = + ConfigOptions.key("sink.bulk-flush.back-off.strategy") + .enumType(BackOffType.class) + .defaultValue(BackOffType.DISABLED) + .withDescription("Backoff strategy"); + public static final ConfigOption<Integer> BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION = + ConfigOptions.key("sink.bulk-flush.back-off.max-retries") + .intType() + .noDefaultValue() + .withDescription("Maximum number of retries."); + public static final ConfigOption<Duration> BULK_FLUSH_BACKOFF_DELAY_OPTION = + ConfigOptions.key("sink.bulk-flush.back-off.delay") + .durationType() + .noDefaultValue() + .withDescription("Delay between each backoff attempt."); + public static final ConfigOption<Duration> CONNECTION_MAX_RETRY_TIMEOUT_OPTION = + ConfigOptions.key("connection.max-retry-timeout") + .durationType() + .noDefaultValue() + .withDescription("Maximum timeout between retries."); + public static final ConfigOption<String> CONNECTION_PATH_PREFIX = + ConfigOptions.key("connection.path-prefix") + .stringType() + .noDefaultValue() + .withDescription("Prefix string to be added to every REST communication."); + public static final ConfigOption<String> FORMAT_OPTION = + ConfigOptions.key("format") + .stringType() + .defaultValue("json") + .withDescription("Elasticsearch connector requires to specify a format.\n" + + "The format must produce a valid json document. \n" + + "By default uses built-in 'json' format. 
Please refer to Table Formats section for more details."); + + private ElasticsearchOptions() { + } +} diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java new file mode 100644 index 0000000..b5caa76 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot; + +/** + * Utility methods for validating Elasticsearch properties. + */ +@Internal +class ElasticsearchValidationUtils { + + private static final Set<LogicalTypeRoot> ILLEGAL_PRIMARY_KEY_TYPES = new LinkedHashSet<>(); + + static { + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ARRAY); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MAP); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MULTISET); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.STRUCTURED_TYPE); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ROW); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.RAW); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BINARY); + ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.VARBINARY); + } + + /** + * Checks that the table does not have primary key defined on illegal types. + * In Elasticsearch the primary key is used to calculate the Elasticsearch document id, + * which is a string of up to 512 bytes. It cannot have whitespaces. As of now it is calculated + * by concatenating the fields. Certain types do not have a good string representation to be used + * in this scenario. The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and + * {@link LogicalTypeRoot#RAW} type. 
+ */ + public static void validatePrimaryKey(TableSchema schema) { + schema.getPrimaryKey().ifPresent( + key -> { + List<LogicalTypeRoot> illegalTypes = key.getColumns() + .stream() + .map(fieldName -> { + LogicalType logicalType = schema.getFieldDataType(fieldName).get().getLogicalType(); + if (hasRoot(logicalType, LogicalTypeRoot.DISTINCT_TYPE)) { + return ((DistinctType) logicalType).getSourceType().getTypeRoot(); + } else { + return logicalType.getTypeRoot(); + } + }) + .filter(ILLEGAL_PRIMARY_KEY_TYPES::contains) + .collect(Collectors.toList()); + + if (!illegalTypes.isEmpty()) { + throw new ValidationException( + String.format( + "The table has a primary key on columns of illegal types: %s.\n" + + " Elasticsearch sink does not support primary keys on columns of types: %s.", + illegalTypes, + ILLEGAL_PRIMARY_KEY_TYPES)); + } + } + ); + } + + private ElasticsearchValidationUtils() { + } +} diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator.java new file mode 100644 index 0000000..f5faf84 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.Row; + +import java.io.Serializable; + +/** + * This interface is responsible for generating an index name from a given {@link Row} record. + */ +@Internal +interface IndexGenerator extends Serializable { + + /** + * Initialize the index generator, this will be called only once before {@link #generate(RowData)} is called. + */ + default void open() {} + + /** + * Generate index name according to the given row. + */ + String generate(RowData row); +} diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorBase.java new file mode 100644 index 0000000..5df3efd --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorBase.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; + +import java.util.Objects; + +/** + * Base class for {@link IndexGenerator}. + */ +@Internal +public abstract class IndexGeneratorBase implements IndexGenerator { + + private static final long serialVersionUID = 1L; + protected final String index; + + public IndexGeneratorBase(String index) { + this.index = index; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IndexGeneratorBase)) { + return false; + } + IndexGeneratorBase that = (IndexGeneratorBase) o; + return index.equals(that.index); + } + + @Override + public int hashCode() { + return Objects.hash(index); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java new file mode 100644 index 0000000..e60be72 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Factory of {@link IndexGenerator}. + * + * <p>Flink supports both static index and dynamic index. + * + * <p>If you want to have a static index, this option value should be a plain string, e.g. 'myusers', + * all the records will be consistently written into "myusers" index. + * + * <p>If you want to have a dynamic index, you can use '{field_name}' to reference a field value in the + * record to dynamically generate a target index. 
You can also use '{field_name|date_format_string}' to + * convert a field value of TIMESTAMP/DATE/TIME type into the format specified by date_format_string. The + * date_format_string is compatible with {@link DateTimeFormatter}. For example, if the option + * value is 'myusers_{log_ts|yyyy-MM-dd}', then a record with log_ts field value 2020-03-27 12:25:55 will + * be written into "myusers_2020-03-27" index. + */ +@Internal +final class IndexGeneratorFactory { + + private IndexGeneratorFactory() {} + + public static IndexGenerator createIndexGenerator(String index, TableSchema schema) { + final IndexHelper indexHelper = new IndexHelper(); + if (indexHelper.checkIsDynamicIndex(index)) { + return createRuntimeIndexGenerator(index, schema.getFieldNames(), schema.getFieldDataTypes(), indexHelper); + } else { + return new StaticIndexGenerator(index); + } + } + + interface DynamicFormatter extends Serializable { + String format(@Nonnull Object fieldValue, DateTimeFormatter formatter); + } + + private static IndexGenerator createRuntimeIndexGenerator( + String index, + String[] fieldNames, + DataType[] fieldTypes, + IndexHelper indexHelper) { + final String dynamicIndexPatternStr = indexHelper.extractDynamicIndexPatternStr(index); + final String indexPrefix = index.substring(0, index.indexOf(dynamicIndexPatternStr)); + final String indexSuffix = index.substring(indexPrefix.length() + dynamicIndexPatternStr.length()); + + final boolean isDynamicIndexWithFormat = indexHelper.checkIsDynamicIndexWithFormat(index); + final int indexFieldPos = indexHelper.extractIndexFieldPos(index, fieldNames, isDynamicIndexWithFormat); + final LogicalType indexFieldType = fieldTypes[indexFieldPos].getLogicalType(); + final LogicalTypeRoot indexFieldLogicalTypeRoot = indexFieldType.getTypeRoot(); + + // validate index field type + indexHelper.validateIndexFieldType(indexFieldLogicalTypeRoot); + + // time extract dynamic index pattern + final RowData.FieldGetter fieldGetter = 
RowData.createFieldGetter(indexFieldType, indexFieldPos); + + if (isDynamicIndexWithFormat) { + final String dateTimeFormat = indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot); + DynamicFormatter formatFunction = createFormatFunction( + indexFieldType, + indexFieldLogicalTypeRoot); + + return new AbstractTimeIndexGenerator(index, dateTimeFormat) { + @Override + public String generate(RowData row) { + Object fieldOrNull = fieldGetter.getFieldOrNull(row); + final String formattedField; + // TODO we can possibly optimize it to use the nullability of the field + if (fieldOrNull != null) { + formattedField = formatFunction.format(fieldOrNull, dateTimeFormatter); + } else { + formattedField = "null"; + } + return indexPrefix.concat(formattedField).concat(indexSuffix); + } + }; + } + // general dynamic index pattern + return new IndexGeneratorBase(index) { + @Override + public String generate(RowData row) { + Object indexField = fieldGetter.getFieldOrNull(row); + return indexPrefix.concat(indexField == null ? 
"null" : indexField.toString()).concat(indexSuffix); + } + }; + } + + private static DynamicFormatter createFormatFunction( + LogicalType indexFieldType, + LogicalTypeRoot indexFieldLogicalTypeRoot) { + switch (indexFieldLogicalTypeRoot) { + case DATE: + return (value, dateTimeFormatter) -> { + Integer indexField = (Integer) value; + return LocalDate.ofEpochDay(indexField).format(dateTimeFormatter); + }; + case TIME_WITHOUT_TIME_ZONE: + return (value, dateTimeFormatter) -> { + Integer indexField = (Integer) value; + return LocalTime.ofNanoOfDay(indexField * 1_000_000L) + .format(dateTimeFormatter); + }; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (value, dateTimeFormatter) -> { + TimestampData indexField = (TimestampData) value; + return indexField.toLocalDateTime().format(dateTimeFormatter); + }; + case TIMESTAMP_WITH_TIME_ZONE: + throw new UnsupportedOperationException("TIMESTAMP_WITH_TIME_ZONE is not supported yet"); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return (value, dateTimeFormatter) -> { + TimestampData indexField = (TimestampData) value; + return indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter); + }; + default: + throw new TableException(String.format( + "Unsupported type '%s' found in Elasticsearch dynamic index field, " + + "time-related pattern only support types are: DATE,TIME,TIMESTAMP.", + indexFieldType)); + } + } + + /** + * Helper class for {@link IndexGeneratorFactory}, this helper can use to validate index field type + * ans parse index format from pattern. 
+     */
+    private static class IndexHelper {
+        private static final Pattern dynamicIndexPattern = Pattern.compile("\\{[^\\{\\}]+\\}?");
+        private static final Pattern dynamicIndexTimeExtractPattern = Pattern.compile(".*\\{.+\\|.*\\}.*");
+        private static final List<LogicalTypeRoot> supportedTypes = new ArrayList<>();
+        private static final Map<LogicalTypeRoot, String> defaultFormats = new HashMap<>();
+
+        static {
+            // time-related types that may be used as index field
+            supportedTypes.add(LogicalTypeRoot.DATE);
+            supportedTypes.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
+            supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+            // general types that may be used as index field
+            supportedTypes.add(LogicalTypeRoot.VARCHAR);
+            supportedTypes.add(LogicalTypeRoot.CHAR);
+            supportedTypes.add(LogicalTypeRoot.TINYINT);
+            supportedTypes.add(LogicalTypeRoot.INTEGER);
+            supportedTypes.add(LogicalTypeRoot.BIGINT);
+
+            // format applied when the index pattern names a time field without a format
+            defaultFormats.put(LogicalTypeRoot.DATE, "yyyy_MM_dd");
+            defaultFormats.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, "HH_mm_ss");
+            defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+            defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+            defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, "yyyy_MM_dd_HH_mm_ssX");
+        }
+
+        /**
+         * Checks that the type root of the index field is one of the supported types.
+         *
+         * @throws IllegalArgumentException if the type root is not supported
+         */
+        void validateIndexFieldType(LogicalTypeRoot logicalType) {
+            if (supportedTypes.contains(logicalType)) {
+                return;
+            }
+            throw new IllegalArgumentException(String.format("Unsupported type %s of index field, " +
+                "Supported types are: %s", logicalType, supportedTypes));
+        }
+
+        /**
+         * Returns the default date-time format registered for the type root, or {@code null} if
+         * none is registered.
+         */
+        String getDefaultFormat(LogicalTypeRoot logicalType) {
+            return defaultFormats.get(logicalType);
+        }
+
+        /**
+         * Tells whether the index string contains exactly one dynamic part.
+         *
+         * @throws TableException if the index string contains more than one dynamic part
+         */
+        boolean checkIsDynamicIndex(String index) {
+            int occurrences = 0;
+            for (Matcher dynamicPart = dynamicIndexPattern.matcher(index); dynamicPart.find(); ) {
+                occurrences++;
+            }
+            if (occurrences > 1) {
+                throw new TableException(String.format("Chaining dynamic index pattern %s is not supported," +
+                    " only support single dynamic index pattern.", index));
+            }
+            return occurrences == 1;
+        }
+
+        /**
+         * Tells whether the index string contains a dynamic part of the form
+         * <code>{field|format}</code>.
+         */
+        boolean checkIsDynamicIndexWithFormat(String index) {
+            return dynamicIndexTimeExtractPattern.matcher(index).matches();
+        }
+
+        /**
+         * Returns the part of the index string from the first opening brace up to and including
+         * the last closing brace.
+         */
+        String extractDynamicIndexPatternStr(String index) {
+            final int firstOpeningBrace = index.indexOf("{");
+            final int lastClosingBrace = index.lastIndexOf("}");
+            return index.substring(firstOpeningBrace, lastClosingBrace + 1);
+        }
+
+        /**
+         * Returns the position of the field named in the index string within {@code fieldNames}.
+         *
+         * @throws TableException if the named field is not contained in {@code fieldNames}
+         */
+        int extractIndexFieldPos(String index, String[] fieldNames, boolean isDynamicIndexWithFormat) {
+            final List<String> knownFields = Arrays.asList(fieldNames);
+            // the field name ends at the format separator if a format is present, else at the brace
+            final String fieldNameTerminator = isDynamicIndexWithFormat ? "|" : "}";
+            final String indexFieldName =
+                index.substring(index.indexOf("{") + 1, index.indexOf(fieldNameTerminator));
+            final int position = knownFields.indexOf(indexFieldName);
+            if (position < 0) {
+                throw new TableException(String.format("Unknown field '%s' in index pattern '%s', please check the field name.",
+                    indexFieldName, index));
+            }
+            return position;
+        }
+
+        /**
+         * Returns the date-time format written between the separator and the closing brace of the
+         * index string, falling back to the default format of the type root if it is empty.
+         */
+        private String extractDateFormat(String index, LogicalTypeRoot logicalType) {
+            final String declaredFormat = index.substring(index.indexOf("|") + 1, index.indexOf("}"));
+            return declaredFormat.isEmpty() ? getDefaultFormat(logicalType) : declaredFormat;
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java new file mode 100644 index 0000000..db28ff1 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LogicalType; + +import java.io.Serializable; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.Period; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +
+/**
+ * An extractor for an Elasticsearch key from a {@link RowData}.
+ *
+ * <p>The key is built by formatting every primary key column of the row as a string and joining
+ * the results with the configured delimiter.
+ */
+@Internal
+class KeyExtractor implements Function<RowData, String>, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final FieldFormatter[] fieldFormatters;
+    private final String keyDelimiter;
+
+    /** Formats a single key field of a row as a string. */
+    private interface FieldFormatter extends Serializable {
+        String format(RowData rowData);
+    }
+
+    private KeyExtractor(
+            FieldFormatter[] fieldFormatters,
+            String keyDelimiter) {
+        this.fieldFormatters = fieldFormatters;
+        this.keyDelimiter = keyDelimiter;
+    }
+
+    /**
+     * Formats all key fields of the row and joins them with the key delimiter.
+     *
+     * @param rowData row to extract the key from
+     * @return the formatted key fields, separated by the key delimiter
+     */
+    @Override
+    public String apply(RowData rowData) {
+        final StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < fieldFormatters.length; i++) {
+            if (i > 0) {
+                builder.append(keyDelimiter);
+            }
+            final String value = fieldFormatters[i].format(rowData);
+            builder.append(value);
+        }
+        return builder.toString();
+    }
+
+    /** A table column together with its position in the schema. */
+    private static class ColumnWithIndex {
+        private final TableColumn column;
+        private final int index;
+
+        ColumnWithIndex(TableColumn column, int index) {
+            this.column = column;
+            this.index = index;
+        }
+
+        LogicalType getType() {
+            return column.getType().getLogicalType();
+        }
+
+        int getIndex() {
+            return index;
+        }
+    }
+
+    /**
+     * Creates the key extractor for a schema.
+     *
+     * @param schema schema whose primary key columns make up the key
+     * @param keyDelimiter delimiter placed between the formatted key fields
+     * @return an extractor over the primary key columns, or a function that always returns
+     *     {@code null} if the schema has no primary key
+     */
+    public static Function<RowData, String> createKeyExtractor(
+            TableSchema schema,
+            String keyDelimiter) {
+        return schema.getPrimaryKey().map(key -> {
+            // look-up table from column name to the column and its position in the row
+            Map<String, ColumnWithIndex> namesToColumns = new HashMap<>();
+            List<TableColumn> tableColumns = schema.getTableColumns();
+            for (int i = 0; i < schema.getFieldCount(); i++) {
+                TableColumn column = tableColumns.get(i);
+                namesToColumns.put(column.getName(), new ColumnWithIndex(column, i));
+            }
+
+            FieldFormatter[] fieldFormatters = key.getColumns()
+                .stream()
+                .map(namesToColumns::get)
+                .map(column -> toFormatter(column.getIndex(), column.getType()))
+                .toArray(FieldFormatter[]::new);
+
+            return (Function<RowData, String>) new KeyExtractor(
+                fieldFormatters,
+                keyDelimiter
+            );
+        }).orElseGet(() -> (Function<RowData, String> & Serializable) (row) -> null);
+    }
+
+    /**
+     * Creates the formatter for the field at the given position.
+     *
+     * <p>Date, time and interval types are read in their internal int/long form and rendered
+     * through the matching {@code java.time} class; distinct types are formatted like their
+     * source type; all other types are rendered with {@code toString()} of the field value.
+     *
+     * @param index position of the field in the row
+     * @param type logical type of the field
+     */
+    private static FieldFormatter toFormatter(int index, LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case DATE:
+                return (row) -> LocalDate.ofEpochDay(row.getInt(index)).toString();
+            case TIME_WITHOUT_TIME_ZONE:
+                // the int is scaled by 1_000_000 to obtain the nano-of-day
+                return (row) -> LocalTime.ofNanoOfDay((long) row.getInt(index) * 1_000_000L).toString();
+            case INTERVAL_YEAR_MONTH:
+                // Flink's internal representation of a year-month interval is an int holding the
+                // number of months, so it has to be rendered as months, not as days
+                return (row) -> Period.ofMonths(row.getInt(index)).toString();
+            case INTERVAL_DAY_TIME:
+                return (row) -> Duration.ofMillis(row.getLong(index)).toString();
+            case DISTINCT_TYPE:
+                return toFormatter(index, ((DistinctType) type).getSourceType());
+            default:
+                RowData.FieldGetter fieldGetter = RowData.createFieldGetter(
+                    type,
+                    index);
+                // NOTE(review): a null field value raises a NullPointerException here; presumably
+                // primary key columns are never null - confirm against the planner's guarantees
+                return (row) -> fieldGetter.getFieldOrNull(row).toString();
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java new file mode 100644 index 0000000..35d69eb --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF)
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.xcontent.XContentType; + +import java.io.Serializable; +
+/**
+ * Factory for creating {@link ActionRequest}s in a way that is agnostic of the Elasticsearch version.
+ */
+@Internal
+interface RequestFactory extends Serializable {
+    /**
+     * Creates an update request to be added to a {@link RequestIndexer}.
+     * Note: the type field has been deprecated since Elasticsearch 7.x and it does not take any effect there.
+     *
+     * @param index name of the index the request targets
+     * @param docType document type of the request (see the note above)
+     * @param key Elasticsearch key of the document
+     * @param contentType content type of {@code document}
+     * @param document the serialized document
+     */
+    UpdateRequest createUpdateRequest(
+        String index,
+        String docType,
+        String key,
+        XContentType contentType,
+        byte[] document);
+
+    /**
+     * Creates an index request to be added to a {@link RequestIndexer}.
+     * Note: the type field has been deprecated since Elasticsearch 7.x and it does not take any effect there.
+     *
+     * @param index name of the index the request targets
+     * @param docType document type of the request (see the note above)
+     * @param key Elasticsearch key of the document
+     * @param contentType content type of {@code document}
+     * @param document the serialized document
+     */
+    IndexRequest createIndexRequest(
+        String index,
+        String docType,
+        String key,
+        XContentType contentType,
+        byte[] document);
+
+    /**
+     * Creates a delete request to be added to a {@link RequestIndexer}.
+     * Note: the type field has been deprecated since Elasticsearch 7.x and it does not take any effect there.
+     *
+     * @param index name of the index the request targets
+     * @param docType document type of the request (see the note above)
+     * @param key Elasticsearch key of the document to delete
+     */
+    DeleteRequest createDeleteRequest(
+        String index,
+        String docType,
+        String key);
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java new file mode 100644 index 0000000..4eaba48 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.xcontent.XContentType; + +import javax.annotation.Nullable; + +import java.util.Objects; +import java.util.function.Function; + +/** + * Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. 
+ */
+@Internal
+class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IndexGenerator indexGenerator;
+    private final String docType;
+    private final SerializationSchema<RowData> serializationSchema;
+    private final XContentType contentType;
+    private final RequestFactory requestFactory;
+    private final Function<RowData, String> createKey;
+
+    /**
+     * Creates the sink function; every argument except {@code docType} must be non-null.
+     */
+    public RowElasticsearchSinkFunction(
+            IndexGenerator indexGenerator,
+            @Nullable String docType, // this is deprecated in es 7+
+            SerializationSchema<RowData> serializationSchema,
+            XContentType contentType,
+            RequestFactory requestFactory,
+            Function<RowData, String> createKey) {
+        this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
+        this.docType = docType;
+        this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+        this.contentType = Preconditions.checkNotNull(contentType);
+        this.requestFactory = Preconditions.checkNotNull(requestFactory);
+        this.createKey = Preconditions.checkNotNull(createKey);
+    }
+
+    /**
+     * Turns the row into a request according to its row kind: inserts and update-after rows are
+     * upserted, update-before and delete rows are deleted.
+     *
+     * @throws TableException if the row kind is none of the four handled kinds
+     */
+    @Override
+    public void process(
+            RowData element,
+            RuntimeContext ctx,
+            RequestIndexer indexer) {
+        switch (element.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                processUpsert(element, indexer);
+                return;
+            case UPDATE_BEFORE:
+            case DELETE:
+                processDelete(element, indexer);
+                return;
+            default:
+                throw new TableException("Unsupported message kind: " + element.getRowKind());
+        }
+    }
+
+    /**
+     * Serializes the row and adds it to the indexer: as an update request if the row has a key,
+     * as an index request otherwise.
+     */
+    private void processUpsert(RowData row, RequestIndexer indexer) {
+        final byte[] serializedRow = serializationSchema.serialize(row);
+        final String documentKey = createKey.apply(row);
+        final String targetIndex = indexGenerator.generate(row);
+
+        if (documentKey == null) {
+            // no key available: the document can only be indexed
+            final IndexRequest indexRequest = requestFactory.createIndexRequest(
+                targetIndex,
+                docType,
+                documentKey,
+                contentType,
+                serializedRow);
+            indexer.add(indexRequest);
+            return;
+        }
+
+        final UpdateRequest updateRequest = requestFactory.createUpdateRequest(
+            targetIndex,
+            docType,
+            documentKey,
+            contentType,
+            serializedRow);
+        indexer.add(updateRequest);
+    }
+
+    /**
+     * Adds a delete request for the key of the row to the indexer.
+     */
+    private void processDelete(RowData row, RequestIndexer indexer) {
+        final String documentKey = createKey.apply(row);
+        final String targetIndex = indexGenerator.generate(row);
+        final DeleteRequest deleteRequest =
+            requestFactory.createDeleteRequest(targetIndex, docType, documentKey);
+        indexer.add(deleteRequest);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        final RowElasticsearchSinkFunction other = (RowElasticsearchSinkFunction) o;
+        return Objects.equals(indexGenerator, other.indexGenerator)
+            && Objects.equals(docType, other.docType)
+            && Objects.equals(serializationSchema, other.serializationSchema)
+            && contentType == other.contentType
+            && Objects.equals(requestFactory, other.requestFactory)
+            && Objects.equals(createKey, other.createKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(indexGenerator, docType, serializationSchema, contentType, requestFactory, createKey);
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/StaticIndexGenerator.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/StaticIndexGenerator.java new file mode 100644 index 0000000..196b64c --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/StaticIndexGenerator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.RowData; +
+/**
+ * A static {@link IndexGenerator} which generates a fixed index name.
+ */
+@Internal
+final class StaticIndexGenerator extends IndexGeneratorBase {
+
+    /**
+     * @param index the index name returned for every row
+     */
+    public StaticIndexGenerator(String index) {
+        super(index);
+    }
+
+    /**
+     * Returns the fixed index name; the row is not inspected.
+     */
+    @Override
+    public String generate(RowData row) {
+        return index;
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java index 8b56896..35bdca7 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java @@ -21,17 +21,12 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.testutils.ElasticsearchResource; import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit; import
org.apache.flink.test.util.AbstractTestBase; -import org.apache.flink.util.InstantiationUtil; import org.elasticsearch.client.Client; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; @@ -49,41 +44,10 @@ import static org.junit.Assert.fail; */ public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> extends AbstractTestBase { - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkTestBase.class); - protected static final String CLUSTER_NAME = "test-cluster"; - protected static EmbeddedElasticsearchNodeEnvironment embeddedNodeEnv; - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - @BeforeClass - public static void prepare() throws Exception { - - LOG.info("-------------------------------------------------------------------------"); - LOG.info(" Starting embedded Elasticsearch node "); - LOG.info("-------------------------------------------------------------------------"); - - // dynamically load version-specific implementation of the Elasticsearch embedded node environment - Class<?> clazz = Class.forName( - "org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl"); - embeddedNodeEnv = (EmbeddedElasticsearchNodeEnvironment) InstantiationUtil.instantiate(clazz); - - embeddedNodeEnv.start(tempFolder.newFolder(), CLUSTER_NAME); - - } - - @AfterClass - public static void shutdown() throws Exception { - - LOG.info("-------------------------------------------------------------------------"); - LOG.info(" Shutting down embedded Elasticsearch node "); - LOG.info("-------------------------------------------------------------------------"); - - embeddedNodeEnv.close(); - - } + public static ElasticsearchResource elasticsearchResource = new 
ElasticsearchResource(CLUSTER_NAME); /** * Tests that the Elasticsearch sink works properly with json. @@ -128,7 +92,7 @@ public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> exte env.execute("Elasticsearch Sink Test"); // verify the results - Client client = embeddedNodeEnv.getClient(); + Client client = elasticsearchResource.getClient(); SourceSinkDataTestKit.verifyProducedSinkData(client, index); client.close(); diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java new file mode 100644 index 0000000..2f0eedd --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.temporal.UnsupportedTemporalTypeException; +import java.util.ArrayList; +import java.util.List; +
+/**
+ * Tests for {@link IndexGeneratorFactory}.
+ *
+ * <p>NOTE(review): the tests that expect an exception only assert inside the {@code catch} block,
+ * so they also pass when no exception is thrown at all. Whether each factory call really throws
+ * at creation time should be confirmed before adding an explicit failure after the call.
+ */
+public class IndexGeneratorFactoryTest extends TestLogger {
+
+    private TableSchema schema;
+    private List<RowData> rows;
+
+    /** Builds the schema and the two rows shared by all tests. */
+    @Before
+    public void prepareData() {
+        schema = new TableSchema.Builder()
+            .field("id", DataTypes.INT())
+            .field("item", DataTypes.STRING())
+            .field("log_ts", DataTypes.BIGINT())
+            .field("log_date", DataTypes.DATE())
+            .field("log_time", DataTypes.TIME())
+            .field("order_timestamp", DataTypes.TIMESTAMP())
+            .field("local_timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+            .field("status", DataTypes.BOOLEAN())
+            .build();
+
+        rows = new ArrayList<>();
+        rows.add(GenericRowData.of(
+            1,
+            StringData.fromString("apple"),
+            Timestamp.valueOf("2020-03-18 12:12:14").getTime(),
+            (int) LocalDate.parse("2020-03-18").toEpochDay(),
+            (int) (LocalTime.parse("12:12:14").toNanoOfDay() / 1_000_000L),
+            TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-18T12:12:14")),
+            TimestampData.fromInstant(Instant.parse("2020-03-18T12:12:14Z")),
+            true));
+        rows.add(GenericRowData.of(
+            2,
+            StringData.fromString("peanut"),
+            Timestamp.valueOf("2020-03-19 12:12:14").getTime(),
+            (int) LocalDate.parse("2020-03-19").toEpochDay(),
+            (int) (LocalTime.parse("12:22:21").toNanoOfDay() / 1_000_000L),
+            TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-19T12:22:14")),
+            TimestampData.fromInstant(Instant.parse("2020-03-19T12:12:14Z")),
+            false));
+    }
+
+    @Test
+    public void testDynamicIndexFromTimestamp() {
+        IndexGenerator indexGenerator = IndexGeneratorFactory
+            .createIndexGenerator(
+                "{order_timestamp|yyyy_MM_dd_HH-ss}_index", schema);
+        indexGenerator.open();
+        Assert.assertEquals("2020_03_18_12-14_index", indexGenerator.generate(rows.get(0)));
+        IndexGenerator indexGenerator1 = IndexGeneratorFactory
+            .createIndexGenerator(
+                "{order_timestamp|yyyy_MM_dd_HH_mm}_index", schema);
+        indexGenerator1.open();
+        Assert.assertEquals("2020_03_19_12_22_index", indexGenerator1.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testDynamicIndexFromDate() {
+        IndexGenerator indexGenerator = IndexGeneratorFactory
+            .createIndexGenerator(
+                "my-index-{log_date|yyyy/MM/dd}", schema);
+        indexGenerator.open();
+        Assert.assertEquals("my-index-2020/03/18", indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("my-index-2020/03/19", indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testDynamicIndexFromTime() {
+        IndexGenerator indexGenerator = IndexGeneratorFactory
+            .createIndexGenerator(
+                "my-index-{log_time|HH-mm}", schema);
+        indexGenerator.open();
+        Assert.assertEquals("my-index-12-12", indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("my-index-12-22", indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testDynamicIndexDefaultFormat() {
+        IndexGenerator indexGenerator = IndexGeneratorFactory
+            .createIndexGenerator(
+                "my-index-{log_time|}", schema);
+        indexGenerator.open();
+        Assert.assertEquals("my-index-12_12_14", indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("my-index-12_22_21", indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZone() {
+        IndexGenerator indexGenerator = IndexGeneratorFactory
+            .createIndexGenerator(
+                "my-index-{local_timestamp|}", schema);
+        indexGenerator.open();
+        Assert.assertEquals("my-index-2020_03_18_12_12_14Z", indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("my-index-2020_03_19_12_12_14Z", indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testGeneralDynamicIndex() {
+        IndexGenerator indexGenerator = IndexGeneratorFactory
+            .createIndexGenerator(
+                "index_{item}", schema);
+        indexGenerator.open();
+        Assert.assertEquals("index_apple", indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("index_peanut", indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testStaticIndex() {
+        IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator(
+            "my-index", schema);
+        indexGenerator.open();
+        Assert.assertEquals("my-index", indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("my-index", indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void testUnknownField() {
+        String expectedExceptionMsg = "Unknown field 'unknown_ts' in index pattern 'my-index-{unknown_ts|yyyy-MM-dd}'," +
+            " please check the field name.";
+        try {
+            IndexGeneratorFactory.createIndexGenerator("my-index-{unknown_ts|yyyy-MM-dd}", schema);
+        } catch (TableException e) {
+            // expected value first, so that a failure report labels both values correctly
+            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+        }
+    }
+
+    @Test
+    public void testUnsupportedTimeType() {
+        String expectedExceptionMsg = "Unsupported type 'INT' found in Elasticsearch dynamic index field, " +
+            "time-related pattern only support types are: DATE,TIME,TIMESTAMP.";
+        try {
+            IndexGeneratorFactory.createIndexGenerator("my-index-{id|yyyy-MM-dd}", schema);
+        } catch (TableException e) {
+            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+        }
+    }
+
+    @Test
+    public void testUnsupportedMultiParametersType() {
+        String expectedExceptionMsg = "Chaining dynamic index pattern my-index-{local_date}-{local_time} is not supported," +
+            " only support single dynamic index pattern.";
+        try {
+            IndexGeneratorFactory.createIndexGenerator("my-index-{local_date}-{local_time}", schema);
+        } catch (TableException e) {
+            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDynamicIndexUnsupportedFormat() {
+        String expectedExceptionMsg = "Unsupported field: HourOfDay";
+        try {
+            // NOTE(review): only the generator is created here and no row is formatted; presumably
+            // the exception would surface when a value is formatted - confirm
+            IndexGeneratorFactory.createIndexGenerator(
+                "my-index-{log_date|yyyy/MM/dd HH:mm}", schema);
+        } catch (UnsupportedTemporalTypeException e) {
+            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+        }
+    }
+
+    @Test
+    public void testUnsupportedIndexFieldType() {
+        String expectedExceptionMsg = "Unsupported type BOOLEAN of index field, Supported types are:" +
+            " [DATE, TIME_WITHOUT_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE," +
+            " TIMESTAMP_WITH_LOCAL_TIME_ZONE, VARCHAR, CHAR, TINYINT, INTEGER, BIGINT]";
+        try {
+            IndexGeneratorFactory.createIndexGenerator("index_{status}", schema);
+        } catch (IllegalArgumentException e) {
+            Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java new file mode 100644 index 0000000..96b6e3e --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; + +import org.junit.Test; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.function.Function; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link KeyExtractor}. 
+ */
+public class KeyExtractorTest {
+    /** A single-column primary key is rendered without any delimiter. */
+    @Test
+    public void testSimpleKey() {
+        final TableSchema tableSchema = TableSchema.builder()
+            .field("a", DataTypes.BIGINT().notNull())
+            .field("b", DataTypes.STRING())
+            .primaryKey("a")
+            .build();
+        final RowData row = GenericRowData.of(12L, StringData.fromString("ABCD"));
+
+        final Function<RowData, String> extractor = KeyExtractor.createKeyExtractor(tableSchema, "_");
+
+        assertThat(extractor.apply(row), equalTo("12"));
+    }
+
+    /** Without a primary key the extracted key is null. */
+    @Test
+    public void testNoPrimaryKey() {
+        final TableSchema tableSchema = TableSchema.builder()
+            .field("a", DataTypes.BIGINT().notNull())
+            .field("b", DataTypes.STRING())
+            .build();
+        final RowData row = GenericRowData.of(12L, StringData.fromString("ABCD"));
+
+        final Function<RowData, String> extractor = KeyExtractor.createKeyExtractor(tableSchema, "_");
+
+        assertThat(extractor.apply(row), nullValue());
+    }
+
+    /** Two key columns are joined with the delimiter, skipping the non-key column between them. */
+    @Test
+    public void testTwoFieldsKey() {
+        final TableSchema tableSchema = TableSchema.builder()
+            .field("a", DataTypes.BIGINT().notNull())
+            .field("b", DataTypes.STRING())
+            .field("c", DataTypes.TIMESTAMP().notNull())
+            .primaryKey("a", "c")
+            .build();
+        final RowData row = GenericRowData.of(
+            12L,
+            StringData.fromString("ABCD"),
+            TimestampData.fromLocalDateTime(LocalDateTime.parse("2012-12-12T12:12:12"))
+        );
+
+        final Function<RowData, String> extractor = KeyExtractor.createKeyExtractor(tableSchema, "_");
+
+        assertThat(extractor.apply(row), equalTo("12_2012-12-12T12:12:12"));
+    }
+
+    /** Every column type used here can be part of the key and is rendered as expected. */
+    @Test
+    public void testAllTypesKey() {
+        final TableSchema tableSchema = TableSchema.builder()
+            .field("a", DataTypes.TINYINT().notNull())
+            .field("b", DataTypes.SMALLINT().notNull())
+            .field("c", DataTypes.INT().notNull())
+            .field("d", DataTypes.BIGINT().notNull())
+            .field("e", DataTypes.BOOLEAN().notNull())
+            .field("f", DataTypes.FLOAT().notNull())
+            .field("g", DataTypes.DOUBLE().notNull())
+            .field("h", DataTypes.STRING().notNull())
+            .field("i", DataTypes.TIMESTAMP().notNull())
+            .field("j", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().notNull())
+            .field("k", DataTypes.TIME().notNull())
+            .field("l", DataTypes.DATE().notNull())
+            .primaryKey("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l")
+            .build();
+        final RowData row = GenericRowData.of(
+            (byte) 1,
+            (short) 2,
+            3,
+            4L,
+            true,
+            1.0f,
+            2.0d,
+            StringData.fromString("ABCD"),
+            TimestampData.fromLocalDateTime(LocalDateTime.parse("2012-12-12T12:12:12")),
+            TimestampData.fromInstant(Instant.parse("2013-01-13T13:13:13Z")),
+            (int) (LocalTime.parse("14:14:14").toNanoOfDay() / 1_000_000),
+            (int) LocalDate.parse("2015-05-15").toEpochDay()
+        );
+
+        final Function<RowData, String> extractor = KeyExtractor.createKeyExtractor(tableSchema, "_");
+
+        assertThat(
+            extractor.apply(row),
+            equalTo("1_2_3_4_true_1.0_2.0_ABCD_2012-12-12T12:12:12_2013-01-13T13:13:13_14:14:14_2015-05-15"));
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/TestContext.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/TestContext.java new file mode 100644 index 0000000..e4978fc --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/TestContext.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.elasticsearch.table;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.DynamicTableFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * A utility class for mocking {@link DynamicTableFactory.Context}.
 *
 * <p>Collect a schema and string options through the fluent {@code with*} methods, then
 * call {@link #build()} to obtain the context.
 */
class TestContext {
    /** Table options exposed through the catalog table of the built context. */
    private final Map<String, String> options = new HashMap<>();

    /** Schema of the catalog table; stays {@code null} unless {@link #withSchema} is called. */
    private TableSchema tableSchema;

    /** Starts a new, empty context description. */
    public static TestContext context() {
        return new TestContext();
    }

    /** Sets the schema of the mocked table and returns this instance for chaining. */
    public TestContext withSchema(TableSchema schema) {
        this.tableSchema = schema;
        return this;
    }

    /** Adds (or overwrites) a table option and returns this instance for chaining. */
    public TestContext withOption(String key, String value) {
        options.put(key, value);
        return this;
    }

    /**
     * Creates the context. Only the catalog table and the class loader are populated; the
     * object identifier and the configuration are {@code null}.
     */
    DynamicTableFactory.Context build() {
        return new DynamicTableFactory.Context() {
            @Override
            public ObjectIdentifier getObjectIdentifier() {
                return null;
            }

            @Override
            public CatalogTable getCatalogTable() {
                // Reads the fields on every call, so later with* calls remain visible.
                return new CatalogTableImpl(tableSchema, options, "");
            }

            @Override
            public ReadableConfig getConfiguration() {
                return null;
            }

            @Override
            public ClassLoader getClassLoader() {
                return TestContext.class.getClassLoader();
            }
        };
    }
}
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java new file mode 100644 index 0000000..6f185d3 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.testutils; + +import org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironment; +import org.apache.flink.util.InstantiationUtil; + +import org.elasticsearch.client.Client; +import org.junit.rules.ExternalResource; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A resource that starts an embedded elasticsearch cluster. 
+ */ +public class ElasticsearchResource extends ExternalResource { + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchResource.class); + private EmbeddedElasticsearchNodeEnvironment embeddedNodeEnv; + private final TemporaryFolder tempFolder = new TemporaryFolder(); + + private final String clusterName; + + public ElasticsearchResource(String clusterName) { + this.clusterName = clusterName; + } + + @Override + protected void before() throws Throwable { + + LOG.info("-------------------------------------------------------------------------"); + LOG.info(" Starting embedded Elasticsearch node "); + LOG.info("-------------------------------------------------------------------------"); + + // dynamically load version-specific implementation of the Elasticsearch embedded node environment + Class<?> clazz = Class.forName( + "org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl"); + embeddedNodeEnv = (EmbeddedElasticsearchNodeEnvironment) InstantiationUtil.instantiate(clazz); + + tempFolder.create(); + embeddedNodeEnv.start(tempFolder.newFolder(), clusterName); + } + + @Override + protected void after() { + + LOG.info("-------------------------------------------------------------------------"); + LOG.info(" Shutting down embedded Elasticsearch node "); + LOG.info("-------------------------------------------------------------------------"); + + try { + embeddedNodeEnv.close(); + tempFolder.delete(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public Client getClient() { + return embeddedNodeEnv.getClient(); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch7/pom.xml b/flink-connectors/flink-connector-elasticsearch7/pom.xml index 5f25f46..ef340b9 100644 --- a/flink-connectors/flink-connector-elasticsearch7/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch7/pom.xml @@ -139,6 +139,14 @@ under the License. 
<scope>test</scope> </dependency> + <!-- Table API integration tests --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <!-- Elasticsearch table sink factory testing --> <dependency> <groupId>org.apache.flink</groupId> @@ -148,4 +156,18 @@ under the License. </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <!-- Enforce single fork execution because of spawning + Elasticsearch cluster multiple times --> + <forkCount>1</forkCount> + </configuration> + </plugin> + </plugins> + </build> </project> diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java new file mode 100644 index 0000000..0aff575 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; + +import org.apache.http.HttpHost; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION; + +/** + * Elasticsearch 7 specific configuration. + */ +@Internal +final class Elasticsearch7Configuration extends ElasticsearchConfiguration { + Elasticsearch7Configuration(ReadableConfig config, ClassLoader classLoader) { + super(config, classLoader); + } + + public List<HttpHost> getHosts() { + return config.get(HOSTS_OPTION).stream() + .map(Elasticsearch7Configuration::validateAndParseHostsString) + .collect(Collectors.toList()); + } + + private static HttpHost validateAndParseHostsString(String host) { + try { + HttpHost httpHost = HttpHost.create(host); + if (httpHost.getPort() < 0) { + throw new ValidationException(String.format( + "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.", + host, + HOSTS_OPTION.key())); + } + + if (httpHost.getSchemeName() == null) { + throw new ValidationException(String.format( + "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. 
Missing scheme.", + host, + HOSTS_OPTION.key())); + } + return httpHost; + } catch (Exception e) { + throw new ValidationException(String.format( + "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.", + host, + HOSTS_OPTION.key()), e); + } + } +} diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java new file mode 100644 index 0000000..4076b63 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; +import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.SinkFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.common.xcontent.XContentType; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; + +/** + * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a logical + * description. 
+ */ +@Internal +final class Elasticsearch7DynamicSink implements DynamicTableSink { + @VisibleForTesting + static final Elasticsearch7RequestFactory REQUEST_FACTORY = new Elasticsearch7DynamicSink.Elasticsearch7RequestFactory(); + + private final SinkFormat<SerializationSchema<RowData>> format; + private final TableSchema schema; + private final Elasticsearch7Configuration config; + + public Elasticsearch7DynamicSink( + SinkFormat<SerializationSchema<RowData>> format, + Elasticsearch7Configuration config, + TableSchema schema) { + this(format, config, schema, (ElasticsearchSink.Builder::new)); + } + + //-------------------------------------------------------------- + // Hack to make configuration testing possible. + // + // The code in this block should never be used outside of tests. + // Having a way to inject a builder we can assert the builder in + // the test. We can not assert everything though, e.g. it is not + // possible to assert flushing on checkpoint, as it is configured + // on the sink itself. 
+ //-------------------------------------------------------------- + + private final ElasticSearchBuilderProvider builderProvider; + + @FunctionalInterface + interface ElasticSearchBuilderProvider { + ElasticsearchSink.Builder<RowData> createBuilder( + List<HttpHost> httpHosts, + RowElasticsearchSinkFunction upsertSinkFunction); + } + + Elasticsearch7DynamicSink( + SinkFormat<SerializationSchema<RowData>> format, + Elasticsearch7Configuration config, + TableSchema schema, + ElasticSearchBuilderProvider builderProvider) { + this.format = format; + this.schema = schema; + this.config = config; + this.builderProvider = builderProvider; + } + + //-------------------------------------------------------------- + // End of hack to make configuration testing possible + //-------------------------------------------------------------- + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + ChangelogMode.Builder builder = ChangelogMode.newBuilder(); + for (RowKind kind : requestedMode.getContainedKinds()) { + if (kind != RowKind.UPDATE_BEFORE) { + builder.addContainedKind(kind); + } + } + return builder.build(); + } + + @Override + public SinkFunctionProvider getSinkRuntimeProvider(Context context) { + return () -> { + SerializationSchema<RowData> format = this.format.createSinkFormat(context, schema.toRowDataType()); + + final RowElasticsearchSinkFunction upsertFunction = + new RowElasticsearchSinkFunction( + IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema), + null, // this is deprecated in es 7+ + format, + XContentType.JSON, + REQUEST_FACTORY, + KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()) + ); + + final ElasticsearchSink.Builder<RowData> builder = builderProvider.createBuilder( + config.getHosts(), + upsertFunction); + + builder.setFailureHandler(config.getFailureHandler()); + config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions); + 
config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb); + config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval); + builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled()); + config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType); + config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries); + config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay); + + config.getPathPrefix() + .ifPresent(pathPrefix -> builder.setRestClientFactory(new DefaultRestClientFactory(pathPrefix))); + + final ElasticsearchSink<RowData> sink = builder.build(); + + if (config.isDisableFlushOnCheckpoint()) { + sink.disableFlushOnCheckpoint(); + } + + return sink; + }; + } + + @Override + public DynamicTableSink copy() { + return this; + } + + @Override + public String asSummaryString() { + return "Elasticsearch7"; + } + + /** + * Serializable {@link RestClientFactory} used by the sink. + */ + @VisibleForTesting + static class DefaultRestClientFactory implements RestClientFactory { + + private final String pathPrefix; + + public DefaultRestClientFactory(@Nullable String pathPrefix) { + this.pathPrefix = pathPrefix; + } + + @Override + public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { + if (pathPrefix != null) { + restClientBuilder.setPathPrefix(pathPrefix); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultRestClientFactory that = (DefaultRestClientFactory) o; + return Objects.equals(pathPrefix, that.pathPrefix); + } + + @Override + public int hashCode() { + return Objects.hash(pathPrefix); + } + } + + /** + * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the sink. 
+ */ + private static class Elasticsearch7RequestFactory implements RequestFactory { + @Override + public UpdateRequest createUpdateRequest( + String index, + String docType, + String key, + XContentType contentType, + byte[] document) { + return new UpdateRequest(index, key) + .doc(document, contentType) + .upsert(document, contentType); + } + + @Override + public IndexRequest createIndexRequest( + String index, + String docType, + String key, + XContentType contentType, + byte[] document) { + return new IndexRequest(index) + .id(key) + .source(document, contentType); + } + + @Override + public DeleteRequest createDeleteRequest(String index, String docType, String key) { + return new DeleteRequest(index, key); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Elasticsearch7DynamicSink that = (Elasticsearch7DynamicSink) o; + return Objects.equals(format, that.format) && + Objects.equals(schema, that.schema) && + Objects.equals(config, that.config) && + Objects.equals(builderProvider, that.builderProvider); + } + + @Override + public int hashCode() { + return Objects.hash(format, schema, config, builderProvider); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java new file mode 100644 index 0000000..055989b --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.format.SinkFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.utils.TableSchemaUtils; + +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION; +import static 
org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION; + +/** + * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch7DynamicSink}. 
+ */ +@Internal +public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory { + private static final Set<ConfigOption<?>> requiredOptions = Stream.of( + HOSTS_OPTION, + INDEX_OPTION + ).collect(Collectors.toSet()); + private static final Set<ConfigOption<?>> optionalOptions = Stream.of( + KEY_DELIMITER_OPTION, + FAILURE_HANDLER_OPTION, + FLUSH_ON_CHECKPOINT_OPTION, + BULK_FLASH_MAX_SIZE_OPTION, + BULK_FLUSH_MAX_ACTIONS_OPTION, + BULK_FLUSH_INTERVAL_OPTION, + BULK_FLUSH_BACKOFF_TYPE_OPTION, + BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION, + BULK_FLUSH_BACKOFF_DELAY_OPTION, + CONNECTION_MAX_RETRY_TIMEOUT_OPTION, + CONNECTION_PATH_PREFIX, + FORMAT_OPTION + ).collect(Collectors.toSet()); + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + TableSchema tableSchema = context.getCatalogTable().getSchema(); + ElasticsearchValidationUtils.validatePrimaryKey(tableSchema); + + final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + + final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat( + SerializationFormatFactory.class, + FORMAT_OPTION); + + helper.validate(); + Configuration configuration = new Configuration(); + context.getCatalogTable() + .getOptions() + .forEach(configuration::setString); + Elasticsearch7Configuration config = new Elasticsearch7Configuration(configuration, context.getClassLoader()); + + validateOptions(config, configuration); + + return new Elasticsearch7DynamicSink( + format, + config, + TableSchemaUtils.getPhysicalSchema(tableSchema)); + } + + private void validateOptions(Elasticsearch7Configuration config, Configuration originalConfiguration) { + config.getFailureHandler(); // checks if we can instantiate the custom failure handler + config.getHosts(); // validate hosts + validateOptions( + config.getIndex().length() >= 1, + () -> String.format("'%s' must not be empty", INDEX_OPTION.key())); + validateOptions( + 
config.getBulkFlushMaxActions().map(maxActions -> maxActions >= 1).orElse(true), + () -> String.format( + "'%s' must be at least 1 character. Got: %s", + BULK_FLUSH_MAX_ACTIONS_OPTION.key(), + config.getBulkFlushMaxActions().get()) + ); + validateOptions( + config.getBulkFlushMaxSize().map(maxSize -> maxSize >= 1024 * 1024).orElse(true), + () -> String.format( + "'%s' must be at least 1mb character. Got: %s", + BULK_FLASH_MAX_SIZE_OPTION.key(), + originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString()) + ); + validateOptions( + config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true), + () -> String.format( + "'%s' must be at least 1. Got: %s", + BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), + config.getBulkFlushBackoffRetries().get()) + ); + } + + private static void validateOptions(boolean condition, Supplier<String> message) { + if (!condition) { + throw new ValidationException(message.get()); + } + } + + @Override + public String factoryIdentifier() { + return "elasticsearch-7"; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return requiredOptions; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return optionalOptions; + } +} diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000..10e4846 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSinkFactory diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java new file mode 100644 index 0000000..a830fa3 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context; + +/** + * Tests for validation in {@link Elasticsearch7DynamicSinkFactory}. + */ +public class Elasticsearch7DynamicSinkFactoryTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void validateEmptyConfiguration() { + Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "One or more required options are missing.\n" + + "\n" + + "Missing required options are:\n" + + "\n" + + "hosts\n" + + "index"); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .build() + ); + } + + @Test + public void validateWrongIndex() { + Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "'index' must not be empty"); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .withOption("index", "") + .withOption("hosts", "http://localhost:12345") + .build() + ); + } + + @Test + public void validateWrongHosts() { + Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "Could not parse host 'wrong-host' in option 
'hosts'. It should follow the format 'http://host_name:port'."); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .withOption("index", "MyIndex") + .withOption("hosts", "wrong-host") + .build() + ); + } + + @Test + public void validateWrongFlushSize() { + Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "'sink.bulk-flush.max-size' must be at least 1mb character. Got: 1024 bytes"); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex") + .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234") + .withOption(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1kb") + .build() + ); + } + + @Test + public void validateWrongRetries() { + Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "'sink.bulk-flush.back-off.max-retries' must be at least 1. Got: 0"); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex") + .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234") + .withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "0") + .build() + ); + } + + @Test + public void validateWrongMaxActions() { + Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "'sink.bulk-flush.max-actions' must be at least 1 character. 
Got: 0"); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex") + .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234") + .withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "0") + .build() + ); + } + + @Test + public void validateWrongBackoffDelay() { + Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "Invalid value for option 'sink.bulk-flush.back-off.delay'."); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.TIME()) + .build()) + .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex") + .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234") + .withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "-1s") + .build() + ); + } + + @Test + public void validatePrimaryKeyOnIllegalColumn() { + Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory(); + + thrown.expect(ValidationException.class); + thrown.expectMessage( + "The table has a primary key on columns of illegal types: " + + "[ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY].\n" + + " Elasticsearch sink does not support primary keys on columns of types: " + + "[ARRAY, MAP, MULTISET, STRUCTURED_TYPE, ROW, RAW, BINARY, VARBINARY]."); + sinkFactory.createDynamicTableSink( + context() + .withSchema(TableSchema.builder() + .field("a", DataTypes.BIGINT().notNull()) + .field("b", DataTypes.ARRAY(DataTypes.BIGINT().notNull()).notNull()) + .field("c", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING()).notNull()) + .field("d", DataTypes.MULTISET(DataTypes.BIGINT().notNull()).notNull()) + .field("e", DataTypes.ROW(DataTypes.FIELD("a", DataTypes.BIGINT())).notNull()) + .field("f", 
DataTypes.RAW(Types.BIG_INT).notNull()) + .field("g", DataTypes.BYTES().notNull()) + .primaryKey("a", "b", "c", "d", "e", "f", "g") + .build()) + .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex") + .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234") + .withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "1s") + .build() + ); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java new file mode 100644 index 0000000..3b667dc --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.testutils.ElasticsearchResource; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.search.SearchHits; +import org.junit.ClassRule; +import org.junit.Test; + +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context; +import static org.apache.flink.table.api.Expressions.row; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * IT tests for {@link Elasticsearch7DynamicSink}. 
+ */ +public class Elasticsearch7DynamicSinkITCase { + + @ClassRule + public static ElasticsearchResource elasticsearchResource = new ElasticsearchResource("es-dynamic-sink-it-test"); + + @Test + public void testWritingDocuments() throws Exception { + TableSchema schema = TableSchema.builder() + .field("a", DataTypes.BIGINT().notNull()) + .field("b", DataTypes.TIME()) + .field("c", DataTypes.STRING().notNull()) + .field("d", DataTypes.FLOAT()) + .field("e", DataTypes.TINYINT().notNull()) + .field("f", DataTypes.DATE()) + .field("g", DataTypes.TIMESTAMP().notNull()) + .primaryKey("a", "g") + .build(); + GenericRowData rowData = GenericRowData.of( + 1L, + 12345, + StringData.fromString("ABCDE"), + 12.12f, + (byte) 2, + 12345, + TimestampData.fromLocalDateTime(LocalDateTime.parse("2012-12-12T12:12:12"))); + + String index = "writing-documents"; + Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory(); + + SinkFunctionProvider sinkRuntimeProvider = (SinkFunctionProvider) sinkFactory.createDynamicTableSink( + context() + .withSchema(schema) + .withOption(ElasticsearchOptions.INDEX_OPTION.key(), index) + .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") + .withOption(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") + .build() + ).getSinkRuntimeProvider(new MockContext()); + + SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction(); + StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + rowData.setRowKind(RowKind.UPDATE_AFTER); + environment.<RowData>fromElements(rowData).addSink(sinkFunction); + environment.execute(); + + Client client = elasticsearchResource.getClient(); + Map<String, Object> response = client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource(); + Map<Object, Object> expectedMap = new HashMap<>(); + expectedMap.put("a", 1); + expectedMap.put("b", "00:00:12Z"); + expectedMap.put("c", "ABCDE"); 
+ expectedMap.put("d", 12.12d); + expectedMap.put("e", 2); + expectedMap.put("f", "2003-10-20"); + expectedMap.put("g", "2012-12-12T12:12:12Z"); + assertThat(response, equalTo(expectedMap)); + } + + @Test + public void testWritingDocumentsFromTableApi() throws Exception { + TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance() + .useBlinkPlanner() + .inStreamingMode() + .build()); + + String index = "table-api"; + tableEnvironment.executeSql("CREATE TABLE esTable (" + + "a BIGINT NOT NULL,\n" + + "b TIME,\n" + + "c STRING NOT NULL,\n" + + "d FLOAT,\n" + + "e TINYINT NOT NULL,\n" + + "f DATE,\n" + + "g TIMESTAMP NOT NULL," + + "h as a + 2,\n" + + "PRIMARY KEY (a, g) NOT ENFORCED\n" + + ")\n" + + "WITH (\n" + + String.format("'%s'='%s',\n", "connector", "elasticsearch-7") + + String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) + + String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") + + String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") + + ")"); + + tableEnvironment.fromValues( + row( + 1L, + LocalTime.ofNanoOfDay(12345L * 1_000_000L), + "ABCDE", + 12.12f, + (byte) 2, + LocalDate.ofEpochDay(12345), + LocalDateTime.parse("2012-12-12T12:12:12")) + ).executeInsert("esTable") + .getJobClient() + .get() + .getJobExecutionResult(this.getClass().getClassLoader()) + .get(); + + Client client = elasticsearchResource.getClient(); + Map<String, Object> response = client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource(); + Map<Object, Object> expectedMap = new HashMap<>(); + expectedMap.put("a", 1); + expectedMap.put("b", "00:00:12Z"); + expectedMap.put("c", "ABCDE"); + expectedMap.put("d", 12.12d); + expectedMap.put("e", 2); + expectedMap.put("f", "2003-10-20"); + expectedMap.put("g", "2012-12-12T12:12:12Z"); + assertThat(response, equalTo(expectedMap)); + } + + @Test + public void 
testWritingDocumentsNoPrimaryKey() throws Exception { + TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance() + .useBlinkPlanner() + .inStreamingMode() + .build()); + + String index = "no-primary-key"; + tableEnvironment.executeSql("CREATE TABLE esTable (" + + "a BIGINT NOT NULL,\n" + + "b TIME,\n" + + "c STRING NOT NULL,\n" + + "d FLOAT,\n" + + "e TINYINT NOT NULL,\n" + + "f DATE,\n" + + "g TIMESTAMP NOT NULL\n" + + ")\n" + + "WITH (\n" + + String.format("'%s'='%s',\n", "connector", "elasticsearch-7") + + String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) + + String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") + + String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") + + ")"); + + tableEnvironment.fromValues( + row( + 1L, + LocalTime.ofNanoOfDay(12345L * 1_000_000L), + "ABCDE", + 12.12f, + (byte) 2, + LocalDate.ofEpochDay(12345), + LocalDateTime.parse("2012-12-12T12:12:12")) + ).executeInsert("esTable") + .getJobClient() + .get() + .getJobExecutionResult(this.getClass().getClassLoader()) + .get(); + + Client client = elasticsearchResource.getClient(); + + // search API does not return documents that were not indexed, we might need to query + // the index a few times + Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1)); + SearchHits hits; + do { + hits = client.prepareSearch(index) + .execute() + .actionGet() + .getHits(); + if (hits.getTotalHits().value == 0) { + Thread.sleep(100); + } + } while (hits.getTotalHits().value == 0 && deadline.hasTimeLeft()); + + Map<String, Object> result = hits.getAt(0).getSourceAsMap(); + Map<Object, Object> expectedMap = new HashMap<>(); + expectedMap.put("a", 1); + expectedMap.put("b", "00:00:12Z"); + expectedMap.put("c", "ABCDE"); + expectedMap.put("d", 12.12d); + expectedMap.put("e", 2); + expectedMap.put("f", "2003-10-20"); + expectedMap.put("g", "2012-12-12T12:12:12Z"); 
+ assertThat(result, equalTo(expectedMap)); + } + + private static class MockContext implements DynamicTableSink.Context { + @Override + public boolean isBounded() { + return false; + } + + @Override + public TypeInformation<?> createTypeInformation(DataType consumedDataType) { + return null; + } + + @Override + public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) { + return null; + } + } +} diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java new file mode 100644 index 0000000..466ede3 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch.table; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.SinkFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.ActionRequest; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.List; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; + +/** + * Tests for {@link Elasticsearch7DynamicSink} parameters. 
+ */ +public class Elasticsearch7DynamicSinkTest { + + private static final String FIELD_KEY = "key"; + private static final String FIELD_FRUIT_NAME = "fruit_name"; + private static final String FIELD_COUNT = "count"; + private static final String FIELD_TS = "ts"; + + private static final String HOSTNAME = "host1"; + private static final int PORT = 1234; + private static final String SCHEMA = "https"; + private static final String INDEX = "MyIndex"; + private static final String DOC_TYPE = "MyType"; + + @Test + public void testBuilder() { + final TableSchema schema = createTestSchema(); + + BuilderProvider provider = new BuilderProvider(); + final Elasticsearch7DynamicSink testSink = new Elasticsearch7DynamicSink( + new DummySinkFormat(), + new Elasticsearch7Configuration(getConfig(), this.getClass().getClassLoader()), + schema, + provider + ); + + testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction(); + + verify(provider.builderSpy).setFailureHandler(new DummyFailureHandler()); + verify(provider.builderSpy).setBulkFlushBackoff(true); + verify(provider.builderSpy).setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL); + verify(provider.builderSpy).setBulkFlushBackoffDelay(123); + verify(provider.builderSpy).setBulkFlushBackoffRetries(3); + verify(provider.builderSpy).setBulkFlushInterval(100); + verify(provider.builderSpy).setBulkFlushMaxActions(1000); + verify(provider.builderSpy).setBulkFlushMaxSizeMb(1); + verify(provider.builderSpy).setRestClientFactory(new Elasticsearch7DynamicSink.DefaultRestClientFactory("/myapp")); + verify(provider.sinkSpy).disableFlushOnCheckpoint(); + } + + private Configuration getConfig() { + Configuration configuration = new Configuration(); + configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX); + configuration.setString(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE); + configuration.setString(ElasticsearchOptions.HOSTS_OPTION.key(), SCHEMA + "://" + HOSTNAME 
+ ":" + PORT); + configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION.key(), "exponential"); + configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "123"); + configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "3"); + configuration.setString(ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION.key(), "100"); + configuration.setString(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "1000"); + configuration.setString(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1mb"); + configuration.setString(ElasticsearchOptions.CONNECTION_PATH_PREFIX.key(), "/myapp"); + configuration.setString(ElasticsearchOptions.FAILURE_HANDLER_OPTION.key(), DummyFailureHandler.class.getName()); + configuration.setString(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false"); + return configuration; + } + + private static class BuilderProvider implements Elasticsearch7DynamicSink.ElasticSearchBuilderProvider { + public ElasticsearchSink.Builder<RowData> builderSpy; + public ElasticsearchSink<RowData> sinkSpy; + + @Override + public ElasticsearchSink.Builder<RowData> createBuilder( + List<HttpHost> httpHosts, + RowElasticsearchSinkFunction upsertSinkFunction) { + builderSpy = Mockito.spy(new ElasticsearchSink.Builder<>(httpHosts, upsertSinkFunction)); + doAnswer( + invocation -> { + sinkSpy = Mockito.spy((ElasticsearchSink<RowData>) invocation.callRealMethod()); + return sinkSpy; + } + ).when(builderSpy).build(); + + return builderSpy; + } + } + + private TableSchema createTestSchema() { + return TableSchema.builder() + .field(FIELD_KEY, DataTypes.BIGINT()) + .field(FIELD_FRUIT_NAME, DataTypes.STRING()) + .field(FIELD_COUNT, DataTypes.DECIMAL(10, 4)) + .field(FIELD_TS, DataTypes.TIMESTAMP(3)) + .build(); + } + + private static class DummySerializationSchema implements SerializationSchema<RowData> { + + private static final DummySerializationSchema INSTANCE = new 
DummySerializationSchema(); + + @Override + public byte[] serialize(RowData element) { + return new byte[0]; + } + } + + private static class DummySinkFormat implements SinkFormat<SerializationSchema<RowData>> { + @Override + public SerializationSchema<RowData> createSinkFormat( + DynamicTableSink.Context context, + DataType consumedDataType) { + return DummySerializationSchema.INSTANCE; + } + + @Override + public ChangelogMode getChangelogMode() { + return null; + } + } + + private static class MockSinkContext implements DynamicTableSink.Context { + @Override + public boolean isBounded() { + return false; + } + + @Override + public TypeInformation<?> createTypeInformation(DataType consumedDataType) { + return null; + } + + @Override + public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) { + return null; + } + } + + /** + * Custom failure handler for testing. + */ + public static class DummyFailureHandler implements ActionRequestFailureHandler { + + @Override + public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) { + // do nothing + } + + @Override + public boolean equals(Object o) { + return o instanceof DummyFailureHandler; + } + + @Override + public int hashCode() { + return DummyFailureHandler.class.hashCode(); + } + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java index 377cfb8..4863cb2 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java @@ -55,6 +55,9 @@ public class TableSchemaUtils { builder.field(tableColumn.getName(), tableColumn.getType()); } }); + tableSchema.getPrimaryKey().ifPresent( + uniqueConstraint -> 
builder.primaryKey(uniqueConstraint.getColumns().toArray(new String[0])) + ); return builder.build(); }