deadwind4 commented on code in PR #19682:
URL: https://github.com/apache/flink/pull/19682#discussion_r871173891


##########
flink-python/pyflink/datastream/connectors.py:
##########
@@ -1449,6 +1455,314 @@ def build(self) -> 'PulsarSource':
         return PulsarSource(self._j_pulsar_source_builder.build())
 
 
+class DeliveryGuarantee(Enum):
+    """
+    Delivery guarantees that can be chosen. In general your pipeline can only offer the lowest
+    delivery guarantee which is supported by your sources and sinks.
+
+    :data:`EXACTLY_ONCE`:
+    Records are only delivered exactly-once also under failover scenarios. To build a complete
+    exactly-once pipeline it is required that the source and sink support exactly-once and are
+    properly configured.
+
+    :data:`AT_LEAST_ONCE`:
+    Records are ensured to be delivered but it may happen that the same record is delivered
+    multiple times. Usually, this guarantee is faster than the exactly-once delivery.
+
+    :data:`NONE`:
+    Records are delivered on a best effort basis. It is often the fastest way to process records
+    but it may happen that records are lost or duplicated.
+    """
+
+    EXACTLY_ONCE = 0
+    AT_LEAST_ONCE = 1
+    NONE = 2
+
+    def _to_j_delivery_guarantee(self):
+        JDeliveryGuarantee = get_gateway().jvm \
+            .org.apache.flink.connector.base.DeliveryGuarantee
+        return getattr(JDeliveryGuarantee, self.name)
+
+
+class PulsarSerializationSchema(object):
+    """
+    The serialization schema for how to serialize records into Pulsar.
+    """
+
+    def __init__(self, _j_pulsar_serialization_schema):
+        self._j_pulsar_serialization_schema = _j_pulsar_serialization_schema
+
+    @staticmethod
+    def flink_schema(serialization_schema: SerializationSchema) \
+            -> 'PulsarSerializationSchema':
+        """
+        Create a PulsarSerializationSchema by using Flink's SerializationSchema. It serializes
+        the message into a byte array and sends it to Pulsar with Schema#BYTES.
+        """
+        JPulsarSerializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.sink.writer.serializer.PulsarSerializationSchema
+        _j_pulsar_serialization_schema = JPulsarSerializationSchema.flinkSchema(
+            serialization_schema._j_serialization_schema)
+        return PulsarSerializationSchema(_j_pulsar_serialization_schema)
+
+
+class TopicRoutingMode(Enum):
+    """
+    The routing policy for choosing the desired topic by the given message.
+
+    :data:`ROUND_ROBIN`:
+    The producer will publish messages across all partitions in a round-robin fashion to
+    achieve maximum throughput. Please note that round-robin is not done per individual message
+    but rather it's set to the same boundary of batching delay, to ensure batching is effective.
+
+    :data:`MESSAGE_KEY_HASH`:
+    If no key is provided, the partitioned producer will randomly pick one single topic
+    partition and publish all the messages into that partition. If a key is provided on the
+    message, the partitioned producer will hash the key and assign the message to a particular
+    partition.
+
+    :data:`CUSTOM`:
+    Use a custom topic router implementation that will be called to determine the partition
+    for a particular message.
+    """
+
+    ROUND_ROBIN = 0
+    MESSAGE_KEY_HASH = 1
+    CUSTOM = 2
+
+    def _to_j_topic_routing_mode(self):
+        JTopicRoutingMode = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode
+        return getattr(JTopicRoutingMode, self.name)
+
+
+class MessageDelayer(object):
+    """
+    A delayer for the Pulsar broker to pass the sent message to the downstream consumer. This
+    only works in the SubscriptionType#Shared subscription.
+
+    Read the delayed message delivery documentation at
+    https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery
+    for a better understanding of this feature.
+    """
+    def __init__(self, _j_message_delayer):
+        self._j_message_delayer = _j_message_delayer
+
+    @staticmethod
+    def never() -> 'MessageDelayer':
+        """
+        All the messages should be consumed immediately.
+        """
+        JMessageDelayer = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer
+        return MessageDelayer(JMessageDelayer.never())
+
+    @staticmethod
+    def fixed(duration: Duration) -> 'MessageDelayer':
+        """
+        All the messages will be delayed by the given fixed duration before they can be consumed.
+        """
+        JMessageDelayer = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer
+        return MessageDelayer(JMessageDelayer.fixed(duration._j_duration))
+
+
+class PulsarSink(Sink):
+    """
+    The Sink implementation of Pulsar. Please use a PulsarSinkBuilder to construct a
+    PulsarSink. The following example shows how to create a PulsarSink receiving records of
+    String type.
+
+    Example:
+    ::
+
+        >>> sink = PulsarSink.builder() \\
+        ...     .set_service_url(PULSAR_BROKER_URL) \\
+        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
+        ...     .set_topics(topic) \\
+        ...     .set_serialization_schema(
+        ...         PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .build()
+
+    The sink supports all delivery guarantees described by DeliveryGuarantee.
+
+    DeliveryGuarantee#NONE does not provide any guarantees: messages may be lost in case of
+    issues on the Pulsar broker and messages may be duplicated in case of a Flink failure.
+
+    DeliveryGuarantee#AT_LEAST_ONCE: The sink will wait for all outstanding records in the
+    Pulsar buffers to be acknowledged by the Pulsar producer on a checkpoint. No messages will
+    be lost in case of any issue with the Pulsar brokers but messages may be duplicated when
+    Flink restarts.
+
+    DeliveryGuarantee#EXACTLY_ONCE: In this mode the PulsarSink will write all messages in a
+    Pulsar transaction that will be committed to Pulsar on a checkpoint. Thus, no duplicates
+    will be seen in case of a Flink restart. However, this delays record writing effectively
+    until a checkpoint is written, so adjust the checkpoint duration accordingly. Additionally,
+    it is highly recommended to tweak the Pulsar transaction timeout so that it is larger than
+    the maximum checkpoint duration plus the maximum restart duration, otherwise data loss may
+    happen when Pulsar expires an uncommitted transaction.
+
+    See PulsarSinkBuilder for more details.
+    """
+
+    def __init__(self, j_pulsar_sink):
+        super(PulsarSink, self).__init__(sink=j_pulsar_sink)
+
+    @staticmethod
+    def builder() -> 'PulsarSinkBuilder':
+        """
+        Get a PulsarSinkBuilder to build a PulsarSink.
+        """
+        return PulsarSinkBuilder()
+
+
+class PulsarSinkBuilder(object):
+    """
+    The builder class for PulsarSink to make it easier for the users to construct a PulsarSink.
+
+    The following example shows the minimum setup to create a PulsarSink that writes String
+    values to a Pulsar topic.
+
+    Example:
+    ::
+
+        >>> sink = PulsarSink.builder() \\
+        ...     .set_service_url(PULSAR_BROKER_URL) \\
+        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
+        ...     .set_topics([TOPIC1, TOPIC2]) \\
+        ...     .set_serialization_schema(
+        ...         PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .build()
+
+    The service url, admin url, and the record serializer are required fields that must be
+    set. If you don't set the topics, make sure you have provided a custom TopicRouter.
+    Otherwise, you must provide the topics to produce.
+
+    To specify the delivery guarantees of the PulsarSink, one can call
+    set_delivery_guarantee(DeliveryGuarantee). The default delivery guarantee is
+    DeliveryGuarantee#NONE, which does not guarantee consistency when writing messages into
+    Pulsar.
+
+    Example:
+    ::
+
+        >>> sink = PulsarSink.builder() \\
+        ...     .set_service_url(PULSAR_BROKER_URL) \\
+        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
+        ...     .set_topics([TOPIC1, TOPIC2]) \\
+        ...     .set_serialization_schema(
+        ...         PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE) \\
+        ...     .build()
+    """
+
+    def __init__(self):
+        JPulsarSink = get_gateway().jvm.org.apache.flink.connector.pulsar.sink.PulsarSink
+        self._j_pulsar_sink_builder = JPulsarSink.builder()
+
+    def set_admin_url(self, admin_url: str) -> 'PulsarSinkBuilder':
+        """
+        Sets the admin endpoint for the PulsarAdmin of the PulsarSink.
+        """
+        self._j_pulsar_sink_builder.setAdminUrl(admin_url)
+        return self
+
+    def set_service_url(self, service_url: str) -> 'PulsarSinkBuilder':
+        """
+        Sets the service URL for the PulsarProducer of the PulsarSink.
+        """
+        self._j_pulsar_sink_builder.setServiceUrl(service_url)
+        return self
+
+    def set_producer_name(self, producer_name: str) -> 'PulsarSinkBuilder':
+        """
+        The producer name is informative, and it can be used to identify a particular producer
+        instance from the topic stats.
+        """
+        self._j_pulsar_sink_builder.setProducerName(producer_name)
+        return self
+
+    def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSinkBuilder':
+        """
+        Set a Pulsar topic list for the Flink sink. Some topics may not exist yet; writing to
+        a non-existent topic will not throw any exception.
+        """
+        if not isinstance(topics, list):
+            topics = [topics]
+        self._j_pulsar_sink_builder.setTopics(topics)
+        return self
+
+    def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) \
+            -> 'PulsarSinkBuilder':
+        """
+        Sets the desired DeliveryGuarantee. The default delivery guarantee is
+        DeliveryGuarantee#NONE.
+        """
+        self._j_pulsar_sink_builder.setDeliveryGuarantee(
+            delivery_guarantee._to_j_delivery_guarantee())
+        return self
+
+    def set_topic_routing_mode(self, topic_routing_mode: TopicRoutingMode) \
+            -> 'PulsarSinkBuilder':
+        """
+        Set a routing mode for choosing the right topic partition to send messages to.
+        """
+        self._j_pulsar_sink_builder.setTopicRoutingMode(
+            topic_routing_mode._to_j_topic_routing_mode())
+        return self
+
+    def set_serialization_schema(self, pulsar_serialization_schema: PulsarSerializationSchema) \
+            -> 'PulsarSinkBuilder':
+        """
+        Sets the PulsarSerializationSchema that transforms incoming records to bytes.
+        """
+        self._j_pulsar_sink_builder.setSerializationSchema(
+            pulsar_serialization_schema._j_pulsar_serialization_schema)
+        return self
+
+    def enable_schema_evolution(self) -> 'PulsarSinkBuilder':
+        """
+        If you enable this option, the sink will serialize the message by using the Pulsar
+        Schema.
+        """
+        self._j_pulsar_sink_builder.enableSchemaEvolution()
+        return self
+
+    def delay_sending_message(self, message_delayer: MessageDelayer) -> 'PulsarSinkBuilder':
+        """
+        Set a message delayer to enable Pulsar's delayed message delivery.
+        """
+        self._j_pulsar_sink_builder.delaySendingMessage(message_delayer._j_message_delayer)
+        return self
+
+    def set_config(self, key: ConfigOption, value) -> 'PulsarSinkBuilder':

Review Comment:
   I will add PulsarSinkOptions class for user-friendliness.
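   
   For the record, here is a rough sketch of the direction I have in mind (the `option` helper and the constant name shown in the comment are only illustrative assumptions, not the final API): a thin wrapper that resolves the ConfigOption constants from the Java `org.apache.flink.connector.pulsar.sink.PulsarSinkOptions` class, so users can pass them to `set_config` without touching the py4j gateway themselves.
   
   ```python
   from pyflink.java_gateway import get_gateway
   
   
   class PulsarSinkOptions(object):
       """
       Sketch only: exposes the ConfigOption constants defined in the Java
       PulsarSinkOptions class for use with PulsarSinkBuilder.set_config.
       """
   
       @staticmethod
       def option(name: str):
           # Resolve a ConfigOption constant from the Java class, e.g.
           # PulsarSinkOptions.option("PULSAR_BATCHING_ENABLED"); the constant name is
           # just an example of the Java naming scheme. A later version could expose
           # each option as a proper class attribute instead of a lookup by name.
           j_options = get_gateway().jvm \
               .org.apache.flink.connector.pulsar.sink.PulsarSinkOptions
           return getattr(j_options, name)
   ```
   
   Usage would then look like `builder.set_config(PulsarSinkOptions.option("PULSAR_BATCHING_ENABLED"), True)`, assuming `set_config` accepts the resolved Java ConfigOption directly.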
   
   


