dianfu commented on code in PR #20002:
URL: https://github.com/apache/flink/pull/20002#discussion_r900607863


##########
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/SimpleElasticsearchEmitter.java:
##########
@@ -20,76 +20,87 @@
 
 import org.apache.flink.api.connector.sink2.SinkWriter;
 
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 
-import java.io.Serializable;
+import javax.annotation.Nullable;
+
 import java.util.Map;
-import java.util.function.Function;
+import java.util.function.BiConsumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/** A ElasticsearchEmitter that is currently used Python Flink Connector. */
+/** A simple ElasticsearchEmitter which is currently used in PyFlink ES 
connector. */
 public class SimpleElasticsearchEmitter implements 
ElasticsearchEmitter<Map<String, Object>> {
 
     private static final long serialVersionUID = 1L;
-    private Function<Map<String, Object>, UpdateRequest> requestGenerator;
+
+    private final String index;
+    private @Nullable final String documentType;
+    private @Nullable final String idFieldName;
+    private final boolean isDynamicIndex;
+
+    private transient BiConsumer<Map<String, Object>, RequestIndexer> 
requestGenerator;
 
     public SimpleElasticsearchEmitter(
-            String index, String documentType, String idFieldName, boolean 
isDynamicIndex) {
-        // If this issue resolve 
https://issues.apache.org/jira/browse/MSHADE-260
-        // we can replace requestGenerator with lambda.
-        // Other corresponding issues 
https://issues.apache.org/jira/browse/FLINK-18857 and
-        // https://issues.apache.org/jira/browse/FLINK-18006
+            String index,
+            @Nullable String documentType,
+            @Nullable String idFieldName,
+            boolean isDynamicIndex) {
+        this.index = checkNotNull(index);
+        this.documentType = documentType;
+        this.idFieldName = idFieldName;
+        this.isDynamicIndex = isDynamicIndex;
+    }
+
+    @Override
+    public void open() throws Exception {
         if (isDynamicIndex) {
-            this.requestGenerator =
-                    new DynamicIndexRequestGenerator(index, documentType, 
idFieldName);
+            final String indexFieldName = index;
+            requestGenerator =

Review Comment:
   The logic could be further simplified by making the following abstraction:
   ```
           Function<Map<String, Object>, String> indexProvider;
           if (isDynamicIndex) {
               indexProvider = doc -> doc.get(index).toString();
           } else {
               indexProvider = doc -> index;
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to