This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


The following commit(s) were added to refs/heads/main by this push:
     new 08b7d66  [FLINK-38721] Support vector search for es connector. (#137)
08b7d66 is described below

commit 08b7d661c631e8d8cc3b08785dcc37be2e7aca36
Author: Wenjin Xie <[email protected]>
AuthorDate: Tue May 19 19:46:51 2026 +0800

    [FLINK-38721] Support vector search for es connector. (#137)
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .github/workflows/push_pr.yml                      |   2 +-
 .../table/ElasticsearchDynamicSource.java          |  44 +-
 .../ElasticsearchDynamicTableFactoryBase.java      |   2 +-
 .../AbstractElasticsearchVectorSearchFunction.java | 151 +++++++
 .../elasticsearch/table/search/SearchMetric.java   |  39 ++
 .../table/search/VectorSearchUtils.java            |  78 ++++
 .../connector/elasticsearch/ElasticsearchUtil.java |  16 +-
 flink-connector-elasticsearch7/pom.xml             |   8 +
 ...ctory.java => Elasticsearch7Configuration.java} |  28 +-
 .../table/Elasticsearch7ConnectorOptions.java      |  48 +++
 .../table/Elasticsearch7DynamicSource.java         | 101 +++++
 .../table/Elasticsearch7DynamicTableFactory.java   | 101 ++++-
 .../ElasticsearchRowDataVectorSearchFunction.java  | 124 ++++++
 .../table/Elasticsearch7VectorSearchITCase.java    | 347 +++++++++++++++
 .../src/test/resources/testcontainers.properties   |  17 +
 flink-connector-elasticsearch8/pom.xml             |  40 +-
 .../elasticsearch/sink/NetworkConfig.java          |   6 +
 .../table/Elasticsearch8Configuration.java         |  12 +
 .../table/Elasticsearch8ConnectorOptions.java      |  17 +
 .../table/Elasticsearch8DynamicSource.java         | 145 +++++++
 ...java => Elasticsearch8DynamicTableFactory.java} | 140 +++---
 .../ElasticsearchRowDataVectorSearchFunction.java  | 101 +++++
 .../org.apache.flink.table.factories.Factory       |   3 +-
 .../table/Elasticsearch8DynamicSinkITCase.java     |   3 +-
 .../table/Elasticsearch8VectorSearchITCase.java    | 473 +++++++++++++++++++++
 .../src/test/resources/testcontainers.properties   |  17 +
 pom.xml                                            |   1 +
 27 files changed, 1955 insertions(+), 109 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index a21e7eb..676d1c0 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -28,7 +28,7 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [ 2.2.1, 2.1.2 ]
+        flink: [ 2.2.1 ]
         jdk: [ '11', '17, 21' ]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java
index 83a95f9..5250b4b 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.connector.elasticsearch.table;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -24,20 +42,20 @@ import javax.annotation.Nullable;
  * from a logical description.
  */
 public class ElasticsearchDynamicSource implements LookupTableSource, SupportsProjectionPushDown {
-    private final DecodingFormat<DeserializationSchema<RowData>> format;
-    private final ElasticsearchConfiguration config;
-    private final int lookupMaxRetryTimes;
-    private final LookupCache lookupCache;
-    private final String docType;
-    private final String summaryString;
-    private final ElasticsearchApiCallBridge<?> apiCallBridge;
-    private DataType physicalRowDataType;
+    protected final DecodingFormat<DeserializationSchema<RowData>> format;
+    protected final ElasticsearchConfiguration config;
+    protected final int maxRetryTimes;
+    protected final LookupCache lookupCache;
+    protected final String docType;
+    protected final String summaryString;
+    protected final ElasticsearchApiCallBridge<?> apiCallBridge;
+    protected DataType physicalRowDataType;
 
     public ElasticsearchDynamicSource(
             DecodingFormat<DeserializationSchema<RowData>> format,
             ElasticsearchConfiguration config,
             DataType physicalRowDataType,
-            int lookupMaxRetryTimes,
+            int maxRetryTimes,
             String summaryString,
             ElasticsearchApiCallBridge<?> apiCallBridge,
             @Nullable LookupCache lookupCache,
@@ -45,7 +63,7 @@ public class ElasticsearchDynamicSource implements LookupTableSource, SupportsPr
         this.format = format;
         this.config = config;
         this.physicalRowDataType = physicalRowDataType;
-        this.lookupMaxRetryTimes = lookupMaxRetryTimes;
+        this.maxRetryTimes = maxRetryTimes;
         this.summaryString = summaryString;
         this.apiCallBridge = apiCallBridge;
         this.lookupCache = lookupCache;
@@ -68,7 +86,7 @@ public class ElasticsearchDynamicSource implements LookupTableSource, SupportsPr
         ElasticsearchRowDataLookupFunction<?> lookupFunction =
                 new ElasticsearchRowDataLookupFunction<>(
                         this.format.createRuntimeDecoder(context, physicalRowDataType),
-                        lookupMaxRetryTimes,
+                        maxRetryTimes,
                         config.getIndex(),
                         docType,
                         DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
@@ -84,7 +102,7 @@ public class ElasticsearchDynamicSource implements LookupTableSource, SupportsPr
         }
     }
 
-    private NetworkClientConfig buildNetworkClientConfig() {
+    protected NetworkClientConfig buildNetworkClientConfig() {
         NetworkClientConfig.Builder builder = new NetworkClientConfig.Builder();
         if (config.getUsername().isPresent()
                 && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
@@ -123,7 +141,7 @@ public class ElasticsearchDynamicSource implements LookupTableSource, SupportsPr
                 format,
                 config,
                 physicalRowDataType,
-                lookupMaxRetryTimes,
+                maxRetryTimes,
                 summaryString,
                 apiCallBridge,
                 lookupCache,
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
index f223380..8a72893 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
@@ -165,7 +165,7 @@ abstract class ElasticsearchDynamicTableFactoryBase
     }
 
     @Nullable
-    private LookupCache getLookupCache(ReadableConfig tableOptions) {
+    protected LookupCache getLookupCache(ReadableConfig tableOptions) {
         LookupCache cache = null;
         if (tableOptions
                 .get(LookupOptions.CACHE_TYPE)
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/AbstractElasticsearchVectorSearchFunction.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/AbstractElasticsearchVectorSearchFunction.java
new file mode 100644
index 0000000..1478307
--- /dev/null
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/AbstractElasticsearchVectorSearchFunction.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table.search;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.VectorSearchFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base {@link VectorSearchFunction} implementation for Elasticsearch. Shared retry loop, result
+ * decoding and null-source filtering live here; version-specific subclasses only need to provide
+ * the client initialization and the search call.
+ */
+public abstract class AbstractElasticsearchVectorSearchFunction extends VectorSearchFunction {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractElasticsearchVectorSearchFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    protected final DeserializationSchema<RowData> deserializationSchema;
+    protected final String index;
+    protected final String searchColumn;
+    protected final String[] producedNames;
+    protected final int maxRetryTimes;
+
+    protected AbstractElasticsearchVectorSearchFunction(
+            DeserializationSchema<RowData> deserializationSchema,
+            int maxRetryTimes,
+            String index,
+            String searchColumn,
+            String[] producedNames) {
+        this.deserializationSchema =
+                checkNotNull(deserializationSchema, "No DeserializationSchema supplied.");
+        this.producedNames = checkNotNull(producedNames, "No fieldNames supplied.");
+        this.maxRetryTimes = maxRetryTimes;
+        this.index = index;
+        this.searchColumn = searchColumn;
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        doOpen(context);
+        deserializationSchema.open(null);
+    }
+
+    @Override
+    public void close() throws Exception {
+        try {
+            doClose();
+        } finally {
+            super.close();
+        }
+    }
+
+    @Override
+    public Collection<RowData> vectorSearch(int topK, RowData features) throws IOException {
+        for (int retry = 0; retry <= maxRetryTimes; retry++) {
+            try {
+                SearchResult[] results = doSearch(topK, features);
+                if (results.length > 0) {
+                    ArrayList<RowData> rows = new ArrayList<>(results.length);
+                    for (SearchResult result : results) {
+                        if (result.source == null) {
+                            continue;
+                        }
+                        RowData row = parseSearchResult(result.source);
+                        if (row == null) {
+                            continue;
+                        }
+                        GenericRowData scoreData = new GenericRowData(1);
+                        scoreData.setField(0, result.score);
+                        rows.add(new JoinedRowData(row, scoreData));
+                    }
+                    rows.trimToSize();
+                    return rows;
+                }
+            } catch (IOException e) {
+                LOG.error(String.format("Elasticsearch search error, retry times = %d", retry), e);
+                if (retry >= maxRetryTimes) {
+                    throw new FlinkRuntimeException("Execution of Elasticsearch search failed.", e);
+                }
+                try {
+                    Thread.sleep(1000L * retry);
+                } catch (InterruptedException e1) {
+                    LOG.warn(
+                            "Interrupted while waiting to retry failed elasticsearch search, aborting");
+                    throw new FlinkRuntimeException(e1);
+                }
+            }
+        }
+        return Collections.emptyList();
+    }
+
+    /** Version-specific initialization (e.g., creating the underlying Elasticsearch client). */
+    protected abstract void doOpen(FunctionContext context) throws Exception;
+
+    /** Version-specific resource release (e.g., closing the underlying Elasticsearch client). */
+    protected abstract void doClose() throws Exception;
+
+    /** Execute a single vector search call and return raw results, excluding nothing. */
+    protected abstract SearchResult[] doSearch(int topK, RowData features) throws IOException;
+
+    private RowData parseSearchResult(String result) {
+        try {
+            return deserializationSchema.deserialize(result.getBytes());
+        } catch (IOException e) {
+            LOG.error("Deserialize search hit failed: " + e.getMessage());
+            return null;
+        }
+    }
+
+    /** One hit from Elasticsearch — raw JSON source plus score. */
+    protected static class SearchResult {
+        final String source;
+        final Double score;
+
+        public SearchResult(String source, Double score) {
+            this.source = source;
+            this.score = score;
+        }
+    }
+}
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/SearchMetric.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/SearchMetric.java
new file mode 100644
index 0000000..5e42f7c
--- /dev/null
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/SearchMetric.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table.search;
+
+/** Metric for vector search. */
+public enum SearchMetric {
+    COSINE_SIMILARITY("cosineSimilarity"),
+    L1NORM("l1norm"),
+    L2NORM("l2norm"),
+    HAMMING("hamming"),
+    DOT_PRODUCT("dotProduct");
+
+    private final String name;
+
+    SearchMetric(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+}
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/VectorSearchUtils.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/VectorSearchUtils.java
new file mode 100644
index 0000000..54fbbaf
--- /dev/null
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/VectorSearchUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table.search;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+/** Shared helpers for the Elasticsearch vector search table sources. */
+public class VectorSearchUtils {
+
+    private VectorSearchUtils() {}
+
+    /**
+     * Validates the search columns declared on the given context and returns the resolved physical
+     * column name. Elasticsearch only supports a single, non-nested float-array column.
+     */
+    public static String resolveSearchColumn(
+            DataType physicalRowDataType,
+            VectorSearchTableSource.VectorSearchContext vectorSearchContext) {
+        int[][] searchColumns = vectorSearchContext.getSearchColumns();
+
+        if (searchColumns.length != 1) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Elasticsearch only supports one search columns now, but input search columns size is %d.",
+                            searchColumns.length));
+        }
+        int[] searchColumn = searchColumns[0];
+        if (searchColumn.length != 1) {
+            throw new IllegalArgumentException(
+                    "Elasticsearch doesn't support to search data using nested columns.");
+        }
+        int searchColumnIndex = searchColumn[0];
+
+        if (searchColumnIndex < 0
+                || searchColumnIndex >= physicalRowDataType.getChildren().size()) {
+            throw new ValidationException(
+                    String.format(
+                            "The specified search column with index %d doesn't exist in schema.",
+                            searchColumnIndex));
+        }
+
+        DataType searchColumnType = physicalRowDataType.getChildren().get(searchColumnIndex);
+        if (!searchColumnType.getLogicalType().is(LogicalTypeRoot.ARRAY)
+                || !((ArrayType) searchColumnType.getLogicalType())
+                        .getElementType()
+                        .is(LogicalTypeRoot.FLOAT)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Elasticsearch only supports search data using float vector now, but input search column type is %s.",
+                            searchColumnType));
+        }
+
+        return ((RowType) physicalRowDataType.getLogicalType())
+                .getFieldNames()
+                .get(searchColumnIndex);
+    }
+}
diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/ElasticsearchUtil.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/ElasticsearchUtil.java
index f7126b7..2149bcb 100644
--- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/ElasticsearchUtil.java
+++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/ElasticsearchUtil.java
@@ -26,9 +26,11 @@ import org.apache.flink.table.types.logical.LogicalType;
 
 import org.slf4j.Logger;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testcontainers.utility.DockerImageName;
 
+import java.time.Duration;
 import java.util.Optional;
 
 /** Collection of utility methods for Elasticsearch tests. */
@@ -62,10 +64,16 @@ public class ElasticsearchUtil {
             logLevel = "OFF";
         }
 
-        return new ElasticsearchContainer(DockerImageName.parse(dockerImageVersion))
-                .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
-                .withEnv("logger.org.elasticsearch", logLevel)
-                .withLogConsumer(new Slf4jLogConsumer(log));
+        ElasticsearchContainer container =
+                new ElasticsearchContainer(DockerImageName.parse(dockerImageVersion))
+                        .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+                        .withEnv("logger.org.elasticsearch", logLevel)
+                        .withLogConsumer(new Slf4jLogConsumer(log));
+
+        container.setWaitStrategy(
+                Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofMinutes(1)));
+
+        return container;
     }
 
     /** A mock {@link DynamicTableSink.Context} for Elasticsearch tests. */
