xishuaidelin commented on code in PR #137:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/137#discussion_r3263262986


##########
flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/SearchMetric.java:
##########
@@ -0,0 +1,21 @@
+package org.apache.flink.connector.elasticsearch.table.search;
+
+/** Metric for vector search. */
+public enum SearchMetric {

Review Comment:
   Missing Apache license header. Several other files have the same issue.



##########
flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java:
##########
@@ -0,0 +1,111 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.table.search.ElasticsearchRowDataVectorSearchFunction;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import 
org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.elasticsearch.client.RestHighLevelClient;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a {@link 
Elasticsearch7DynamicSource}
+ * from a logical description.
+ */
+public class Elasticsearch7DynamicSource extends ElasticsearchDynamicSource

Review Comment:
   This class does not override copy(). If copied, new source would lose the 
vectorSearch capability.



##########
flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java:
##########
@@ -0,0 +1,98 @@
+package org.apache.flink.connector.elasticsearch.table.search;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.VectorSearchFunction;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.index.query.functionscore.ScriptScoreQueryBuilder;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The {@link VectorSearchFunction} implementation for Elasticsearch 7. */
+public class ElasticsearchRowDataVectorSearchFunction
+        extends AbstractElasticsearchVectorSearchFunction {
+    private static final long serialVersionUID = 1L;
+    private static final String QUERY_VECTOR = "query_vector";
+
+    private final ElasticsearchApiCallBridge<RestHighLevelClient> callBridge;
+    private final NetworkClientConfig networkClientConfig;
+    private final List<HttpHost> hosts;
+    private final String scriptScore;
+
+    private transient RestHighLevelClient client;
+    private transient SearchRequest searchRequest;
+    private transient SearchSourceBuilder searchSourceBuilder;
+
+    public ElasticsearchRowDataVectorSearchFunction(
+            DeserializationSchema<RowData> deserializationSchema,
+            int maxRetryTimes,
+            SearchMetric searchMetric,
+            String index,
+            String searchColumn,
+            String[] producedNames,
+            List<HttpHost> hosts,
+            NetworkClientConfig networkClientConfig,
+            ElasticsearchApiCallBridge<RestHighLevelClient> callBridge) {
+        super(deserializationSchema, maxRetryTimes, index, searchColumn, 
producedNames);
+        this.networkClientConfig =
+                checkNotNull(networkClientConfig, "No networkClientConfig 
supplied.");
+        this.hosts = checkNotNull(hosts, "No hosts supplied.");
+        this.callBridge = checkNotNull(callBridge, "No 
ElasticsearchApiCallBridge supplied.");
+        this.scriptScore =
+                String.format(
+                        "%s(params.%s, '%s') + 1.0",
+                        searchMetric.toString(), QUERY_VECTOR, searchColumn);
+    }
+
+    @Override
+    protected void doOpen(FunctionContext context) {
+        this.client = callBridge.createClient(networkClientConfig, hosts);

Review Comment:
   The client is created but never closed.



##########
flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java:
##########
@@ -0,0 +1,111 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
+import 
org.apache.flink.connector.elasticsearch.table.search.ElasticsearchRowDataVectorSearchFunction;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import 
org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.elasticsearch.client.RestHighLevelClient;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a {@link 
Elasticsearch7DynamicSource}
+ * from a logical description.
+ */
+public class Elasticsearch7DynamicSource extends ElasticsearchDynamicSource
+        implements VectorSearchTableSource {
+
+    public Elasticsearch7DynamicSource(
+            DecodingFormat<DeserializationSchema<RowData>> format,
+            ElasticsearchConfiguration config,
+            DataType physicalRowDataType,
+            int maxRetryTimes,
+            String summaryString,
+            ElasticsearchApiCallBridge<RestHighLevelClient> apiCallBridge,
+            @Nullable LookupCache lookupCache,
+            @Nullable String docType) {
+        super(
+                format,
+                config,
+                physicalRowDataType,
+                maxRetryTimes,
+                summaryString,
+                apiCallBridge,
+                lookupCache,
+                docType);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public VectorSearchRuntimeProvider getSearchRuntimeProvider(
+            VectorSearchContext vectorSearchContext) {
+
+        NetworkClientConfig networkClientConfig = buildNetworkClientConfig();
+
+        ElasticsearchRowDataVectorSearchFunction vectorSearchFunction =
+                new ElasticsearchRowDataVectorSearchFunction(
+                        this.format.createRuntimeDecoder(vectorSearchContext, 
physicalRowDataType),
+                        this.maxRetryTimes,
+                        ((Elasticsearch7Configuration) 
config).getVectorSearchMetric(),
+                        config.getIndex(),
+                        getSearchColumn(vectorSearchContext),
+                        
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+                        config.getHosts(),
+                        networkClientConfig,
+                        (ElasticsearchApiCallBridge<RestHighLevelClient>) 
apiCallBridge);
+
+        return VectorSearchFunctionProvider.of(vectorSearchFunction);
+    }
+
+    private String getSearchColumn(VectorSearchContext vectorSearchContext) {

Review Comment:
   This still seems unresolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to