deadwind4 commented on code in PR #19682:
URL: https://github.com/apache/flink/pull/19682#discussion_r872131355


##########
flink-python/pyflink/datastream/connectors.py:
##########
@@ -1449,6 +1455,314 @@ def build(self) -> 'PulsarSource':
         return PulsarSource(self._j_pulsar_source_builder.build())
 
 
+class DeliveryGuarantee(Enum):
+    """
+    DeliverGuarantees that can be chosen. In general your pipeline can only 
offer the lowest
+    delivery guarantee which is supported by your sources and sinks.
+
+    :data: `EXACTLY_ONCE`:
+    Records are only delivered exactly-once also under failover scenarios. To 
build a complete
+    exactly-once pipeline is required that the source and sink support 
exactly-once and are
+    properly configured.
+
+    :data: `AT_LEAST_ONCE`:
+    Records are ensured to be delivered but it may happen that the same record 
is delivered
+    multiple times. Usually, this guarantee is faster than the exactly-once 
delivery.
+
+    :data: `NONE`:
+    Records are delivered on a best effort basis. It is often the fastest way 
to process records
+    but it may happen that records are lost or duplicated.
+    """
+
+    EXACTLY_ONCE = 0,
+    AT_LEAST_ONCE = 1,
+    NONE = 2
+
+    def _to_j_delivery_guarantee(self):
+        JDeliveryGuarantee = get_gateway().jvm \
+            .org.apache.flink.connector.base.DeliveryGuarantee
+        return getattr(JDeliveryGuarantee, self.name)
+
+
+class PulsarSerializationSchema(object):
+    """
+    The serialization schema for how to serialize records into Pulsar.
+    """
+
+    def __init__(self, _j_pulsar_serialization_schema):
+        self._j_pulsar_serialization_schema = _j_pulsar_serialization_schema
+
+    @staticmethod
+    def flink_schema(serialization_schema: SerializationSchema) \

Review Comment:
   Almost all PulsarSchema need schema info (POJO or sth), It's difficult to 
use in Python API. IMO, we can temporarily support only flink_schema. Users can 
use StringSchema and realize serialization by themselves or use 
`JsonRowDeserializationSchema` to deal with JSON.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to