This is an automated email from the ASF dual-hosted git repository.

junrui pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


The following commit(s) were added to refs/heads/main by this push:
     new c7f813c  [FLINK-38373][Connectors / ElasticSearch] ES index supports 
fields suffix
c7f813c is described below

commit c7f813c85b0d29b4ac2270958b9f4debc39530ff
Author: garyjgliu <[email protected]>
AuthorDate: Wed Sep 17 17:49:51 2025 +0800

    [FLINK-38373][Connectors / ElasticSearch] ES index supports fields suffix
---
 .../table/ElasticsearchConfiguration.java          |  8 +++
 .../table/ElasticsearchConnectorOptions.java       | 12 ++++
 .../table/ElasticsearchDynamicSink.java            |  2 +-
 .../ElasticsearchDynamicTableFactoryBase.java      |  6 +-
 .../elasticsearch/table/IndexGeneratorFactory.java | 50 ++++++++++++++
 .../elasticsearch/table/IndexGeneratorTest.java    | 79 +++++++++++++++++++++-
 6 files changed, 153 insertions(+), 4 deletions(-)

diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
index 6c74200..35db69f 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
@@ -135,6 +135,14 @@ public class ElasticsearchConfiguration {
         return config.get(RETRY_ON_CONFLICTS);
     }
 
