dianfu commented on code in PR #19732:
URL: https://github.com/apache/flink/pull/19732#discussion_r883356468


##########
flink-python/pyflink/datastream/connectors/__init__.py:
##########
@@ -48,6 +50,10 @@
     'JdbcExecutionOptions',
     'NumberSequenceSource',
     'OutputFileConfig',
+    'ElasticsearchSink',

Review Comment:
   Update the documentation in 
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/__init__.py#L170
 to mention es.
   
   Nit: It would be great to also add pulsar connector in the above place.



##########
flink-python/pyflink/datastream/connectors/elasticsearch.py:
##########
@@ -0,0 +1,255 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import abc
+from enum import Enum
+from typing import List
+
+from pyflink.datastream.connectors import Sink, DeliveryGuarantee
+from pyflink.java_gateway import get_gateway
+from pyflink.util.java_utils import to_jarray, load_java_class
+
+
+class FlushBackoffType(Enum):
+    """
+    Used to control whether the sink should retry failed requests at all or 
with which kind back off
+    strategy.
+
+    :data: `CONSTANT`:
+
+    After every failure, it waits a configured time until the retries are 
exhausted.
+
+    :data: `EXPONENTIAL`:
+
+    After every failure, it waits initially the configured time and increases 
the waiting time
+    exponentially until the retries are exhausted.
+
+    :data: `NONE`:
+
+    The failure is not retried.
+    """
+
+    CONSTANT = 0,
+    EXPONENTIAL = 1,
+    NONE = 2,
+
+    def _to_j_flush_backoff_type(self):
+        JFlushBackoffType = get_gateway().jvm \
+            .org.apache.flink.connector.elasticsearch.sink.FlushBackoffType
+        return getattr(JFlushBackoffType, self.name)
+
+
+class ElasticsearchSinkBuilderBase(abc.ABC):
+    """
+    Base builder to construct a ElasticsearchSink.
+    """
+
+    @abc.abstractmethod
+    def __init__(self):
+        self._j_elasticsearch_sink_builder = None
+
+    def set_emitter(self, emitter_class_name: str):

Review Comment:
   This is not usable for Python users at all. There is no default 
implementations available for use. It means that Python users will have to 
write Java code to use ES connector.



##########
flink-python/pyflink/datastream/connectors/__init__.py:
##########
@@ -48,6 +50,10 @@
     'JdbcExecutionOptions',
     'NumberSequenceSource',
     'OutputFileConfig',
+    'ElasticsearchSink',
+    'Elasticsearch6SinkBuilder',
+    'Elasticsearch7SinkBuilder',
+    'FlushBackoffType',

Review Comment:
   I'm thinking about which classes should be kept here and which classes 
should be moved to the connector specific files(e.g. elasticsearch.py in this 
case). If all the classes are placed here, conflicts may happen that two 
classes belonging to two different connectors having the same name.
   
   There are two solutions in my mind:
   - All the classes added in the future are placed in connector specific files
   - Classes which could be considered as entrypoints(e.g. ElasticsearchSink, 
Elasticsearch6SinkBuilder and Elasticsearch7SinkBuilder here) are placed here 
and all the others(e.g. FlushBackoffType here) are placed in connector specific 
files.
   
   What's your thoughts?
   



