This is an automated email from the ASF dual-hosted git repository.
junrui pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git
The following commit(s) were added to refs/heads/main by this push:
new c7f813c [FLINK-38373][Connectors / ElasticSearch] ES index supports
fields suffix
c7f813c is described below
commit c7f813c85b0d29b4ac2270958b9f4debc39530ff
Author: garyjgliu <[email protected]>
AuthorDate: Wed Sep 17 17:49:51 2025 +0800
[FLINK-38373][Connectors / ElasticSearch] ES index supports fields suffix
---
.../table/ElasticsearchConfiguration.java | 8 +++
.../table/ElasticsearchConnectorOptions.java | 12 ++++
.../table/ElasticsearchDynamicSink.java | 2 +-
.../ElasticsearchDynamicTableFactoryBase.java | 6 +-
.../elasticsearch/table/IndexGeneratorFactory.java | 50 ++++++++++++++
.../elasticsearch/table/IndexGeneratorTest.java | 79 +++++++++++++++++++++-
6 files changed, 153 insertions(+), 4 deletions(-)
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
index 6c74200..35db69f 100644
---
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
@@ -135,6 +135,14 @@ public class ElasticsearchConfiguration {
return config.get(RETRY_ON_CONFLICTS);
}
+ public String getIndexSuffixFieldName() {
+ return
config.get(ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_NAME_OPTION);
+ }
+
+ public int getIndexSuffixFieldLength() {
+ return
config.get(ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_LENGTH_OPTION);
+ }
+
/**
* Parse Hosts String to list.
*
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
index f34cb87..7817a0d 100644
---
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -152,4 +152,16 @@ public class ElasticsearchConnectorOptions {
.defaultValue(-1)
.withDescription(
"The number of retry when conflicts with
concurrent requests.");
+
+ public static final ConfigOption<String> INDEX_SUFFIX_FIELD_NAME_OPTION =
+ ConfigOptions.key("index.suffix.field.name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The index suffix field name");
+
+ public static final ConfigOption<Integer> INDEX_SUFFIX_FIELD_LENGTH_OPTION
=
+ ConfigOptions.key("index.suffix.field.length")
+ .intType()
+ .defaultValue(-1)
+ .withDescription("The length(exclusive) of index suffix
field value");
}
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
index d6d04a6..a522613 100644
---
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
@@ -98,7 +98,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
IndexGenerator createIndexGenerator() {
return IndexGeneratorFactory.createIndexGenerator(
- config.getIndex(),
+ config,
DataType.getFieldNames(physicalRowDataType),
DataType.getFieldDataTypes(physicalRowDataType),
localTimeZoneId);
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
index 2686e2e..f223380 100644
---
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
@@ -70,6 +70,8 @@ import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnec
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_LENGTH_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_NAME_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
@@ -287,7 +289,9 @@ abstract class ElasticsearchDynamicTableFactoryBase
PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
PARTIAL_CACHE_MAX_ROWS,
PARTIAL_CACHE_CACHE_MISSING_KEY,
- MAX_RETRIES)
+ MAX_RETRIES,
+ INDEX_SUFFIX_FIELD_LENGTH_OPTION,
+ INDEX_SUFFIX_FIELD_NAME_OPTION)
.collect(Collectors.toSet());
}
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
index 92886f4..b628b10 100644
---
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
@@ -86,6 +86,56 @@ final class IndexGeneratorFactory {
return createIndexGenerator(index, fieldNames, dataTypes,
ZoneId.systemDefault());
}
+ public static IndexGenerator createIndexGenerator(
+ ElasticsearchConfiguration config,
+ List<String> fieldNames,
+ List<DataType> dataTypes,
+ ZoneId localTimeZoneId) {
+ if (config.getIndexSuffixFieldName() != null) {
+ return createSuffixIndexGenerator(
+ config.getIndex(),
+ config.getIndexSuffixFieldName(),
+ config.getIndexSuffixFieldLength(),
+ fieldNames,
+ dataTypes);
+ } else {
+ return createIndexGenerator(config.getIndex(), fieldNames,
dataTypes, localTimeZoneId);
+ }
+ }
+
+ private static IndexGenerator createSuffixIndexGenerator(
+ String indexPrefix,
+ String indexSuffixFieldName,
+ int indexSuffixFieldLength,
+ List<String> fieldNames,
+ List<DataType> fieldTypes) {
+ int indexFieldPos = fieldNames.indexOf(indexSuffixFieldName);
+ if (indexFieldPos < 0) {
+ throw new TableException(
+ String.format(
+ "Unknown index field '%s' of '%s', please check
the field name.",
+ indexSuffixFieldName, String.join(",",
fieldNames)));
+ }
+ final LogicalType indexFieldType =
fieldTypes.get(indexFieldPos).getLogicalType();
+ final RowData.FieldGetter fieldGetter =
+ RowData.createFieldGetter(indexFieldType, indexFieldPos);
+ return row -> {
+ Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+ final String indexSuffix;
+ if (fieldOrNull != null) {
+ if (indexSuffixFieldLength > 0) {
+ indexSuffix = String.valueOf(fieldOrNull).substring(0,
indexSuffixFieldLength);
+ } else {
+ indexSuffix = String.valueOf(fieldOrNull);
+ }
+ } else {
+ throw new RuntimeException(
+ "Index suffix field " + indexSuffixFieldName + " is
null");
+ }
+ return String.format("%s%s", indexPrefix, indexSuffix);
+ };
+ }
+
interface DynamicFormatter extends Serializable {
String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
}
diff --git
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
index 44062a3..acbd4b6 100644
---
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
+++
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.elasticsearch.table;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.GenericRowData;
@@ -39,6 +40,9 @@ import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_LENGTH_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_NAME_OPTION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.junit.jupiter.api.Assumptions.assumingThat;
@@ -81,7 +85,10 @@ public class IndexGeneratorTest {
GenericRowData.of(
1,
StringData.fromString("apple"),
- Timestamp.valueOf("2020-03-18 12:12:14").getTime(),
+ LocalDateTime.of(2020, 3, 18, 12, 12, 14)
+ .atZone(ZoneId.of("Asia/Shanghai"))
+ .toInstant()
+ .toEpochMilli(),
(int)
Date.valueOf("2020-03-18").toLocalDate().toEpochDay(),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-18 12:12:14")),
(int)
@@ -100,7 +107,10 @@ public class IndexGeneratorTest {
GenericRowData.of(
2,
StringData.fromString("peanut"),
- Timestamp.valueOf("2020-03-19 12:22:14").getTime(),
+ LocalDateTime.of(2020, 3, 19, 12, 22, 14)
+ .atZone(ZoneId.of("Asia/Shanghai"))
+ .toInstant()
+ .toEpochMilli(),
(int)
Date.valueOf("2020-03-19").toLocalDate().toEpochDay(),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-19 12:22:21")),
(int)
@@ -367,4 +377,69 @@ public class IndexGeneratorTest {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(expectedExceptionMsg);
}
+
+ @Test
+ public void testSuffixIndexGenerator() {
+ Configuration config = new Configuration();
+ config.set(INDEX_OPTION, "index_");
+ config.set(INDEX_SUFFIX_FIELD_NAME_OPTION, "log_ts");
+ config.set(INDEX_SUFFIX_FIELD_LENGTH_OPTION, 10);
+ IndexGenerator indexGenerator =
+ IndexGeneratorFactory.createIndexGenerator(
+ new ElasticsearchConfiguration(config),
+ fieldNames,
+ dataTypes,
+ ZoneId.systemDefault());
+ // 1584504734000
+
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("index_1584504734");
+ // 1584591734000
+
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("index_1584591734");
+ }
+
+ @Test
+ public void testSuffixIndexGeneratorWithoutLengthLimitation() {
+ Configuration config = new Configuration();
+ config.set(INDEX_OPTION, "index_");
+ config.set(INDEX_SUFFIX_FIELD_NAME_OPTION, "log_ts");
+ IndexGenerator indexGenerator =
+ IndexGeneratorFactory.createIndexGenerator(
+ new ElasticsearchConfiguration(config),
+ fieldNames,
+ dataTypes,
+ ZoneId.systemDefault());
+ // 1584504734000
+
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("index_1584504734000");
+ // 1584591734000
+
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("index_1584591734000");
+ }
+
+ @Test
+ public void testStaticIndexCompatibility() {
+ Configuration config = new Configuration();
+ config.set(INDEX_OPTION, "my-index");
+ IndexGenerator indexGenerator =
+ IndexGeneratorFactory.createIndexGenerator(
+ new ElasticsearchConfiguration(config),
+ fieldNames,
+ dataTypes,
+ ZoneId.systemDefault());
+ indexGenerator.open();
+ assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index");
+ assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index");
+ }
+
+ @Test
+ public void testDynamicIndexFromDateCompatibility() {
+ Configuration config = new Configuration();
+ config.set(INDEX_OPTION, "my-index-{log_date|yyyy/MM/dd}");
+ IndexGenerator indexGenerator =
+ IndexGeneratorFactory.createIndexGenerator(
+ new ElasticsearchConfiguration(config),
+ fieldNames,
+ dataTypes,
+ ZoneId.systemDefault());
+ indexGenerator.open();
+
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index-2020/03/18");
+
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-2020/03/19");
+ }
}