+    public String getIndexSuffixFieldName() {
+        return 
config.get(ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_NAME_OPTION);
+    }
+
+    public int getIndexSuffixFieldLength() {
+        return 
config.get(ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_LENGTH_OPTION);
+    }
+
     /**
      * Parse Hosts String to list.
      *
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
index f34cb87..7817a0d 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -152,4 +152,16 @@ public class ElasticsearchConnectorOptions {
                     .defaultValue(-1)
                     .withDescription(
                             "The number of retry when conflicts with 
concurrent requests.");
+
+    public static final ConfigOption<String> INDEX_SUFFIX_FIELD_NAME_OPTION =
+            ConfigOptions.key("index.suffix.field.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The index suffix field name");
+
+    public static final ConfigOption<Integer> INDEX_SUFFIX_FIELD_LENGTH_OPTION 
=
+            ConfigOptions.key("index.suffix.field.length")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription("The length(exclusive) of index suffix 
field value");
 }
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
index d6d04a6..a522613 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
@@ -98,7 +98,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
 
     IndexGenerator createIndexGenerator() {
         return IndexGeneratorFactory.createIndexGenerator(
-                config.getIndex(),
+                config,
                 DataType.getFieldNames(physicalRowDataType),
                 DataType.getFieldDataTypes(physicalRowDataType),
                 localTimeZoneId);
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
index 2686e2e..f223380 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
@@ -70,6 +70,8 @@ import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnec
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_LENGTH_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_NAME_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
@@ -287,7 +289,9 @@ abstract class ElasticsearchDynamicTableFactoryBase
                         PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
                         PARTIAL_CACHE_MAX_ROWS,
                         PARTIAL_CACHE_CACHE_MISSING_KEY,
-                        MAX_RETRIES)
+                        MAX_RETRIES,
+                        INDEX_SUFFIX_FIELD_LENGTH_OPTION,
+                        INDEX_SUFFIX_FIELD_NAME_OPTION)
                 .collect(Collectors.toSet());
     }
 
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
index 92886f4..b628b10 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
@@ -86,6 +86,56 @@ final class IndexGeneratorFactory {
         return createIndexGenerator(index, fieldNames, dataTypes, 
ZoneId.systemDefault());
     }
 
+    public static IndexGenerator createIndexGenerator(
+            ElasticsearchConfiguration config,
+            List<String> fieldNames,
+            List<DataType> dataTypes,
+            ZoneId localTimeZoneId) {
+        if (config.getIndexSuffixFieldName() != null) {
+            return createSuffixIndexGenerator(
+                    config.getIndex(),
+                    config.getIndexSuffixFieldName(),
+                    config.getIndexSuffixFieldLength(),
+                    fieldNames,
+                    dataTypes);
+        } else {
+            return createIndexGenerator(config.getIndex(), fieldNames, 
dataTypes, localTimeZoneId);
+        }
+    }
+
+    private static IndexGenerator createSuffixIndexGenerator(
+            String indexPrefix,
+            String indexSuffixFieldName,
+            int indexSuffixFieldLength,
+            List<String> fieldNames,
+            List<DataType> fieldTypes) {
+        int indexFieldPos = fieldNames.indexOf(indexSuffixFieldName);
+        if (indexFieldPos < 0) {
+            throw new TableException(
+                    String.format(
+                            "Unknown index field '%s' of '%s', please check 
the field name.",
+                            indexSuffixFieldName, String.join(",", 
fieldNames)));
+        }
+        final LogicalType indexFieldType = 
fieldTypes.get(indexFieldPos).getLogicalType();
+        final RowData.FieldGetter fieldGetter =
+                RowData.createFieldGetter(indexFieldType, indexFieldPos);
+        return row -> {
+            Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+            final String indexSuffix;
+            if (fieldOrNull != null) {
+                if (indexSuffixFieldLength > 0) {
+                    indexSuffix = String.valueOf(fieldOrNull).substring(0, 
indexSuffixFieldLength);
+                } else {
+                    indexSuffix = String.valueOf(fieldOrNull);
+                }
+            } else {
+                throw new RuntimeException(
+                        "Index suffix field " + indexSuffixFieldName + " is 
null");
+            }
+            return String.format("%s%s", indexPrefix, indexSuffix);
+        };
+    }
+
     interface DynamicFormatter extends Serializable {
         String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
     }
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
index 44062a3..acbd4b6 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.elasticsearch.table;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.GenericRowData;
@@ -39,6 +40,9 @@ import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.List;
 
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_LENGTH_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_NAME_OPTION;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 import static org.junit.jupiter.api.Assumptions.assumingThat;
@@ -81,7 +85,10 @@ public class IndexGeneratorTest {
                     GenericRowData.of(
                             1,
                             StringData.fromString("apple"),
-                            Timestamp.valueOf("2020-03-18 12:12:14").getTime(),
+                            LocalDateTime.of(2020, 3, 18, 12, 12, 14)
+                                    .atZone(ZoneId.of("Asia/Shanghai"))
+                                    .toInstant()
+                                    .toEpochMilli(),
                             (int) 
Date.valueOf("2020-03-18").toLocalDate().toEpochDay(),
                             
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-18 12:12:14")),
                             (int)
@@ -100,7 +107,10 @@ public class IndexGeneratorTest {
                     GenericRowData.of(
                             2,
                             StringData.fromString("peanut"),
-                            Timestamp.valueOf("2020-03-19 12:22:14").getTime(),
+                            LocalDateTime.of(2020, 3, 19, 12, 22, 14)
+                                    .atZone(ZoneId.of("Asia/Shanghai"))
+                                    .toInstant()
+                                    .toEpochMilli(),
                             (int) 
Date.valueOf("2020-03-19").toLocalDate().toEpochDay(),
                             
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-19 12:22:21")),
                             (int)
@@ -367,4 +377,69 @@ public class IndexGeneratorTest {
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(expectedExceptionMsg);
     }
+
+    @Test
+    public void testSuffixIndexGenerator() {
+        Configuration config = new Configuration();
+        config.set(INDEX_OPTION, "index_");
+        config.set(INDEX_SUFFIX_FIELD_NAME_OPTION, "log_ts");
+        config.set(INDEX_SUFFIX_FIELD_LENGTH_OPTION, 10);
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        new ElasticsearchConfiguration(config),
+                        fieldNames,
+                        dataTypes,
+                        ZoneId.systemDefault());
+        // 1584504734000
+        
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("index_1584504734");
+        // 1584591734000
+        
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("index_1584591734");
+    }
+
+    @Test
+    public void testSuffixIndexGeneratorWithoutLengthLimitation() {
+        Configuration config = new Configuration();
+        config.set(INDEX_OPTION, "index_");
+        config.set(INDEX_SUFFIX_FIELD_NAME_OPTION, "log_ts");
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        new ElasticsearchConfiguration(config),
+                        fieldNames,
+                        dataTypes,
+                        ZoneId.systemDefault());
+        // 1584504734000
+        
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("index_1584504734000");
+        // 1584591734000
+        
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("index_1584591734000");
+    }
+
+    @Test
+    public void testStaticIndexCompatibility() {
+        Configuration config = new Configuration();
+        config.set(INDEX_OPTION, "my-index");
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        new ElasticsearchConfiguration(config),
+                        fieldNames,
+                        dataTypes,
+                        ZoneId.systemDefault());
+        indexGenerator.open();
+        assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index");
+        assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index");
+    }
+
+    @Test
+    public void testDynamicIndexFromDateCompatibility() {
+        Configuration config = new Configuration();
+        config.set(INDEX_OPTION, "my-index-{log_date|yyyy/MM/dd}");
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        new ElasticsearchConfiguration(config),
+                        fieldNames,
+                        dataTypes,
+                        ZoneId.systemDefault());
+        indexGenerator.open();
+        
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index-2020/03/18");
+        
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-2020/03/19");
+    }
 }

Reply via email to