dianfu commented on code in PR #22571:
URL: https://github.com/apache/flink/pull/22571#discussion_r1191830067


##########
flink-python/pyflink/datastream/connectors/pulsar.py:
##########
@@ -693,8 +668,7 @@ class PulsarSinkBuilder(object):
         ...     .set_service_url(PULSAR_BROKER_URL) \\
         ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
         ...     .set_topics([TOPIC1, TOPIC2]) \\
-        ...     .set_serialization_schema(
-        ...         PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \\
+        ...     .set_value_serialization_schema(SimpleStringSchema()) \\

Review Comment:
   ```suggestion
           ...     .set_serialization_schema(SimpleStringSchema()) \\
   ```



##########
flink-python/pyflink/datastream/connectors/pulsar.py:
##########
@@ -483,18 +438,17 @@ def set_bounded_stop_cursor(self, stop_cursor: StopCursor) -> 'PulsarSourceBuild
         self._j_pulsar_source_builder.setBoundedStopCursor(stop_cursor._j_stop_cursor)
         return self
 
-    def set_deserialization_schema(self,
-                                   pulsar_deserialization_schema: PulsarDeserializationSchema) \
+    def set_value_only_deserializer(self, deserialization_schema: DeserializationSchema) \
             -> 'PulsarSourceBuilder':
         """
-        DeserializationSchema is required for getting the Schema for deserialize message from
-        pulsar and getting the TypeInformation for message serialization in flink.
+        Sets the :class:`~pyflink.common.serialization.DeserializationSchema` for deserializing the
+        value of Pulsars message.
 
-        We have defined a set of implementations, using PulsarDeserializationSchema#flink_type_info
-        or PulsarDeserializationSchema#flink_schema for creating the desired schema.
+        :param deserialization_schema: the :class:`DeserializationSchema` to use for
+            deserialization.
+        :return: this PulsarSourceBuilder.
         """
-        self._j_pulsar_source_builder.setDeserializationSchema(
-            pulsar_deserialization_schema._j_pulsar_deserialization_schema)
+        self._j_builder.setValueOnlyDeserializer(deserialization_schema._j_deserialization_schema)

Review Comment:
   Why does this call `setValueOnlyDeserializer`? As far as I can tell, the Java `PulsarSourceBuilder` has no such method:
   https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
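
For illustration only, here is a minimal sketch of the wrapper delegating to `setDeserializationSchema`, the method name the removed lines in this hunk were calling. The stub classes are hypothetical stand-ins so the snippet runs without a JVM. Whether the Java `setDeserializationSchema` accepts a plain `DeserializationSchema` is an assumption that should be checked against the linked file.

```python
class StubJavaSourceBuilder:
    """Hypothetical stand-in for the Java-side builder (not the real py4j object)."""

    def __init__(self):
        self.deserialization_schema = None

    def setDeserializationSchema(self, j_schema):
        # Record what the Python wrapper handed over, then return the builder.
        self.deserialization_schema = j_schema
        return self


class StubDeserializationSchema:
    """Hypothetical stand-in for a PyFlink DeserializationSchema wrapper."""

    def __init__(self, j_deserialization_schema):
        self._j_deserialization_schema = j_deserialization_schema


class SourceBuilderSketch:
    """Sketch of the Python builder; only the method under discussion is shown."""

    def __init__(self, j_builder):
        self._j_builder = j_builder

    def set_deserialization_schema(self, deserialization_schema):
        # Assumption: the Java builder exposes setDeserializationSchema for this argument.
        self._j_builder.setDeserializationSchema(
            deserialization_schema._j_deserialization_schema)
        return self
```

The point of the sketch is only that the Python method name and the Java method it forwards to should both exist and line up; the real argument types depend on the connector version.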


