[GitHub] [flink] deadwind4 commented on a diff in pull request #19682: [FLINK-25795][python][connector/pulsar] Add pulsar sink DataStream API

2022-05-12 Thread GitBox


deadwind4 commented on code in PR #19682:
URL: https://github.com/apache/flink/pull/19682#discussion_r871110043


##
flink-python/pyflink/datastream/connectors.py:
##
@@ -1449,6 +1455,314 @@ def build(self) -> 'PulsarSource':
         return PulsarSource(self._j_pulsar_source_builder.build())
 
 
+class DeliveryGuarantee(Enum):
+    """
+    Delivery guarantees that can be chosen. In general, your pipeline can only offer the
+    lowest delivery guarantee that is supported by both your sources and sinks.
+
+    :data: `EXACTLY_ONCE`:
+    Records are delivered exactly once, even under failover scenarios. Building a complete
+    exactly-once pipeline requires that the source and sink support exactly-once and are
+    properly configured.
+
+    :data: `AT_LEAST_ONCE`:
+    Records are guaranteed to be delivered, but the same record may be delivered multiple
+    times. Usually, this guarantee is faster than exactly-once delivery.
+
+    :data: `NONE`:
+    Records are delivered on a best-effort basis. This is often the fastest way to process
+    records, but records may be lost or duplicated.
+    """
+
+    EXACTLY_ONCE = 0
+    AT_LEAST_ONCE = 1
+    NONE = 2
+
+    def _to_j_delivery_guarantee(self):
+        JDeliveryGuarantee = get_gateway().jvm \
+            .org.apache.flink.connector.base.DeliveryGuarantee
+        return getattr(JDeliveryGuarantee, self.name)
+
+
+class PulsarSerializationSchema(object):
+    """
+    The serialization schema describing how to serialize records into Pulsar.
+    """
+
+    def __init__(self, _j_pulsar_serialization_schema):
+        self._j_pulsar_serialization_schema = _j_pulsar_serialization_schema
+
+    @staticmethod
+    def flink_schema(serialization_schema: SerializationSchema) \
+            -> 'PulsarSerializationSchema':
+        """
+        Create a PulsarSerializationSchema from Flink's SerializationSchema. It serializes
+        the message into a byte array and sends it to Pulsar with Schema#BYTES.
+        """
+        JPulsarSerializationSchema = get_gateway().jvm.org.apache.flink \
+            .connector.pulsar.sink.writer.serializer.PulsarSerializationSchema
+        _j_pulsar_serialization_schema = JPulsarSerializationSchema.flinkSchema(
+            serialization_schema._j_serialization_schema)
+        return PulsarSerializationSchema(_j_pulsar_serialization_schema)
+
+
+class TopicRoutingMode(Enum):
+    """
+    The routing policy for choosing the desired topic for a given message.
+
+    :data: `ROUND_ROBIN`:
+    The producer publishes messages across all partitions in a round-robin fashion to
+    achieve maximum throughput. Note that round-robin is not done per individual message;
+    it is aligned to the batching delay boundary, to ensure batching is effective.
+
+    :data: `MESSAGE_KEY_HASH`:
+    If no key is provided, the partitioned producer randomly picks one single topic
+    partition and publishes all messages into that partition. If a key is provided on
+    the message, the partitioned producer hashes the key and assigns the message to a
+    particular partition.
+
+    :data: `CUSTOM`:
+    Use a custom topic router implementation that is called to determine the partition
+    for a particular message.
+    """
+
+    ROUND_ROBIN = 0
+    MESSAGE_KEY_HASH = 1
+    CUSTOM = 2
+
+    def _to_j_topic_routing_mode(self):
+        JTopicRoutingMode = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode
+        return getattr(JTopicRoutingMode, self.name)
+
+
+class MessageDelayer(object):
+    """
+    A delayer controlling when the Pulsar broker passes a sent message to the downstream
+    consumer. This only works with the {@link SubscriptionType#Shared} subscription.
+
+    Read about delayed message delivery at
+    https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery
+    for a better understanding of this feature.
+    """
+
+    def __init__(self, _j_message_delayer):
+        self._j_message_delayer = _j_message_delayer
+
+    @staticmethod
+    def never() -> 'MessageDelayer':
+        """
+        All messages should be consumed immediately.
+        """
+        JMessageDelayer = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer
+        return MessageDelayer(JMessageDelayer.never())
+
+    @staticmethod
+    def fixed(duration: Duration) -> 'MessageDelayer':
+        """
+        All messages should be consumed after a fixed delay.
+        """
+        JMessageDelayer = get_gateway().jvm \
+            .org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer
+        return MessageDelayer(JMessageDelayer.fixed(duration._j_duration))
+
+
+class PulsarSink(Sink):
+    """
+    The Sink implementation of Pulsar. Please use a PulsarSinkBuilder to construct a
+    PulsarSink. The following ex
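One subtlety worth checking in the enum pattern above: the Python-to-Java bridge resolves the Java constant purely by member name, and a stray trailing comma after a member value silently turns that value into a tuple. A quick pure-Python check of both points (plain `enum`, no Flink or py4j required):

```python
from enum import Enum

class DeliveryGuarantee(Enum):
    EXACTLY_ONCE = 0
    AT_LEAST_ONCE = 1
    NONE = 2

# The wrapper bridges to Java by name alone, i.e.
# getattr(JDeliveryGuarantee, self.name), so the Python member names
# must match the Java enum constants exactly.
assert DeliveryGuarantee.EXACTLY_ONCE.name == "EXACTLY_ONCE"
assert DeliveryGuarantee.EXACTLY_ONCE.value == 0

# Pitfall: a trailing comma after a member value (`EXACTLY_ONCE = 0,`)
# silently makes the value a one-element tuple instead of an int.
class WithComma(Enum):
    EXACTLY_ONCE = 0,

assert WithComma.EXACTLY_ONCE.value == (0,)
```

The name-based lookup is why the member values themselves never cross the py4j boundary; only `self.name` does.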

[GitHub] [flink] deadwind4 commented on a diff in pull request #19682: [FLINK-25795][python][connector/pulsar] Add pulsar sink DataStream API

2022-05-12 Thread GitBox


deadwind4 commented on code in PR #19682:
URL: https://github.com/apache/flink/pull/19682#discussion_r871173891


##
flink-python/pyflink/datastream/connectors.py:
##

[GitHub] [flink] deadwind4 commented on a diff in pull request #19682: [FLINK-25795][python][connector/pulsar] Add pulsar sink DataStream API

2022-05-13 Thread GitBox


deadwind4 commented on code in PR #19682:
URL: https://github.com/apache/flink/pull/19682#discussion_r872081349


##
flink-python/pyflink/datastream/connectors.py:
##

[GitHub] [flink] deadwind4 commented on a diff in pull request #19682: [FLINK-25795][python][connector/pulsar] Add pulsar sink DataStream API

2022-05-13 Thread GitBox


deadwind4 commented on code in PR #19682:
URL: https://github.com/apache/flink/pull/19682#discussion_r872131355


##
flink-python/pyflink/datastream/connectors.py:
##
@@ -1449,6 +1455,314 @@ def build(self) -> 'PulsarSource':
+    @staticmethod
+    def flink_schema(serialization_schema: SerializationSchema) \
Review Comment:
   Almost all Pulsar Schema variants need schema info (a POJO or similar), which is
   difficult to use from the Python API. IMO, we can temporarily support only
   flink_schema. Users can use StringSchema and implement serialization themselves, or
   use `JsonRowDeserializationSchema` to deal with JSON.
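The workaround described here, pre-serializing records and handing Pulsar plain bytes via Schema#BYTES, amounts to supplying any record-to-bytes mapping. A plain-Python sketch of that contract (the class names below are illustrative stand-ins, not the actual Flink classes):

```python
import json

# Illustrative stand-ins for Flink serialization schemas: when writing
# with Pulsar's Schema#BYTES, the sink only needs each record mapped to
# a byte array; how the bytes are produced is up to the user.
class StringBytesSchema:
    def serialize(self, value: str) -> bytes:
        return value.encode("utf-8")

class JsonBytesSchema:
    def serialize(self, record: dict) -> bytes:
        return json.dumps(record, sort_keys=True).encode("utf-8")

assert StringBytesSchema().serialize("hello") == b"hello"
assert JsonBytesSchema().serialize({"id": 1}) == b'{"id": 1}'
```

Any downstream consumer then deserializes the raw bytes with the matching logic, which is the "realize serialization by themselves" path the comment suggests.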
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] deadwind4 commented on a diff in pull request #19682: [FLINK-25795][python][connector/pulsar] Add pulsar sink DataStream API

2022-05-13 Thread GitBox


deadwind4 commented on code in PR #19682:
URL: https://github.com/apache/flink/pull/19682#discussion_r872299864


##
flink-python/pyflink/datastream/connectors.py:
##

[GitHub] [flink] deadwind4 commented on a diff in pull request #19682: [FLINK-25795][python][connector/pulsar] Add pulsar sink DataStream API

2022-05-16 Thread GitBox


deadwind4 commented on code in PR #19682:
URL: https://github.com/apache/flink/pull/19682#discussion_r874426556


##
flink-python/pyflink/datastream/connectors/base.py:
##
@@ -48,3 +50,32 @@ def __init__(self, sink: Union[str, JavaObject]):
 :param sink: The java Sink object.
 """
 super(Sink, self).__init__(sink)
+
+
+class DeliveryGuarantee(Enum):

Review Comment:
   This class is common: it is located in
   `org.apache.flink.connector.base.DeliveryGuarantee`, and Pulsar, ES, and Kafka all
   need it. In the future, I will commit a new PR to fix the Kafka code.