##########
flink-python/pyflink/datastream/connectors/elasticsearch.py:
##########
@@ -0,0 +1,255 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import abc
+from enum import Enum
+from typing import List
+
+from pyflink.datastream.connectors import Sink, DeliveryGuarantee
+from pyflink.java_gateway import get_gateway
+from pyflink.util.java_utils import to_jarray, load_java_class
+
+
+class FlushBackoffType(Enum):
+    """
+    Used to control whether the sink should retry failed requests at all or 
with which kind back off
+    strategy.
+
+    :data: `CONSTANT`:
+
+    After every failure, it waits a configured time until the retries are 
exhausted.
+
+    :data: `EXPONENTIAL`:
+
+    After every failure, it waits initially the configured time and increases 
the waiting time
+    exponentially until the retries are exhausted.
+
+    :data: `NONE`:
+
+    The failure is not retried.
+    """
+
+    CONSTANT = 0,
+    EXPONENTIAL = 1,
+    NONE = 2,
+
+    def _to_j_flush_backoff_type(self):
+        JFlushBackoffType = get_gateway().jvm \
+            .org.apache.flink.connector.elasticsearch.sink.FlushBackoffType
+        return getattr(JFlushBackoffType, self.name)
+
+
+class ElasticsearchSinkBuilderBase(abc.ABC):
+    """
+    Base builder to construct a ElasticsearchSink.
+    """
+
+    @abc.abstractmethod
+    def __init__(self):
+        self._j_elasticsearch_sink_builder = None
+
+    def set_emitter(self, emitter_class_name: str):
+        """
+        Sets the emitter which is invoked on every record to convert it to 
Elasticsearch actions.
+        """
+        j_emitter = load_java_class(emitter_class_name).newInstance()
+        self._j_elasticsearch_sink_builder.setEmitter(j_emitter)
+        return self
+
+    @abc.abstractmethod
+    def get_http_host_class(self):
+        """
+        Gets the org.apache.http.HttpHost class which path is different in 
different Elasticsearch
+        version.
+        """
+        pass
+
+    def set_hosts(self, hosts: List[str]) -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the hosts where the Elasticsearch cluster nodes are reachable.
+        """
+        JHttpHost = self.get_http_host_class()
+        j_http_hosts_list = [JHttpHost.create(x) for x in hosts]
+        j_http_hosts_array = to_jarray(JHttpHost, j_http_hosts_list)
+        self._j_elasticsearch_sink_builder.setHosts(j_http_hosts_array)
+        return self
+
+    def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) \
+            -> 'ElasticsearchSinkBuilderBase':
+        """
+        Sets the wanted DeliveryGuarantee. The default delivery guarantee is 
DeliveryGuarantee#NONE
+        """
+        j_delivery_guarantee = delivery_guarantee._to_j_delivery_guarantee()
+        
self._j_elasticsearch_sink_builder.setDeliveryGuarantee(j_delivery_guarantee)
+        return self
+
+    def set_bulk_flush_max_actions(self, num_max_actions: int) -> 
'ElasticsearchSinkBuilderBase':
+        """
+        Sets the maximum number of actions to buffer for each bulk request. 
You can pass -1 to
+        disable it. The default flush size 1000.
+        """
+        
self._j_elasticsearch_sink_builder.setBulkFlushMaxActions(num_max_actions)
+        return self
+
+    def set_bulk_flush_max_size_mb(self, max_size_mb: int) -> 
'ElasticsearchSinkBuilderBase':
+        """
+        Sets the maximum size of buffered actions, in mb, per bulk request. 
You can pass -1 to
+        disable it.
+        """
+        self._j_elasticsearch_sink_builder.setBulkFlushMaxSizeMb(max_size_mb)
+        return self
+
+    def set_bulk_flush_interval(self, interval_millis: int) -> 
'ElasticsearchSinkBuilderBase':
+        """
+        Sets the bulk flush interval, in milliseconds. You can pass -1 to 
disable it.
+        """
+        
self._j_elasticsearch_sink_builder.setBulkFlushInterval(interval_millis)
+        return self
+
+    def set_bulk_flush_backoff_strategy(self,
+                                        flush_backoff_type: FlushBackoffType,
+                                        max_retries: int,
+                                        delay_millis: int) -> 
'ElasticsearchSinkBuilderBase':
+        """
+        Sets the type of back off to use when flushing bulk requests. The 
default bulk flush back
+        off type is FlushBackoffType#NONE.
+
+        Sets the amount of delay between each backoff attempt when flushing 
bulk requests, in
+        milliseconds.
+
+        Sets the maximum number of retries for a backoff attempt when flushing 
bulk requests.
+        """
+        self._j_elasticsearch_sink_builder.setBulkFlushBackoffStrategy(
+            flush_backoff_type._to_j_flush_backoff_type(), max_retries, 
delay_millis)
+        return self
+
+    def set_connection_username(self, username: str) -> 
'ElasticsearchSinkBuilderBase':
+        """
+        Sets the username used to authenticate the connection with the 
Elasticsearch cluster.
+        """
+        self._j_elasticsearch_sink_builder.setConnectionUsername(username)
+        return self
+
+    def set_connection_password(self, password: str) -> 
'ElasticsearchSinkBuilderBase':
+        """
+        Sets the password used to authenticate the connection with the 
Elasticsearch cluster.
+        """
+        self._j_elasticsearch_sink_builder.setConnectionPassword(password)
+        return self
+
+    def set_connection_path_prefix(self, prefix: str):

Review Comment:
   Missing return result type hint? Please also check the other places.



##########
flink-python/pyflink/datastream/connectors/elasticsearch.py:
##########
@@ -0,0 +1,255 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import abc
+from enum import Enum
+from typing import List
+
+from pyflink.datastream.connectors import Sink, DeliveryGuarantee
+from pyflink.java_gateway import get_gateway
+from pyflink.util.java_utils import to_jarray, load_java_class
+
+
+class FlushBackoffType(Enum):
+    """
+    Used to control whether the sink should retry failed requests at all or 
with which kind back off
+    strategy.
+
+    :data: `CONSTANT`:
+
+    After every failure, it waits a configured time until the retries are 
exhausted.
+
+    :data: `EXPONENTIAL`:
+
+    After every failure, it waits initially the configured time and increases 
the waiting time
+    exponentially until the retries are exhausted.
+
+    :data: `NONE`:
+
+    The failure is not retried.
+    """
+
+    CONSTANT = 0,
+    EXPONENTIAL = 1,
+    NONE = 2,
+
+    def _to_j_flush_backoff_type(self):
+        JFlushBackoffType = get_gateway().jvm \
+            .org.apache.flink.connector.elasticsearch.sink.FlushBackoffType
+        return getattr(JFlushBackoffType, self.name)
+
+
+class ElasticsearchSinkBuilderBase(abc.ABC):
+    """
+    Base builder to construct a ElasticsearchSink.
+    """
+
+    @abc.abstractmethod
+    def __init__(self):
+        self._j_elasticsearch_sink_builder = None
+
+    def set_emitter(self, emitter_class_name: str):
+        """
+        Sets the emitter which is invoked on every record to convert it to 
Elasticsearch actions.
+        """
+        j_emitter = load_java_class(emitter_class_name).newInstance()
+        self._j_elasticsearch_sink_builder.setEmitter(j_emitter)
+        return self
+
+    @abc.abstractmethod
+    def get_http_host_class(self):
+        """
+        Gets the org.apache.http.HttpHost class which path is different in 
different Elasticsearch
+        version.
+        """
+        pass
+
+    def set_hosts(self, hosts: List[str]) -> 'ElasticsearchSinkBuilderBase':

Review Comment:
   ```suggestion
       def set_hosts(self, hosts: Union[str, List[str]]) -> 
'ElasticsearchSinkBuilderBase':
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to