diff --git a/flink-connector-elasticsearch7/pom.xml b/flink-connector-elasticsearch7/pom.xml
index f8cbbf4..b0b8014 100644
--- a/flink-connector-elasticsearch7/pom.xml
+++ b/flink-connector-elasticsearch7/pom.xml
@@ -165,6 +165,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       <version>${flink.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-runtime</artifactId>
diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7Configuration.java
similarity index 50%
copy from flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
copy to flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7Configuration.java
index 2f6d884..8b373c7 100644
--- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
+++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7Configuration.java
@@ -18,23 +18,23 @@
 
 package org.apache.flink.connector.elasticsearch.table;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.elasticsearch.Elasticsearch7ApiCallBridge;
-import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
-import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
-import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.elasticsearch.table.search.SearchMetric;
 
-/** A {@link DynamicTableSinkFactory} for discovering {@link ElasticsearchDynamicSink}. */
-@Internal
-public class Elasticsearch7DynamicTableFactory extends ElasticsearchDynamicTableFactoryBase {
-    private static final String FACTORY_IDENTIFIER = "elasticsearch-7";
+import static org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.MAX_RETRIES;
+import static org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.VECTOR_SEARCH_METRIC;
 
-    public Elasticsearch7DynamicTableFactory() {
-        super(FACTORY_IDENTIFIER, Elasticsearch7SinkBuilder::new);
+/** Elasticsearch 7 specific configuration. */
+public class Elasticsearch7Configuration extends ElasticsearchConfiguration {
+    Elasticsearch7Configuration(ReadableConfig config) {
+        super(config);
     }
 
-    @Override
-    ElasticsearchApiCallBridge<?> getElasticsearchApiCallBridge() {
-        return new Elasticsearch7ApiCallBridge();
+    public int getMaxRetries() {
+        return config.get(MAX_RETRIES);
+    }
+
+    public SearchMetric getVectorSearchMetric() {
+        return config.get(VECTOR_SEARCH_METRIC);
     }
 }
diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7ConnectorOptions.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7ConnectorOptions.java
new file mode 100644
index 0000000..56fbaf2
--- /dev/null
+++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7ConnectorOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.elasticsearch.table.search.SearchMetric;
+
+/**
+ * Options specific for the Elasticsearch 7 connector. Public so that the {@link
+ * org.apache.flink.table.api.TableDescriptor} can access it.
+ */
+@PublicEvolving
+public class Elasticsearch7ConnectorOptions extends ElasticsearchConnectorOptions {
+    private Elasticsearch7ConnectorOptions() {}
+
+    public static final ConfigOption<Integer> MAX_RETRIES =
+            ConfigOptions.key("max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withFallbackKeys("lookup.max-retries")
+                    .withDescription(
+                            "The maximum allowed retries if a lookup/search operation fails.");
+
+    public static final ConfigOption<SearchMetric> VECTOR_SEARCH_METRIC =
+            ConfigOptions.key("vector-search.metric")
+                    .enumType(SearchMetric.class)
+                    .defaultValue(SearchMetric.COSINE_SIMILARITY)
+                    .withDescription(
+                            "The metric of vector search, by default is cosineSimilarity.");
+}
diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java
new file mode 100644
index 0000000..1f18281
--- /dev/null
+++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.table.search.ElasticsearchRowDataVectorSearchFunction;
+import org.apache.flink.connector.elasticsearch.table.search.VectorSearchUtils;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import 
org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import org.elasticsearch.client.RestHighLevelClient;
+
+import javax.annotation.Nullable;
+
+/**
+ * An Elasticsearch 7 {@link DynamicTableSource} that also serves vector search requests by
+ * implementing {@link VectorSearchTableSource}.
+ */
+public class Elasticsearch7DynamicSource extends ElasticsearchDynamicSource
+        implements VectorSearchTableSource {
+
+    public Elasticsearch7DynamicSource(
+            DecodingFormat<DeserializationSchema<RowData>> format,
+            ElasticsearchConfiguration config,
+            DataType physicalRowDataType,
+            int maxRetryTimes,
+            String summaryString,
+            ElasticsearchApiCallBridge<RestHighLevelClient> apiCallBridge,
+            @Nullable LookupCache lookupCache,
+            @Nullable String docType) {
+        super(
+                format,
+                config,
+                physicalRowDataType,
+                maxRetryTimes,
+                summaryString,
+                apiCallBridge,
+                lookupCache,
+                docType);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public DynamicTableSource copy() {
+        return new Elasticsearch7DynamicSource(
+                format,
+                config,
+                physicalRowDataType,
+                maxRetryTimes,
+                summaryString,
+                (ElasticsearchApiCallBridge<RestHighLevelClient>) 
apiCallBridge,
+                lookupCache,
+                docType);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public VectorSearchRuntimeProvider getSearchRuntimeProvider(
+            VectorSearchContext vectorSearchContext) {
+
+        NetworkClientConfig networkClientConfig = buildNetworkClientConfig();
+
+        ElasticsearchRowDataVectorSearchFunction vectorSearchFunction =
+                new ElasticsearchRowDataVectorSearchFunction(
+                        this.format.createRuntimeDecoder(vectorSearchContext, 
physicalRowDataType),
+                        this.maxRetryTimes,
+                        ((Elasticsearch7Configuration) 
config).getVectorSearchMetric(),
+                        config.getIndex(),
+                        VectorSearchUtils.resolveSearchColumn(
+                                physicalRowDataType, vectorSearchContext),
+                        
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+                        config.getHosts(),
+                        networkClientConfig,
+                        (ElasticsearchApiCallBridge<RestHighLevelClient>) 
apiCallBridge);
+
+        return VectorSearchFunctionProvider.of(vectorSearchFunction);
+    }
+}
diff --git 
a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
 
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
index 2f6d884..755175f 100644
--- 
a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
+++ 
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
@@ -19,10 +19,49 @@
 package org.apache.flink.connector.elasticsearch.table;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.elasticsearch.Elasticsearch7ApiCallBridge;
 import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
 import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.MAX_RETRIES;
+import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.VECTOR_SEARCH_METRIC;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_TIMEOUT;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.elasticsearch.common.Strings.capitalize;
 
 /** A {@link DynamicTableSinkFactory} for discovering {@link 
ElasticsearchDynamicSink}. */
 @Internal
@@ -34,7 +73,67 @@ public class Elasticsearch7DynamicTableFactory extends 
ElasticsearchDynamicTable
     }
 
     @Override
-    ElasticsearchApiCallBridge<?> getElasticsearchApiCallBridge() {
+    ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper 
helper) {
+        return new Elasticsearch7Configuration(helper.getOptions());
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig options = helper.getOptions();
+        final DecodingFormat<DeserializationSchema<RowData>> format =
+                helper.discoverDecodingFormat(
+                        DeserializationFormatFactory.class,
+                        
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions
+                                .FORMAT_OPTION);
+
+        Elasticsearch7Configuration config = (Elasticsearch7Configuration) 
getConfiguration(helper);
+        helper.validate();
+        validateConfiguration(config);
+
+        return new Elasticsearch7DynamicSource(
+                format,
+                config,
+                context.getPhysicalRowDataType(),
+                config.getMaxRetries(),
+                capitalize(FACTORY_IDENTIFIER),
+                getElasticsearchApiCallBridge(),
+                getLookupCache(options),
+                getDocumentType(config));
+    }
+
+    @Override
+    ElasticsearchApiCallBridge<RestHighLevelClient> 
getElasticsearchApiCallBridge() {
         return new Elasticsearch7ApiCallBridge();
     }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Stream.of(
+                        KEY_DELIMITER_OPTION,
+                        BULK_FLUSH_MAX_SIZE_OPTION,
+                        BULK_FLUSH_MAX_ACTIONS_OPTION,
+                        BULK_FLUSH_INTERVAL_OPTION,
+                        BULK_FLUSH_BACKOFF_TYPE_OPTION,
+                        BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+                        BULK_FLUSH_BACKOFF_DELAY_OPTION,
+                        CONNECTION_PATH_PREFIX_OPTION,
+                        CONNECTION_REQUEST_TIMEOUT,
+                        CONNECTION_TIMEOUT,
+                        SOCKET_TIMEOUT,
+                        FORMAT_OPTION,
+                        DELIVERY_GUARANTEE_OPTION,
+                        PASSWORD_OPTION,
+                        USERNAME_OPTION,
+                        SINK_PARALLELISM,
+                        CACHE_TYPE,
+                        PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
+                        PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
+                        PARTIAL_CACHE_MAX_ROWS,
+                        PARTIAL_CACHE_CACHE_MISSING_KEY,
+                        MAX_RETRIES,
+                        VECTOR_SEARCH_METRIC)
+                .collect(Collectors.toSet());
+    }
 }
diff --git 
a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java
 
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java
new file mode 100644
index 0000000..66f19cb
--- /dev/null
+++ 
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table.search;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.VectorSearchFunction;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.index.query.functionscore.ScriptScoreQueryBuilder;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The {@link VectorSearchFunction} implementation for Elasticsearch 7. */
+public class ElasticsearchRowDataVectorSearchFunction
+        extends AbstractElasticsearchVectorSearchFunction {
+    private static final long serialVersionUID = 1L;
+    private static final String QUERY_VECTOR = "query_vector";
+
+    private final ElasticsearchApiCallBridge<RestHighLevelClient> callBridge;
+    private final NetworkClientConfig networkClientConfig;
+    private final List<HttpHost> hosts;
+    private final String scriptScore;
+
+    private transient RestHighLevelClient client;
+    private transient SearchRequest searchRequest;
+    private transient SearchSourceBuilder searchSourceBuilder;
+
+    public ElasticsearchRowDataVectorSearchFunction(
+            DeserializationSchema<RowData> deserializationSchema,
+            int maxRetryTimes,
+            SearchMetric searchMetric,
+            String index,
+            String searchColumn,
+            String[] producedNames,
+            List<HttpHost> hosts,
+            NetworkClientConfig networkClientConfig,
+            ElasticsearchApiCallBridge<RestHighLevelClient> callBridge) {
+        super(deserializationSchema, maxRetryTimes, index, searchColumn, 
producedNames);
+        this.networkClientConfig =
+                checkNotNull(networkClientConfig, "No networkClientConfig 
supplied.");
+        this.hosts = checkNotNull(hosts, "No hosts supplied.");
+        this.callBridge = checkNotNull(callBridge, "No 
ElasticsearchApiCallBridge supplied.");
+        this.scriptScore =
+                String.format(
+                        "%s(params.%s, '%s') + 1.0",
+                        searchMetric.toString(), QUERY_VECTOR, searchColumn);
+    }
+
+    @Override
+    protected void doOpen(FunctionContext context) {
+        this.client = callBridge.createClient(networkClientConfig, hosts);
+
+        // Reuse searchRequest / searchSourceBuilder across invocations to 
avoid rebuilding them
+        // per record.
+        this.searchRequest = new SearchRequest(index);
+        this.searchSourceBuilder = new SearchSourceBuilder();
+        this.searchSourceBuilder.fetchSource(producedNames, null);
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        if (client != null) {
+            client.close();
+            client = null;
+        }
+    }
+
+    @Override
+    protected SearchResult[] doSearch(int topK, RowData features) throws 
IOException {
+        // Elasticsearch 7.x doesn't support ANN, so we use a script score query for exact search.
+        Map<String, Object> params =
+                Collections.singletonMap(QUERY_VECTOR, 
features.getArray(0).toFloatArray());
+
+        Script script = new Script(ScriptType.INLINE, "painless", scriptScore, 
params);
+        ScriptScoreQueryBuilder scriptScoreQuery =
+                new ScriptScoreQueryBuilder(new MatchAllQueryBuilder(), 
script);
+
+        searchSourceBuilder.query(scriptScoreQuery).size(topK);
+        searchRequest.source(searchSourceBuilder);
+
+        SearchResponse searchResponse = client.search(searchRequest, 
RequestOptions.DEFAULT);
+        SearchHit[] searchHits = searchResponse.getHits().getHits();
+
+        return Stream.of(searchHits)
+                .filter(hit -> hit.getSourceAsString() != null)
+                .map(hit -> new SearchResult(hit.getSourceAsString(), (double) 
hit.getScore()))
+                .toArray(SearchResult[]::new);
+    }
+}
diff --git 
a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7VectorSearchITCase.java
 
b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7VectorSearchITCase.java
new file mode 100644
index 0000000..8f6c249
--- /dev/null
+++ 
b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7VectorSearchITCase.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.connector.elasticsearch.ElasticsearchUtil;
+import org.apache.flink.connector.elasticsearch.test.DockerImageVersions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.Expressions.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** {@code VECTOR_SEARCH} ITCase for Elasticsearch 7. */
+@Testcontainers
+public class Elasticsearch7VectorSearchITCase {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(Elasticsearch7VectorSearchITCase.class);
+
+    private static final int PARALLELISM = 2;
+
+    @Container
+    private static final ElasticsearchContainer ES_CONTAINER =
+            ElasticsearchUtil.createElasticsearchContainer(
+                    DockerImageVersions.ELASTICSEARCH_7, LOG);
+
+    String getElasticsearchHttpHostAddress() {
+        return ES_CONTAINER.getHttpHostAddress();
+    }
+
+    private RestHighLevelClient getClient() {
+        return new RestHighLevelClient(
+                
RestClient.builder(HttpHost.create(getElasticsearchHttpHostAddress())));
+    }
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    private final List<Row> inputData =
+            Arrays.asList(
+                    Row.of(1L, "Spark", new Float[] {5f, 12f, 13f}),
+                    Row.of(2L, "Flink", new Float[] {-5f, -12f, -13f}));
+
+    private TableEnvironment tEnv;
+
+    @BeforeEach
+    void beforeEach() {
+        tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+    }
+
+    @Test
+    public void testSearchFullTypeVectorTable() throws Exception {
+        String index = "table_with_all_supported_types";
+        createFullTypesIndex(index);
+        tEnv.executeSql(
+                "CREATE TABLE esTable ("
+                        + "  id BIGINT,\n"
+                        + "  f1 STRING,\n"
+                        + "  f2 BOOLEAN,\n"
+                        + "  f3 TINYINT,\n"
+                        + "  f4 SMALLINT,\n"
+                        + "  f5 INTEGER,\n"
+                        + "  f6 DATE,\n"
+                        + "  f7 TIMESTAMP,\n"
+                        + "  f8 FLOAT,\n"
+                        + "  f9 DOUBLE,\n"
+                        + "  f10 ARRAY<FLOAT>,\n"
+                        + "  f11 ARRAY<DOUBLE>,\n"
+                        + "  f12 ARRAY<INTEGER>,\n"
+                        + "  f13 ARRAY<BIGINT>,\n"
+                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", 
"elasticsearch-7")
+                        + String.format(
+                                "'%s'='%s',\n",
+                                
ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s'\n",
+                                
ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                ES_CONTAINER.getHttpHostAddress())
+                        + ")");
+
+        tEnv.fromValues(
+                        row(
+                                1,
+                                "ABCDE",
+                                true,
+                                (byte) 127,
+                                (short) 257,
+                                65535,
+                                LocalDate.ofEpochDay(12345),
+                                LocalDateTime.parse("2012-12-12T12:12:12"),
+                                11.11f,
+                                12.22d,
+                                new Float[] {11.11f, 11.12f},
+                                new Double[] {12.22d, 12.22d},
+                                new int[] {Integer.MIN_VALUE, 
Integer.MAX_VALUE},
+                                new long[] {Long.MIN_VALUE, Long.MAX_VALUE}))
+                .executeInsert("esTable")
+                .await();
+
+        // Wait for Elasticsearch to finish indexing the inserted documents.
+        Thread.sleep(2000);
+
+        List<String> rows =
+                CollectionUtil.iteratorToList(
+                                tEnv.executeSql(
+                                                "WITH t(id, vector) AS (SELECT 
* FROM (VALUES (1, CAST(ARRAY[11.11, 1] AS ARRAY<FLOAT>))))\n"
+                                                        + "SELECT * FROM t, 
LATERAL TABLE(VECTOR_SEARCH(TABLE esTable, DESCRIPTOR(f10), t.vector, 3))\n")
+                                        .collect())
+                        .stream()
+                        .map(Row::toString)
+                        .collect(Collectors.toList());
+        assertThat(rows)
+                .isEqualTo(
+                        Collections.singletonList(
+                                "+I[1, [11.11, 1.0], 1, ABCDE, true, 127, 257, 
65535, 2003-10-20, 2012-12-12T12:12:12, 11.11, 12.22, [11.11, 11.12], [12.22, 
12.22], [-2147483648, 2147483647], [-9223372036854775808, 9223372036854775807], 
1.767361044883728]"));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"cosineSimilarity", "l1norm", "l2norm", "hamming", 
"dotProduct"})
+    void testSearchUsingFloatArray(String metric) throws Exception {
+        String index = "table_with_multiple_data_with_" + metric.toLowerCase();
+        createSimpleIndex(index);
+        tEnv.executeSql(
+                "CREATE TABLE es_table("
+                        + "  id BIGINT,"
+                        + "  label STRING,"
+                        + "  vector ARRAY<FLOAT>"
+                        + ")\n WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", 
"elasticsearch-7")
+                        + String.format(
+                                "'%s'='%s',\n",
+                                
ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s'\n",
+                                
ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                ES_CONTAINER.getHttpHostAddress())
+                        + ")");
+
+        tEnv.fromValues(
+                        row(1L, "Batch", new Float[] {5f, 12f, 13f}),
+                        row(2L, "Streaming", new Float[] {-5f, -12f, -13f}),
+                        row(3L, "Big Data", new Float[] {1f, 1f, 0f}))
+                .executeInsert("es_table")
+                .await();
+
+        // Wait for Elasticsearch to finish indexing the inserted documents.
+        Thread.sleep(2000);
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE src(\n"
+                                + "  id BIGINT PRIMARY KEY NOT ENFORCED,\n"
+                                + "  content STRING,\n"
+                                + "  index ARRAY<FLOAT>\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'data-id' = '%s'\n"
+                                + ");\n",
+                        TestValuesTableFactory.registerData(inputData)));
+        assertThat(
+                        CollectionUtil.iteratorToList(
+                                        tEnv.executeSql(
+                                                        "SELECT content, label 
FROM src, LATERAL TABLE(VECTOR_SEARCH(TABLE es_table, DESCRIPTOR(vector), 
src.index, 2))")
+                                                .collect())
+                                .stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .isEqualTo(
+                        Arrays.asList(
+                                "+I[Spark, Batch]",
+                                "+I[Spark, Big Data]",
+                                "+I[Flink, Streaming]",
+                                "+I[Flink, Big Data]"));
+    }
+
+    /** Creates {@code index} with an explicit mapping for every column of the full-types table. */
+    private void createFullTypesIndex(String index) throws IOException {
+        XContentBuilder mappingBuilder = XContentFactory.jsonBuilder();
+        mappingBuilder.startObject().startObject("properties");
+
+        // id: long
+        mappingBuilder.startObject("id");
+        mappingBuilder.field("type", "long");
+        mappingBuilder.endObject();
+
+        // f1: string
+        mappingBuilder.startObject("f1");
+        mappingBuilder.field("type", "text");
+        mappingBuilder.endObject();
+
+        // f2: boolean
+        mappingBuilder.startObject("f2");
+        mappingBuilder.field("type", "boolean");
+        mappingBuilder.endObject();
+
+        // f3: tinyint
+        mappingBuilder.startObject("f3");
+        mappingBuilder.field("type", "byte");
+        mappingBuilder.endObject();
+
+        // f4: smallint
+        mappingBuilder.startObject("f4");
+        mappingBuilder.field("type", "short");
+        mappingBuilder.endObject();
+
+        // f5: integer
+        mappingBuilder.startObject("f5");
+        mappingBuilder.field("type", "integer");
+        mappingBuilder.endObject();
+
+        // f6: date
+        mappingBuilder.startObject("f6");
+        mappingBuilder.field("type", "date");
+        mappingBuilder.endObject();
+
+        // f7: timestamp
+        mappingBuilder.startObject("f7");
+        mappingBuilder.field("type", "text");
+        mappingBuilder.endObject();
+
+        // f8: float
+        mappingBuilder.startObject("f8");
+        mappingBuilder.field("type", "float");
+        mappingBuilder.endObject();
+
+        // f9: double
+        mappingBuilder.startObject("f9");
+        mappingBuilder.field("type", "double");
+        mappingBuilder.endObject();
+
+        // f10: Array<Float>
+        mappingBuilder.startObject("f10");
+        mappingBuilder.field("type", "dense_vector");
+        mappingBuilder.field("dims", 2);
+        mappingBuilder.endObject();
+
+        // f11: Array<Double>
+        mappingBuilder.startObject("f11");
+        mappingBuilder.field("type", "dense_vector");
+        mappingBuilder.field("dims", 2);
+        mappingBuilder.endObject();
+
+        // f12: Array<Integer>
+        mappingBuilder.startObject("f12");
+        mappingBuilder.field("type", "dense_vector");
+        mappingBuilder.field("dims", 2);
+        mappingBuilder.endObject();
+
+        // f13: Array<Long>
+        mappingBuilder.startObject("f13");
+        mappingBuilder.field("type", "dense_vector");
+        mappingBuilder.field("dims", 2);
+        mappingBuilder.endObject();
+
+        mappingBuilder.endObject(); // end properties
+        mappingBuilder.endObject(); // end root
+
+        CreateIndexRequest request = new CreateIndexRequest(index).mapping(mappingBuilder);
+        try (RestHighLevelClient client = getClient()) { // closed here so it is not leaked
+            client.indices().create(request, RequestOptions.DEFAULT);
+        }
+    }
+
+    /** Creates {@code index} with a long id, a text label and a 3-dimensional dense vector. */
+    private void createSimpleIndex(String index) throws IOException {
+        XContentBuilder mappingBuilder = XContentFactory.jsonBuilder();
+        mappingBuilder.startObject().startObject("properties");
+
+        // id: long
+        mappingBuilder.startObject("id");
+        mappingBuilder.field("type", "long");
+        mappingBuilder.endObject();
+
+        // label: string
+        mappingBuilder.startObject("label");
+        mappingBuilder.field("type", "text");
+        mappingBuilder.endObject();
+
+        // vector: float vector
+        mappingBuilder.startObject("vector");
+        mappingBuilder.field("type", "dense_vector");
+        mappingBuilder.field("dims", 3);
+        mappingBuilder.endObject();
+
+        mappingBuilder.endObject(); // end properties
+        mappingBuilder.endObject(); // end root
+
+        CreateIndexRequest request = new CreateIndexRequest(index).mapping(mappingBuilder);
+        try (RestHighLevelClient client = getClient()) { // closed here so it is not leaked
+            client.indices().create(request, RequestOptions.DEFAULT);
+        }
+    }
+}
diff --git 
a/flink-connector-elasticsearch7/src/test/resources/testcontainers.properties 
b/flink-connector-elasticsearch7/src/test/resources/testcontainers.properties
new file mode 100644
index 0000000..07514cc
--- /dev/null
+++ 
b/flink-connector-elasticsearch7/src/test/resources/testcontainers.properties
@@ -0,0 +1,17 @@
+################################################################################
+#  Copyright 2023 Ververica Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+ryuk.container.image = testcontainers/ryuk:0.6.0
diff --git a/flink-connector-elasticsearch8/pom.xml 
b/flink-connector-elasticsearch8/pom.xml
index e86af07..9e8962f 100644
--- a/flink-connector-elasticsearch8/pom.xml
+++ b/flink-connector-elasticsearch8/pom.xml
@@ -88,6 +88,23 @@ under the License.
                        <version>${jackson.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-elasticsearch-base</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <!-- Exclude the Elasticsearch core and high-level REST client artifacts pulled in by the base module -->
+                               <exclusion>
+                                       <groupId>org.elasticsearch</groupId>
+                                       <artifactId>elasticsearch</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       
<groupId>org.elasticsearch.client</groupId>
+                                       
<artifactId>elasticsearch-rest-high-level-client</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
                <!-- Dependency for Elasticsearch 8.x Java Client -->
                <dependency>
                        <groupId>co.elastic.clients</groupId>
@@ -123,45 +140,46 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
-               <!-- Elasticsearch table sink factory testing -->
+               <!-- Table API integration tests -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-json</artifactId>
+                       <artifactId>flink-table-planner-loader</artifactId>
                        <version>${flink.version}</version>
                        <scope>test</scope>
                </dependency>
 
-               <!-- ArchUit test dependencies -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-architecture-tests-test</artifactId>
+                       
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       <version>${flink.version}</version>
+                       <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
 
+               <!-- Elasticsearch table sink factory testing -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-architecture-tests-production</artifactId>
+                       <artifactId>flink-json</artifactId>
+                       <version>${flink.version}</version>
                        <scope>test</scope>
                </dependency>
 
+               <!-- ArchUnit test dependencies -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-connector-base</artifactId>
-                       <version>${flink.version}</version>
-                       <type>test-jar</type>
+                       <artifactId>flink-architecture-tests-test</artifactId>
                        <scope>test</scope>
                </dependency>
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-planner-loader</artifactId>
-                       <version>${flink.version}</version>
+                       
<artifactId>flink-architecture-tests-production</artifactId>
                        <scope>test</scope>
                </dependency>
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-planner_2.12</artifactId>
+                       <artifactId>flink-connector-base</artifactId>
                        <version>${flink.version}</version>
                        <type>test-jar</type>
                        <scope>test</scope>
diff --git 
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java
 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java
index 93ecd78..34447bf 100644
--- 
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java
+++ 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java
@@ -24,6 +24,7 @@ package org.apache.flink.connector.elasticsearch.sink;
 import org.apache.flink.util.function.SerializableSupplier;
 
 import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
 import co.elastic.clients.json.jackson.JacksonJsonpMapper;
 import co.elastic.clients.transport.rest_client.RestClientTransport;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -108,6 +109,11 @@ public class NetworkConfig implements Serializable {
                 new RestClientTransport(this.getRestClient(), new 
JacksonJsonpMapper(mapper)));
     }
 
+    public ElasticsearchClient createEsSyncClient() {
+        return new ElasticsearchClient(
+                new RestClientTransport(this.getRestClient(), new 
JacksonJsonpMapper()));
+    }
+
     private RestClient getRestClient() {
         RestClientBuilder restClientBuilder =
                 RestClient.builder(hosts.toArray(new HttpHost[0]))
diff --git 
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8Configuration.java
 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8Configuration.java
index 9e3e8bf..351fa55 100644
--- 
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8Configuration.java
+++ 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8Configuration.java
@@ -46,6 +46,8 @@ import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8Conne
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.HOSTS_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.INDEX_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.KEY_DELIMITER_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.MAX_RETRIES;
+import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.NUM_CANDIDATES;
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.PASSWORD_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.SOCKET_TIMEOUT;
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT;
@@ -132,6 +134,16 @@ public class Elasticsearch8Configuration {
         return config.getOptional(SINK_PARALLELISM);
     }
 
+    // --- Lookup / vector search accessors 
--------------------------------------------------
+
+    public int getMaxRetries() {
+        return config.get(MAX_RETRIES);
+    }
+
+    public int getNumCandidates() {
+        return config.get(NUM_CANDIDATES);
+    }
+
     /**
      * Parse Hosts String to list.
      *
diff --git 
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java
 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java
index 1defeba..74a450b 100644
--- 
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java
+++ 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java
@@ -149,4 +149,21 @@ public class Elasticsearch8ConnectorOptions {
                     .enumType(DeliveryGuarantee.class)
                     .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
                     .withDescription("Optional delivery guarantee when 
committing.");
+
+    // --- Lookup / vector search options 
----------------------------------------------------
+
+    public static final ConfigOption<Integer> MAX_RETRIES =
+            ConfigOptions.key("max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withFallbackKeys("lookup.max-retries")
+                    .withDescription(
+                            "The maximum allowed retries if a lookup/search 
operation fails.");
+
+    public static final ConfigOption<Integer> NUM_CANDIDATES =
+            ConfigOptions.key("vector-search.num-candidates")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription(
+                            "The number of candidate neighbors considered for 
each shard during the vector search.");
 }
diff --git 
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSource.java
 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSource.java
new file mode 100644
index 0000000..23c1eb6
--- /dev/null
+++ 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSource.java
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.elasticsearch.sink.NetworkConfig;
+import 
org.apache.flink.connector.elasticsearch.table.search.ElasticsearchRowDataVectorSearchFunction;
+import org.apache.flink.connector.elasticsearch.table.search.VectorSearchUtils;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import 
org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import co.elastic.clients.transport.TransportUtils;
+import org.apache.http.HttpHost;
+
+import javax.net.ssl.SSLContext;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A {@link DynamicTableSource} for Elasticsearch 8 that serves vector search requests through
+ * {@link VectorSearchTableSource} and supports non-nested projection push-down.
+ */
+public class Elasticsearch8DynamicSource
+        implements VectorSearchTableSource, SupportsProjectionPushDown {
+
+    protected final DecodingFormat<DeserializationSchema<RowData>> format;
+    protected final Elasticsearch8Configuration config;
+    private final String summaryString;
+    protected DataType physicalRowDataType;
+
+    public Elasticsearch8DynamicSource(
+            DecodingFormat<DeserializationSchema<RowData>> format,
+            Elasticsearch8Configuration config,
+            DataType physicalRowDataType,
+            String summaryString) {
+        this.format = format;
+        this.config = config;
+        this.physicalRowDataType = physicalRowDataType;
+        this.summaryString = summaryString;
+    }
+
+    @Override
+    public VectorSearchRuntimeProvider getSearchRuntimeProvider(
+            VectorSearchContext vectorSearchContext) {
+
+        ElasticsearchRowDataVectorSearchFunction vectorSearchFunction =
+                new ElasticsearchRowDataVectorSearchFunction(
+                        format.createRuntimeDecoder(vectorSearchContext, 
physicalRowDataType),
+                        config.getMaxRetries(),
+                        config.getNumCandidates(),
+                        config.getIndex(),
+                        VectorSearchUtils.resolveSearchColumn(
+                                physicalRowDataType, vectorSearchContext),
+                        
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+                        buildNetworkConfig());
+
+        return VectorSearchFunctionProvider.of(vectorSearchFunction);
+    }
+
+    private NetworkConfig buildNetworkConfig() {
+        List<HttpHost> hosts = config.getHosts();
+        checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
+
+        String username =
+                config.getUsername()
+                        .filter(v -> !StringUtils.isNullOrWhitespaceOnly(v))
+                        .orElse(null);
+        String password =
+                config.getPassword()
+                        .filter(v -> !StringUtils.isNullOrWhitespaceOnly(v))
+                        .orElse(null);
+        String pathPrefix =
+                config.getPathPrefix()
+                        .filter(v -> !StringUtils.isNullOrWhitespaceOnly(v))
+                        .orElse(null);
+
+        SerializableSupplier<SSLContext> sslContextSupplier =
+                config.getCertificateFingerprint()
+                        .filter(v -> !StringUtils.isNullOrWhitespaceOnly(v))
+                        .<SerializableSupplier<SSLContext>>map(
+                                fp -> () -> 
TransportUtils.sslContextFromCaFingerprint(fp))
+                        .orElse(null);
+
+        return new NetworkConfig(
+                hosts,
+                username,
+                password,
+                null,
+                pathPrefix,
+                config.getConnectionRequestTimeout().map(d -> (int) 
d.toMillis()).orElse(null),
+                config.getConnectionTimeout().map(d -> (int) 
d.toMillis()).orElse(null),
+                config.getSocketTimeout().map(d -> (int) 
d.toMillis()).orElse(null),
+                sslContextSupplier,
+                null);
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new Elasticsearch8DynamicSource(format, config, 
physicalRowDataType, summaryString);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return summaryString;
+    }
+
+    @Override
+    public boolean supportsNestedProjection() {
+        return false;
+    }
+
+    @Override
+    public void applyProjection(int[][] projectedFields, DataType type) {
+        this.physicalRowDataType = 
Projection.of(projectedFields).project(type);
+    }
+}
diff --git 
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticSearch8AsyncDynamicTableFactory.java
 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicTableFactory.java
similarity index 84%
rename from 
flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticSearch8AsyncDynamicTableFactory.java
rename to 
flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicTableFactory.java
index 3008a14..bf21cbc 100644
--- 
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticSearch8AsyncDynamicTableFactory.java
+++ 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicTableFactory.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,20 +7,19 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.elasticsearch.table;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
@@ -31,9 +29,13 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.table.types.DataType;
@@ -63,24 +65,49 @@ import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8Conne
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.HOSTS_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.INDEX_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.KEY_DELIMITER_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.MAX_RETRIES;
+import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.NUM_CANDIDATES;
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.PASSWORD_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.SOCKET_TIMEOUT;
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT;
 import static 
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.USERNAME_OPTION;
 import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
-import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES;
 import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
 import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
 import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
 import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
 
-/** Factory for creating {@link ElasticSearch8AsyncDynamicSink} . */
+/**
+ * A factory for discovering both {@link Elasticsearch8DynamicSource} (lookup 
/ vector search) and
+ * {@link ElasticSearch8AsyncDynamicSink} under the same {@code 
elasticsearch-8} identifier.
+ */
 @Internal
-public class ElasticSearch8AsyncDynamicTableFactory extends 
AsyncDynamicTableSinkFactory {
-
+public class Elasticsearch8DynamicTableFactory extends 
AsyncDynamicTableSinkFactory
+        implements DynamicTableSourceFactory {
     private static final String IDENTIFIER = "elasticsearch-8";
 
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+
+        final DecodingFormat<DeserializationSchema<RowData>> format =
+                
helper.discoverDecodingFormat(DeserializationFormatFactory.class, 
FORMAT_OPTION);
+
+        Elasticsearch8Configuration config = getConfiguration(helper);
+        helper.validate();
+        validateConfiguration(config);
+
+        return new Elasticsearch8DynamicSource(
+                format, config, context.getPhysicalRowDataType(), 
capitalize(IDENTIFIER));
+    }
+
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
         List<LogicalTypeWithIndex> primaryKeyLogicalTypesWithIndex =
@@ -95,10 +122,8 @@ public class ElasticSearch8AsyncDynamicTableFactory extends 
AsyncDynamicTableSin
         helper.validate();
         validateConfiguration(config);
 
-        ElasticSearch8AsyncDynamicSink.ElasticSearch8AsyncDynamicSinkBuilder 
builder =
-                new 
ElasticSearch8AsyncDynamicSink.ElasticSearch8AsyncDynamicSinkBuilder();
-
-        return builder.setConfig(config)
+        return new 
ElasticSearch8AsyncDynamicSink.ElasticSearch8AsyncDynamicSinkBuilder()
+                .setConfig(config)
                 .setFormat(format)
                 
.setPrimaryKeyLogicalTypesWithIndex(primaryKeyLogicalTypesWithIndex)
                 .setPhysicalRowDataType(context.getPhysicalRowDataType())
@@ -107,40 +132,6 @@ public class ElasticSearch8AsyncDynamicTableFactory 
extends AsyncDynamicTableSin
                 .build();
     }
 
-    ZoneId getLocalTimeZoneId(ReadableConfig readableConfig) {
-        final String zone = 
readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
-
-        return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
-                ? ZoneId.systemDefault()
-                : ZoneId.of(zone);
-    }
-
-    List<LogicalTypeWithIndex> getPrimaryKeyLogicalTypesWithIndex(Context 
context) {
-        DataType physicalRowDataType = context.getPhysicalRowDataType();
-        int[] primaryKeyIndexes = context.getPrimaryKeyIndexes();
-        if (primaryKeyIndexes.length != 0) {
-            DataType pkDataType = 
Projection.of(primaryKeyIndexes).project(physicalRowDataType);
-
-            ElasticsearchValidationUtils.validatePrimaryKey(pkDataType);
-        }
-
-        ResolvedSchema resolvedSchema = 
context.getCatalogTable().getResolvedSchema();
-        return Arrays.stream(primaryKeyIndexes)
-                .mapToObj(
-                        index -> {
-                            Optional<Column> column = 
resolvedSchema.getColumn(index);
-                            if (!column.isPresent()) {
-                                throw new IllegalStateException(
-                                        String.format(
-                                                "No primary key column found 
with index '%s'.",
-                                                index));
-                            }
-                            LogicalType logicalType = 
column.get().getDataType().getLogicalType();
-                            return new LogicalTypeWithIndex(index, 
logicalType);
-                        })
-                .collect(Collectors.toList());
-    }
-
     Elasticsearch8Configuration 
getConfiguration(FactoryUtil.TableFactoryHelper helper) {
         return new Elasticsearch8Configuration(helper.getOptions());
     }
@@ -187,9 +178,37 @@ public class ElasticSearch8AsyncDynamicTableFactory 
extends AsyncDynamicTableSin
         }
     }
 
-    @Override
-    public String factoryIdentifier() {
-        return IDENTIFIER;
+    ZoneId getLocalTimeZoneId(ReadableConfig readableConfig) {
+        final String zone = 
readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
+
+        return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
+                ? ZoneId.systemDefault()
+                : ZoneId.of(zone);
+    }
+
+    List<LogicalTypeWithIndex> getPrimaryKeyLogicalTypesWithIndex(Context 
context) {
+        DataType physicalRowDataType = context.getPhysicalRowDataType();
+        int[] primaryKeyIndexes = context.getPrimaryKeyIndexes();
+        if (primaryKeyIndexes.length != 0) {
+            DataType pkDataType = 
Projection.of(primaryKeyIndexes).project(physicalRowDataType);
+            ElasticsearchValidationUtils.validatePrimaryKey(pkDataType);
+        }
+
+        ResolvedSchema resolvedSchema = 
context.getCatalogTable().getResolvedSchema();
+        return Arrays.stream(primaryKeyIndexes)
+                .mapToObj(
+                        index -> {
+                            Optional<Column> column = 
resolvedSchema.getColumn(index);
+                            if (!column.isPresent()) {
+                                throw new IllegalStateException(
+                                        String.format(
+                                                "No primary key column found 
with index '%s'.",
+                                                index));
+                            }
+                            LogicalType logicalType = 
column.get().getDataType().getLogicalType();
+                            return new LogicalTypeWithIndex(index, 
logicalType);
+                        })
+                .collect(Collectors.toList());
     }
 
     @Override
@@ -201,16 +220,15 @@ public class ElasticSearch8AsyncDynamicTableFactory 
extends AsyncDynamicTableSin
     public Set<ConfigOption<?>> optionalOptions() {
         return Stream.of(
                         KEY_DELIMITER_OPTION,
-                        BULK_FLUSH_MAX_SIZE_OPTION,
                         BULK_FLUSH_MAX_ACTIONS_OPTION,
-                        BULK_FLUSH_INTERVAL_OPTION,
                         BULK_FLUSH_MAX_BUFFERED_ACTIONS_OPTION,
                         BULK_FLUSH_MAX_IN_FLIGHT_ACTIONS_OPTION,
+                        BULK_FLUSH_MAX_SIZE_OPTION,
+                        BULK_FLUSH_INTERVAL_OPTION,
                         CONNECTION_PATH_PREFIX_OPTION,
                         CONNECTION_REQUEST_TIMEOUT,
                         CONNECTION_TIMEOUT,
                         SOCKET_TIMEOUT,
-                        SSL_CERTIFICATE_FINGERPRINT,
                         FORMAT_OPTION,
                         DELIVERY_GUARANTEE_OPTION,
                         PASSWORD_OPTION,
@@ -221,7 +239,9 @@ public class ElasticSearch8AsyncDynamicTableFactory extends 
AsyncDynamicTableSin
                         PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
                         PARTIAL_CACHE_MAX_ROWS,
                         PARTIAL_CACHE_CACHE_MISSING_KEY,
-                        MAX_RETRIES)
+                        MAX_RETRIES,
+                        NUM_CANDIDATES,
+                        SSL_CERTIFICATE_FINGERPRINT)
                 .collect(Collectors.toSet());
     }
 
@@ -234,10 +254,10 @@ public class ElasticSearch8AsyncDynamicTableFactory 
extends AsyncDynamicTableSin
                         USERNAME_OPTION,
                         KEY_DELIMITER_OPTION,
                         BULK_FLUSH_MAX_ACTIONS_OPTION,
-                        BULK_FLUSH_MAX_SIZE_OPTION,
-                        BULK_FLUSH_INTERVAL_OPTION,
                         BULK_FLUSH_MAX_BUFFERED_ACTIONS_OPTION,
                         BULK_FLUSH_MAX_IN_FLIGHT_ACTIONS_OPTION,
+                        BULK_FLUSH_MAX_SIZE_OPTION,
+                        BULK_FLUSH_INTERVAL_OPTION,
                         CONNECTION_PATH_PREFIX_OPTION,
                         CONNECTION_REQUEST_TIMEOUT,
                         CONNECTION_TIMEOUT,
diff --git 
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java
 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java
new file mode 100644
index 0000000..516752a
--- /dev/null
+++ 
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table.search;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.elasticsearch.sink.NetworkConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.VectorSearchFunction;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.json.JsonData;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The {@link VectorSearchFunction} implementation for Elasticsearch 8. */
+public class ElasticsearchRowDataVectorSearchFunction
+        extends AbstractElasticsearchVectorSearchFunction {
+    private static final long serialVersionUID = 1L;
+
+    private final int numCandidates;
+    private final NetworkConfig networkConfig;
+
+    private transient ElasticsearchClient client;
+
+    public ElasticsearchRowDataVectorSearchFunction(
+            DeserializationSchema<RowData> deserializationSchema,
+            int maxRetryTimes,
+            int numCandidates,
+            String index,
+            String searchColumn,
+            String[] producedNames,
+            NetworkConfig networkConfig) {
+        super(deserializationSchema, maxRetryTimes, index, searchColumn, 
producedNames);
+        this.numCandidates = numCandidates;
+        this.networkConfig = checkNotNull(networkConfig, "No networkConfig 
supplied.");
+    }
+
+    @Override
+    protected void doOpen(FunctionContext context) {
+        this.client = networkConfig.createEsSyncClient();
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        if (client != null) {
+            client._transport().close();
+            client = null;
+        }
+    }
+
+    @Override
+    protected SearchResult[] doSearch(int topK, RowData features) throws 
IOException {
+        List<Float> queryVector = new ArrayList<>();
+        for (float feature : features.getArray(0).toFloatArray()) {
+            queryVector.add(feature);
+        }
+
+        SearchRequest request =
+                new SearchRequest.Builder()
+                        .index(index)
+                        .knn(
+                                kb ->
+                                        kb.field(searchColumn)
+                                                .numCandidates(numCandidates)
+                                                .queryVector(queryVector)
+                                                .k(topK))
+                        .source(src -> src.filter(f -> 
f.includes(Arrays.asList(producedNames))))
+                        .build();
+
+        SearchResponse<JsonData> searchResponse = client.search(request, 
JsonData.class);
+
+        return searchResponse.hits().hits().stream()
+                .filter(hit -> hit.source() != null)
+                .map(hit -> new SearchResult(hit.source().toJson().toString(), 
hit.score()))
+                .toArray(SearchResult[]::new);
+    }
+}
diff --git 
a/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index de87735..b138912 100644
--- 
a/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,5 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.connector.elasticsearch.table.ElasticSearch8AsyncDynamicTableFactory
-
+org.apache.flink.connector.elasticsearch.table.Elasticsearch8DynamicTableFactory
diff --git 
a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkITCase.java
 
b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkITCase.java
index 2381b88..f65e72a 100644
--- 
a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkITCase.java
+++ 
b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkITCase.java
@@ -94,8 +94,7 @@ class Elasticsearch8DynamicSinkITCase extends 
Elasticsearch8DynamicSinkBaseITCas
                                 LocalDateTime.parse("2012-12-12T12:12:12")));
 
         String index = "writing-documents";
-        ElasticSearch8AsyncDynamicTableFactory sinkFactory =
-                new ElasticSearch8AsyncDynamicTableFactory();
+        Elasticsearch8DynamicTableFactory sinkFactory = new 
Elasticsearch8DynamicTableFactory();
 
         DynamicTableSink.SinkRuntimeProvider runtimeProvider =
                 sinkFactory
diff --git 
a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8VectorSearchITCase.java
 
b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8VectorSearchITCase.java
new file mode 100644
index 0000000..46e5c34
--- /dev/null
+++ 
b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8VectorSearchITCase.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.mapping.BooleanProperty;
+import co.elastic.clients.elasticsearch._types.mapping.ByteNumberProperty;
+import co.elastic.clients.elasticsearch._types.mapping.DateProperty;
+import co.elastic.clients.elasticsearch._types.mapping.DenseVectorIndexOptions;
+import co.elastic.clients.elasticsearch._types.mapping.DenseVectorProperty;
+import co.elastic.clients.elasticsearch._types.mapping.DoubleNumberProperty;
+import co.elastic.clients.elasticsearch._types.mapping.FloatNumberProperty;
+import co.elastic.clients.elasticsearch._types.mapping.IntegerNumberProperty;
+import co.elastic.clients.elasticsearch._types.mapping.LongNumberProperty;
+import co.elastic.clients.elasticsearch._types.mapping.Property;
+import co.elastic.clients.elasticsearch._types.mapping.ShortNumberProperty;
+import co.elastic.clients.elasticsearch._types.mapping.TextProperty;
+import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
+import co.elastic.clients.elasticsearch.core.IndexResponse;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.client.RestClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** {@code VECTOR_SEARCH } ITCase for Elasticsearch 8. */
+@Testcontainers
+public class Elasticsearch8VectorSearchITCase {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(Elasticsearch8VectorSearchITCase.class);
+
+    private static final int PARALLELISM = 2;
+
+    public static final String ELASTICSEARCH_VERSION = "8.19.0";
+    public static final DockerImageName ELASTICSEARCH_IMAGE =
+            
DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch")
+                    .withTag(ELASTICSEARCH_VERSION);
+    private static final String ES_CLUSTER_USERNAME = "elastic";
+    private static final String ES_CLUSTER_PASSWORD = "s3cret";
+
+    @Container
+    private static final ElasticsearchContainer ES_CONTAINER = 
createElasticsearchContainer();
+
+    private static ElasticsearchContainer createElasticsearchContainer() {
+        final ElasticsearchContainer container =
+                new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
+                        .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+                        .withEnv("logger.org.elasticsearch", "ERROR")
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        container.withPassword(ES_CLUSTER_PASSWORD);
+
+        container.setWaitStrategy(
+                
Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofMinutes(3)));
+
+        return container;
+    }
+
+    private String getEsCertFingerprint() throws Exception {
+        Preconditions.checkArgument(ES_CONTAINER.caCertAsBytes().isPresent());
+        byte[] caCertBytes = ES_CONTAINER.caCertAsBytes().get();
+        X509Certificate caCert =
+                (X509Certificate)
+                        CertificateFactory.getInstance("X.509")
+                                .generateCertificate(new 
ByteArrayInputStream(caCertBytes));
+        byte[] fingerprint = 
MessageDigest.getInstance("SHA-256").digest(caCert.getEncoded());
+        return Hex.encodeHexString(fingerprint);
+    }
+
+    private ElasticsearchClient getClient() {
+        final CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
+        credentialsProvider.setCredentials(
+                AuthScope.ANY,
+                new UsernamePasswordCredentials(ES_CLUSTER_USERNAME, 
ES_CLUSTER_PASSWORD));
+        RestClient restClient =
+                RestClient.builder(
+                                new HttpHost(
+                                        ES_CONTAINER.getHost(),
+                                        ES_CONTAINER.getFirstMappedPort(),
+                                        "https"))
+                        .setHttpClientConfigCallback(
+                                httpClientBuilder ->
+                                        httpClientBuilder
+                                                
.setDefaultCredentialsProvider(credentialsProvider)
+                                                .setSSLContext(
+                                                        
ES_CONTAINER.createSslContextFromCa()))
+                        .build();
+        RestClientTransport transport =
+                new RestClientTransport(restClient, new JacksonJsonpMapper());
+        return new ElasticsearchClient(transport);
+    }
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    private final List<Row> inputData =
+            Arrays.asList(
+                    Row.of(1L, "Spark", new Float[] {0.2718f, 0.6527f, 
0.7076f}),
+                    Row.of(2L, "Flink", new Float[] {-0.2718f, -0.6527f, 
-0.7076f}));
+
+    private TableEnvironment tEnv;
+
+    @BeforeEach
+    void beforeEach() {
+        tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+    }
+
+    @Test
+    public void testSearchFullTypeVectorTable() throws Exception {
+        String index = "table_with_all_supported_types";
+        createFullTypesIndex(index);
+
+        // Insert data using ES client since elasticsearch-8 connector doesn't 
support sink
+        Map<String, Object> document = new HashMap<>();
+        document.put("id", 1L);
+        document.put("f1", "ABCDE");
+        document.put("f2", true);
+        document.put("f3", (byte) 127);
+        document.put("f4", (short) 257);
+        document.put("f5", 65535);
+        document.put("f6", LocalDate.ofEpochDay(12345).toString());
+        document.put("f7", "2012-12-12 12:12:12");
+        document.put("f8", 11.11f);
+        document.put("f9", 12.22d);
+        document.put("f10", new float[] {11.11f, 11.12f});
+        document.put("f11", new double[] {12.22d, 12.22d});
+        document.put("f12", new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+        document.put("f13", new long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+
+        IndexResponse response = getClient().index(i -> 
i.index(index).id("1").document(document));
+        LOG.info("Indexed document with result: {}", response.result());
+
+        // Wait for es to refresh index
+        getClient().indices().refresh(r -> r.index(index));
+
+        String certFingerprint = getEsCertFingerprint();
+
+        tEnv.executeSql(
+                "CREATE TABLE esTable ("
+                        + "  id BIGINT,\n"
+                        + "  f1 STRING,\n"
+                        + "  f2 BOOLEAN,\n"
+                        + "  f3 TINYINT,\n"
+                        + "  f4 SMALLINT,\n"
+                        + "  f5 INTEGER,\n"
+                        + "  f6 DATE,\n"
+                        + "  f7 TIMESTAMP,\n"
+                        + "  f8 FLOAT,\n"
+                        + "  f9 DOUBLE,\n"
+                        + "  f10 ARRAY<FLOAT>,\n"
+                        + "  f11 ARRAY<DOUBLE>,\n"
+                        + "  f12 ARRAY<INTEGER>,\n"
+                        + "  f13 ARRAY<BIGINT>,\n"
+                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", 
"elasticsearch-8")
+                        + String.format(
+                                "'%s'='%s',\n",
+                                
ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                
ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "https://" + ES_CONTAINER.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s',\n",
+                                
ElasticsearchConnectorOptions.USERNAME_OPTION.key(),
+                                ES_CLUSTER_USERNAME)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                
ElasticsearchConnectorOptions.PASSWORD_OPTION.key(),
+                                ES_CLUSTER_PASSWORD)
+                        + String.format(
+                                "'%s'='%s'\n",
+                                
Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT.key(),
+                                certFingerprint)
+                        + ")");
+
+        List<String> rows =
+                CollectionUtil.iteratorToList(
+                                tEnv.executeSql(
+                                                "WITH t(id, vector) AS (SELECT 
* FROM (VALUES (1, CAST(ARRAY[11.11, 1] AS ARRAY<FLOAT>))))\n"
+                                                        + "SELECT * FROM t, 
LATERAL TABLE(VECTOR_SEARCH(TABLE esTable, DESCRIPTOR(f10), t.vector, 3))\n")
+                                        .collect())
+                        .stream()
+                        .map(Row::toString)
+                        .collect(Collectors.toList());
+        assertThat(rows)
+                .isEqualTo(
+                        Collections.singletonList(
+                                "+I[1, [11.11, 1.0], 1, ABCDE, true, 127, 257, 
65535, 2003-10-20, 2012-12-12T12:12:12, 11.11, 12.22, [11.11, 11.12], [12.22, 
12.22], [-2147483648, 2147483647], [-9223372036854775808, 9223372036854775807], 
0.8836806]"));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"cosine", "l2_norm", "dot_product"})
+    void testSearchUsingFloatArray(String metric) throws Exception {
+        String index = "table_with_multiple_data_with_" + 
metric.toLowerCase().replace("_", "");
+        createSimpleIndex(index, metric);
+
+        // Insert data using ES client since elasticsearch-8 connector doesn't 
support sink
+        // For dot_product, vectors must be normalized (unit vectors)
+        indexSimpleDocument(index, "1", 1L, "Batch", new float[] {0.2718f, 
0.6527f, 0.7076f});
+        indexSimpleDocument(
+                index, "2", 2L, "Streaming", new float[] {-0.2718f, -0.6527f, 
-0.7076f});
+        indexSimpleDocument(index, "3", 3L, "Big Data", new float[] {0.7071f, 
0.7071f, 0f});
+
+        // Refresh index to make documents searchable
+        getClient().indices().refresh(r -> r.index(index));
+
+        String certFingerprint = getEsCertFingerprint();
+
+        tEnv.executeSql(
+                "CREATE TABLE es_table("
+                        + "  id BIGINT,"
+                        + "  label STRING,"
+                        + "  vector ARRAY<FLOAT>"
+                        + ")\n WITH (\n"
+                        + String.format("'%s'='%s',\n", "connector", 
"elasticsearch-8")
+                        + String.format(
+                                "'%s'='%s',\n",
+                                
ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                
ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+                                "https://" + ES_CONTAINER.getHttpHostAddress())
+                        + String.format(
+                                "'%s'='%s',\n",
+                                
ElasticsearchConnectorOptions.USERNAME_OPTION.key(),
+                                ES_CLUSTER_USERNAME)
+                        + String.format(
+                                "'%s'='%s',\n",
+                                
ElasticsearchConnectorOptions.PASSWORD_OPTION.key(),
+                                ES_CLUSTER_PASSWORD)
+                        + String.format(
+                                "'%s'='%s'\n",
+                                
Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT.key(),
+                                certFingerprint)
+                        + ")");
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE src(\n"
+                                + "  id BIGINT PRIMARY KEY NOT ENFORCED,\n"
+                                + "  content STRING,\n"
+                                + "  index ARRAY<FLOAT>\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'data-id' = '%s'\n"
+                                + ");\n",
+                        TestValuesTableFactory.registerData(inputData)));
+        assertThat(
+                        CollectionUtil.iteratorToList(
+                                        tEnv.executeSql(
+                                                        "SELECT content, label 
FROM src, LATERAL TABLE(VECTOR_SEARCH(TABLE es_table, DESCRIPTOR(vector), 
src.index, 2))")
+                                                .collect())
+                                .stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .isEqualTo(
+                        Arrays.asList(
+                                "+I[Spark, Batch]",
+                                "+I[Spark, Big Data]",
+                                "+I[Flink, Streaming]",
+                                "+I[Flink, Big Data]"));
+    }
+
+    private void createFullTypesIndex(String index) throws IOException {
+        // In ES 8.x, dense_vector requires index: true and similarity for kNN 
search
+        TypeMapping mapping =
+                TypeMapping.of(
+                        m ->
+                                m.properties(
+                                                "id",
+                                                Property.of(
+                                                        p ->
+                                                                p.long_(
+                                                                        
LongNumberProperty.of(
+                                                                               
 l -> l))))
+                                        .properties(
+                                                "f1",
+                                                Property.of(p -> 
p.text(TextProperty.of(t -> t))))
+                                        .properties(
+                                                "f2",
+                                                Property.of(
+                                                        p ->
+                                                                p.boolean_(
+                                                                        
BooleanProperty.of(
+                                                                               
 b -> b))))
+                                        .properties(
+                                                "f3",
+                                                Property.of(
+                                                        p ->
+                                                                p.byte_(
+                                                                        
ByteNumberProperty.of(
+                                                                               
 b -> b))))
+                                        .properties(
+                                                "f4",
+                                                Property.of(
+                                                        p ->
+                                                                p.short_(
+                                                                        
ShortNumberProperty.of(
+                                                                               
 s -> s))))
+                                        .properties(
+                                                "f5",
+                                                Property.of(
+                                                        p ->
+                                                                p.integer(
+                                                                        
IntegerNumberProperty.of(
+                                                                               
 i -> i))))
+                                        .properties(
+                                                "f6",
+                                                Property.of(p -> 
p.date(DateProperty.of(d -> d))))
+                                        .properties(
+                                                "f7",
+                                                Property.of(p -> 
p.text(TextProperty.of(t -> t))))
+                                        .properties(
+                                                "f8",
+                                                Property.of(
+                                                        p ->
+                                                                p.float_(
+                                                                        
FloatNumberProperty.of(
+                                                                               
 f -> f))))
+                                        .properties(
+                                                "f9",
+                                                Property.of(
+                                                        p ->
+                                                                p.double_(
+                                                                        
DoubleNumberProperty.of(
+                                                                               
 d -> d))))
+                                        .properties(
+                                                "f10",
+                                                Property.of(
+                                                        p ->
+                                                                p.denseVector(
+                                                                        
createDenseVectorProperty(
+                                                                               
 2, "cosine"))))
+                                        .properties(
+                                                "f11",
+                                                Property.of(
+                                                        p ->
+                                                                p.denseVector(
+                                                                        
createDenseVectorProperty(
+                                                                               
 2, "cosine"))))
+                                        .properties(
+                                                "f12",
+                                                Property.of(
+                                                        p ->
+                                                                p.denseVector(
+                                                                        
createDenseVectorProperty(
+                                                                               
 2, "cosine"))))
+                                        .properties(
+                                                "f13",
+                                                Property.of(
+                                                        p ->
+                                                                p.denseVector(
+                                                                        
createDenseVectorProperty(
+                                                                               
 2, "cosine")))));
+
+        this.getClient().indices().create(c -> 
c.index(index).mappings(mapping));
+    }
+
+    private void createSimpleIndex(String index, String similarity) throws 
IOException {
+        // In ES 8.x, dense_vector requires index: true and similarity for kNN 
search
+        TypeMapping mapping =
+                TypeMapping.of(
+                        m ->
+                                m.properties(
+                                                "id",
+                                                Property.of(
+                                                        p ->
+                                                                p.long_(
+                                                                        
LongNumberProperty.of(
+                                                                               
 l -> l))))
+                                        .properties(
+                                                "label",
+                                                Property.of(p -> 
p.text(TextProperty.of(t -> t))))
+                                        .properties(
+                                                "vector",
+                                                Property.of(
+                                                        p ->
+                                                                p.denseVector(
+                                                                        
createDenseVectorProperty(
+                                                                               
 3, similarity)))));
+
+        this.getClient().indices().create(c -> 
c.index(index).mappings(mapping));
+    }
+
+    private DenseVectorProperty createDenseVectorProperty(int dims, String 
similarity) {
+        return DenseVectorProperty.of(
+                d ->
+                        d.dims(dims)
+                                .index(true)
+                                .similarity(similarity)
+                                .indexOptions(
+                                        DenseVectorIndexOptions.of(
+                                                o -> 
o.type("hnsw").m(16).efConstruction(100))));
+    }
+
+    private void indexSimpleDocument(
+            String index, String docId, Long id, String label, float[] vector) 
throws IOException {
+        Map<String, Object> document = new HashMap<>();
+        document.put("id", id);
+        document.put("label", label);
+        document.put("vector", vector);
+
+        IndexResponse response =
+                getClient().index(i -> 
i.index(index).id(docId).document(document));
+        LOG.info("Indexed document {} with result: {}", docId, 
response.result());
+    }
+}
diff --git 
a/flink-connector-elasticsearch8/src/test/resources/testcontainers.properties 
b/flink-connector-elasticsearch8/src/test/resources/testcontainers.properties
new file mode 100644
index 0000000..07514cc
--- /dev/null
+++ 
b/flink-connector-elasticsearch8/src/test/resources/testcontainers.properties
@@ -0,0 +1,17 @@
+################################################################################
+#  Copyright 2023 Ververica Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+ryuk.container.image = testcontainers/ryuk:0.6.0
diff --git a/pom.xml b/pom.xml
index 98775f6..98dbf91 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@ under the License.
 
        <properties>
                <flink.version>2.2.1</flink.version>
+               <scala.binary.version>2.12</scala.binary.version>
 
                <jackson-bom.version>2.15.3</jackson-bom.version>
                <junit4.version>4.13.2</junit4.version>

Reply via email to