This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit d6e2e06964c4cf40a5e58d4bc2f976a65b4a566b
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Nov 8 14:53:22 2022 +0800

    [FLINK-28083][Connector/Pulsar] PulsarSource work with object-reusing 
DeserializationSchema. (#21205)
---
 .../source/reader/PulsarSourceReaderFactory.java   | 23 +++----
 .../source/reader/emitter/PulsarRecordEmitter.java | 63 ++++++++++++++++--
 .../reader/fetcher/PulsarFetcherManagerBase.java   | 27 ++++----
 .../fetcher/PulsarOrderedFetcherManager.java       | 18 +++---
 .../fetcher/PulsarUnorderedFetcherManager.java     | 14 ++--
 .../source/reader/message/PulsarMessage.java       | 74 ----------------------
 .../reader/message/PulsarMessageCollector.java     | 60 ------------------
 .../reader/source/PulsarOrderedSourceReader.java   | 15 +++--
 .../reader/source/PulsarSourceReaderBase.java      | 18 ++----
 .../reader/source/PulsarUnorderedSourceReader.java | 13 ++--
 .../split/PulsarOrderedPartitionSplitReader.java   | 13 +---
 .../split/PulsarPartitionSplitReaderBase.java      | 33 +++-------
 .../split/PulsarUnorderedPartitionSplitReader.java | 11 +---
 .../source/enumerator/cursor/StopCursorTest.java   | 17 ++---
 .../PulsarOrderedPartitionSplitReaderTest.java     |  8 +--
 .../split/PulsarPartitionSplitReaderTestBase.java  | 74 ++++++++++------------
 16 files changed, 175 insertions(+), 306 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
index 820888a..2e83ab5 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import 
org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
 import 
org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
@@ -33,6 +33,7 @@ import 
org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPart
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
@@ -70,25 +71,25 @@ public final class PulsarSourceReaderFactory {
 
         // Create a message queue with the predefined source option.
         int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> 
elementsQueue =
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> 
elementsQueue =
                 new FutureCompletingBlockingQueue<>(queueCapacity);
 
+        PulsarRecordEmitter<OUT> recordEmitter = new 
PulsarRecordEmitter<>(deserializationSchema);
+
         // Create different pulsar source reader by subscription type.
         SubscriptionType subscriptionType = 
sourceConfiguration.getSubscriptionType();
         if (subscriptionType == SubscriptionType.Failover
                 || subscriptionType == SubscriptionType.Exclusive) {
             // Create an ordered split reader supplier.
-            Supplier<PulsarOrderedPartitionSplitReader<OUT>> 
splitReaderSupplier =
+            Supplier<PulsarOrderedPartitionSplitReader> splitReaderSupplier =
                     () ->
-                            new PulsarOrderedPartitionSplitReader<>(
-                                    pulsarClient,
-                                    pulsarAdmin,
-                                    sourceConfiguration,
-                                    deserializationSchema);
+                            new PulsarOrderedPartitionSplitReader(
+                                    pulsarClient, pulsarAdmin, 
sourceConfiguration);
 
             return new PulsarOrderedSourceReader<>(
                     elementsQueue,
                     splitReaderSupplier,
+                    recordEmitter,
                     readerContext,
                     sourceConfiguration,
                     pulsarClient,
@@ -102,18 +103,18 @@ public final class PulsarSourceReaderFactory {
                 throw new IllegalStateException("Transaction is required but 
didn't enabled");
             }
 
-            Supplier<PulsarUnorderedPartitionSplitReader<OUT>> 
splitReaderSupplier =
+            Supplier<PulsarUnorderedPartitionSplitReader> splitReaderSupplier =
                     () ->
-                            new PulsarUnorderedPartitionSplitReader<>(
+                            new PulsarUnorderedPartitionSplitReader(
                                     pulsarClient,
                                     pulsarAdmin,
                                     sourceConfiguration,
-                                    deserializationSchema,
                                     coordinatorClient);
 
             return new PulsarUnorderedSourceReader<>(
                     elementsQueue,
                     splitReaderSupplier,
+                    recordEmitter,
                     readerContext,
                     sourceConfiguration,
                     pulsarClient,
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/emitter/PulsarRecordEmitter.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/emitter/PulsarRecordEmitter.java
index f6607f0..5cac7de 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/emitter/PulsarRecordEmitter.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/emitter/PulsarRecordEmitter.java
@@ -20,10 +20,13 @@ package 
org.apache.flink.connector.pulsar.source.reader.emitter;
 
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
 import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
+import org.apache.flink.util.Collector;
+
+import org.apache.pulsar.client.api.Message;
 
 /**
  * The {@link RecordEmitter} implementation for both {@link 
PulsarOrderedSourceReader} and {@link
@@ -31,15 +34,61 @@ import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
  * emitter.
  */
 public class PulsarRecordEmitter<T>
-        implements RecordEmitter<PulsarMessage<T>, T, 
PulsarPartitionSplitState> {
+        implements RecordEmitter<Message<byte[]>, T, 
PulsarPartitionSplitState> {
+
+    private final PulsarDeserializationSchema<T> deserializationSchema;
+    private final SourceOutputWrapper<T> sourceOutputWrapper;
+
+    public PulsarRecordEmitter(PulsarDeserializationSchema<T> 
deserializationSchema) {
+        this.deserializationSchema = deserializationSchema;
+        this.sourceOutputWrapper = new SourceOutputWrapper<>();
+    }
 
     @Override
     public void emitRecord(
-            PulsarMessage<T> element, SourceOutput<T> output, 
PulsarPartitionSplitState splitState)
+            Message<byte[]> element, SourceOutput<T> output, 
PulsarPartitionSplitState splitState)
             throws Exception {
-        // Sink the record to source output.
-        output.collect(element.getValue(), element.getEventTime());
-        // Update the split state.
-        splitState.setLatestConsumedId(element.getId());
+        // Update the source output.
+        sourceOutputWrapper.setSourceOutput(output);
+        sourceOutputWrapper.setTimestamp(element);
+
+        // Deserialize the message and sink it to output.
+        deserializationSchema.deserialize(element, sourceOutputWrapper);
+        splitState.setLatestConsumedId(element.getMessageId());
+
+        // Release the messages if we use message pool in Pulsar.
+        element.release();
+    }
+
+    private static class SourceOutputWrapper<T> implements Collector<T> {
+
+        private SourceOutput<T> sourceOutput;
+        private long timestamp;
+
+        @Override
+        public void collect(T record) {
+            if (timestamp > 0) {
+                sourceOutput.collect(record, timestamp);
+            } else {
+                sourceOutput.collect(record);
+            }
+        }
+
+        @Override
+        public void close() {
+            // Nothing to do here.
+        }
+
+        private void setSourceOutput(SourceOutput<T> sourceOutput) {
+            this.sourceOutput = sourceOutput;
+        }
+
+        /**
+         * Store the event timestamp from Pulsar. Zero means there is no event 
time. See {@link
+         * Message#getEventTime()} for the reason why it returns zero.
+         */
+        private void setTimestamp(Message<?> message) {
+            this.timestamp = message.getEventTime();
+        }
     }
 }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
index 1622eee..af9bdd4 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
@@ -26,9 +26,10 @@ import 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
+import org.apache.pulsar.client.api.Message;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,14 +37,10 @@ import java.util.function.Supplier;
 
 import static java.util.Collections.singletonList;
 
-/**
- * Common fetcher manager abstraction for both ordered & unordered message.
- *
- * @param <T> The decoded message type for flink.
- */
+/** Common fetcher manager abstraction for both ordered & unordered message. */
 @Internal
-public abstract class PulsarFetcherManagerBase<T>
-        extends SplitFetcherManager<PulsarMessage<T>, PulsarPartitionSplit> {
+public abstract class PulsarFetcherManagerBase
+        extends SplitFetcherManager<Message<byte[]>, PulsarPartitionSplit> {
 
     private final Map<String, Integer> splitFetcherMapping = new HashMap<>();
     private final Map<Integer, Boolean> fetcherStatus = new HashMap<>();
@@ -57,8 +54,8 @@ public abstract class PulsarFetcherManagerBase<T>
      * @param splitReaderSupplier The factory for the split reader that 
connects to the source
      */
     protected PulsarFetcherManagerBase(
-            
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<T>>> 
elementsQueue,
-            Supplier<SplitReader<PulsarMessage<T>, PulsarPartitionSplit>> 
splitReaderSupplier,
+            
FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> 
elementsQueue,
+            Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> 
splitReaderSupplier,
             Configuration configuration) {
         super(elementsQueue, splitReaderSupplier, configuration);
     }
@@ -70,7 +67,7 @@ public abstract class PulsarFetcherManagerBase<T>
     @Override
     public void addSplits(List<PulsarPartitionSplit> splitsToAdd) {
         for (PulsarPartitionSplit split : splitsToAdd) {
-            SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> fetcher =
+            SplitFetcher<Message<byte[]>, PulsarPartitionSplit> fetcher =
                     getOrCreateFetcher(split.splitId());
             fetcher.addSplits(singletonList(split));
             // This method could be executed multiple times.
@@ -79,7 +76,7 @@ public abstract class PulsarFetcherManagerBase<T>
     }
 
     @Override
-    protected void startFetcher(SplitFetcher<PulsarMessage<T>, 
PulsarPartitionSplit> fetcher) {
+    protected void startFetcher(SplitFetcher<Message<byte[]>, 
PulsarPartitionSplit> fetcher) {
         if (fetcherStatus.get(fetcher.fetcherId()) != Boolean.TRUE) {
             fetcherStatus.put(fetcher.fetcherId(), true);
             super.startFetcher(fetcher);
@@ -91,16 +88,16 @@ public abstract class PulsarFetcherManagerBase<T>
         Integer fetchId = splitFetcherMapping.remove(splitId);
         if (fetchId != null) {
             fetcherStatus.remove(fetchId);
-            SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> fetcher = 
fetchers.remove(fetchId);
+            SplitFetcher<Message<byte[]>, PulsarPartitionSplit> fetcher = 
fetchers.remove(fetchId);
             if (fetcher != null) {
                 fetcher.shutdown();
             }
         }
     }
 
-    protected SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> 
getOrCreateFetcher(
+    protected SplitFetcher<Message<byte[]>, PulsarPartitionSplit> 
getOrCreateFetcher(
             String splitId) {
-        SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> fetcher;
+        SplitFetcher<Message<byte[]>, PulsarPartitionSplit> fetcher;
         Integer fetcherId = splitFetcherMapping.get(splitId);
 
         if (fetcherId == null) {
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java
index 103dc62..0bc7c98 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java
@@ -25,11 +25,11 @@ import 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import 
org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,16 +41,14 @@ import java.util.function.Supplier;
  * Pulsar's FetcherManager implementation for ordered consuming. This class is 
needed to help
  * acknowledge the message to Pulsar using the {@link Consumer} inside the 
{@link
  * PulsarOrderedPartitionSplitReader}.
- *
- * @param <T> The message type for pulsar decoded message.
  */
 @Internal
-public class PulsarOrderedFetcherManager<T> extends 
PulsarFetcherManagerBase<T> {
+public class PulsarOrderedFetcherManager extends PulsarFetcherManagerBase {
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarOrderedFetcherManager.class);
 
     public PulsarOrderedFetcherManager(
-            
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<T>>> 
elementsQueue,
-            Supplier<SplitReader<PulsarMessage<T>, PulsarPartitionSplit>> 
splitReaderSupplier,
+            
FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> 
elementsQueue,
+            Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> 
splitReaderSupplier,
             Configuration configuration) {
         super(elementsQueue, splitReaderSupplier, configuration);
     }
@@ -59,18 +57,18 @@ public class PulsarOrderedFetcherManager<T> extends 
PulsarFetcherManagerBase<T>
         LOG.debug("Acknowledge messages {}", cursorsToCommit);
         cursorsToCommit.forEach(
                 (partition, messageId) -> {
-                    SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> 
fetcher =
+                    SplitFetcher<Message<byte[]>, PulsarPartitionSplit> 
fetcher =
                             getOrCreateFetcher(partition.toString());
                     triggerAcknowledge(fetcher, partition, messageId);
                 });
     }
 
     private void triggerAcknowledge(
-            SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> splitFetcher,
+            SplitFetcher<Message<byte[]>, PulsarPartitionSplit> splitFetcher,
             TopicPartition partition,
             MessageId messageId) {
-        PulsarOrderedPartitionSplitReader<T> splitReader =
-                (PulsarOrderedPartitionSplitReader<T>) 
splitFetcher.getSplitReader();
+        PulsarOrderedPartitionSplitReader splitReader =
+                (PulsarOrderedPartitionSplitReader) 
splitFetcher.getSplitReader();
         splitReader.notifyCheckpointComplete(partition, messageId);
         startFetcher(splitFetcher);
     }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
index 449e992..33b811c 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
@@ -24,12 +24,12 @@ import 
org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import 
org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
 
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -42,15 +42,13 @@ import static java.util.stream.Collectors.toCollection;
  * Pulsar's FetcherManager implementation for unordered consuming. This class 
is needed to help
  * acknowledge the message to Pulsar using the {@link Consumer} inside the 
{@link
  * PulsarUnorderedPartitionSplitReader}.
- *
- * @param <T> The message type for pulsar decoded message.
  */
 @Internal
-public class PulsarUnorderedFetcherManager<T> extends 
PulsarFetcherManagerBase<T> {
+public class PulsarUnorderedFetcherManager extends PulsarFetcherManagerBase {
 
     public PulsarUnorderedFetcherManager(
-            
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<T>>> 
elementsQueue,
-            Supplier<SplitReader<PulsarMessage<T>, PulsarPartitionSplit>> 
splitReaderSupplier,
+            
FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> 
elementsQueue,
+            Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> 
splitReaderSupplier,
             Configuration configuration) {
         super(elementsQueue, splitReaderSupplier, configuration);
     }
@@ -65,8 +63,8 @@ public class PulsarUnorderedFetcherManager<T> extends 
PulsarFetcherManagerBase<T
     }
 
     private Optional<PulsarPartitionSplit> snapshotReader(
-            SplitReader<PulsarMessage<T>, PulsarPartitionSplit> splitReader) {
-        return ((PulsarUnorderedPartitionSplitReader<T>) splitReader)
+            SplitReader<Message<byte[]>, PulsarPartitionSplit> splitReader) {
+        return ((PulsarUnorderedPartitionSplitReader) splitReader)
                 .snapshotState()
                 .map(PulsarPartitionSplitState::toPulsarPartitionSplit);
     }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/message/PulsarMessage.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/message/PulsarMessage.java
deleted file mode 100644
index 0632e22..0000000
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/message/PulsarMessage.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.message;
-
-import org.apache.flink.annotation.Internal;
-import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-
-/**
- * The message instance that contains the required information which would be 
used for committing
- * the consuming status.
- */
-@Internal
-public class PulsarMessage<T> {
-
-    /**
-     * The id of a given message. This id could be same for multiple {@link 
PulsarMessage}, although
-     * it is unique for every {@link Message}.
-     */
-    private final MessageId id;
-
-    /** The value which deserialized by {@link PulsarDeserializationSchema}. */
-    private final T value;
-
-    /** The produce time for this message, it's a event time. */
-    private final long eventTime;
-
-    public PulsarMessage(MessageId id, T value, long eventTime) {
-        this.id = id;
-        this.value = value;
-        this.eventTime = eventTime;
-    }
-
-    public MessageId getId() {
-        return id;
-    }
-
-    public T getValue() {
-        return value;
-    }
-
-    public long getEventTime() {
-        return eventTime;
-    }
-
-    @Override
-    public String toString() {
-        return "PulsarMessage{"
-                + "id="
-                + id
-                + ", value="
-                + value
-                + ", eventTime="
-                + eventTime
-                + '}';
-    }
-}
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/message/PulsarMessageCollector.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/message/PulsarMessageCollector.java
deleted file mode 100644
index f201425..0000000
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/message/PulsarMessageCollector.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.message;
-
-import org.apache.flink.connector.base.source.reader.RecordsBySplits;
-import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.util.Collector;
-
-import org.apache.pulsar.client.api.Message;
-
-/**
- * This collector supplier is providing the {@link Collector} for accepting 
the deserialized {@link
- * PulsarMessage} from pulsar {@link PulsarDeserializationSchema}.
- *
- * @param <T> The deserialized pulsar message type, aka the source message 
type.
- */
-public class PulsarMessageCollector<T> implements Collector<T> {
-
-    private final String splitId;
-    private final RecordsBySplits.Builder<PulsarMessage<T>> builder;
-    private Message<?> message;
-
-    public PulsarMessageCollector(
-            String splitId, RecordsBySplits.Builder<PulsarMessage<T>> builder) 
{
-        this.splitId = splitId;
-        this.builder = builder;
-    }
-
-    public void setMessage(Message<?> message) {
-        this.message = message;
-    }
-
-    @Override
-    public void collect(T t) {
-        PulsarMessage<T> result =
-                new PulsarMessage<>(message.getMessageId(), t, 
message.getEventTime());
-        builder.add(splitId, result);
-    }
-
-    @Override
-    public void close() {
-        // Nothing to do for this collector.
-    }
-}
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
index 6ff0466..046575e 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
@@ -26,14 +26,15 @@ import 
org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import 
org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
 import 
org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarOrderedFetcherManager;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import 
org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
 import org.apache.flink.core.io.InputStatus;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.slf4j.Logger;
@@ -67,16 +68,18 @@ public class PulsarOrderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT>
     private ScheduledExecutorService cursorScheduler;
 
     public PulsarOrderedSourceReader(
-            
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> 
elementsQueue,
-            Supplier<PulsarOrderedPartitionSplitReader<OUT>> 
splitReaderSupplier,
+            
FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> 
elementsQueue,
+            Supplier<PulsarOrderedPartitionSplitReader> splitReaderSupplier,
+            PulsarRecordEmitter<OUT> recordEmitter,
             SourceReaderContext context,
             SourceConfiguration sourceConfiguration,
             PulsarClient pulsarClient,
             PulsarAdmin pulsarAdmin) {
         super(
                 elementsQueue,
-                new PulsarOrderedFetcherManager<>(
+                new PulsarOrderedFetcherManager(
                         elementsQueue, splitReaderSupplier::get, 
context.getConfiguration()),
+                recordEmitter,
                 context,
                 sourceConfiguration,
                 pulsarClient,
@@ -151,7 +154,7 @@ public class PulsarOrderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT>
         LOG.debug("Committing cursors for checkpoint {}", checkpointId);
         Map<TopicPartition, MessageId> cursors = 
cursorsToCommit.get(checkpointId);
         try {
-            ((PulsarOrderedFetcherManager<OUT>) 
splitFetcherManager).acknowledgeMessages(cursors);
+            ((PulsarOrderedFetcherManager) 
splitFetcherManager).acknowledgeMessages(cursors);
             LOG.debug("Successfully acknowledge cursors for checkpoint {}", 
checkpointId);
 
             // Clean up the cursors.
@@ -196,7 +199,7 @@ public class PulsarOrderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT>
         }
 
         try {
-            ((PulsarOrderedFetcherManager<OUT>) 
splitFetcherManager).acknowledgeMessages(cursors);
+            ((PulsarOrderedFetcherManager) 
splitFetcherManager).acknowledgeMessages(cursors);
             // Clean up the finish splits.
             cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
         } catch (Exception e) {
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
index eee6950..d7fe471 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
@@ -25,11 +25,11 @@ import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
 import 
org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarFetcherManagerBase;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 
 import java.util.Collection;
@@ -42,25 +42,21 @@ import java.util.Set;
  */
 abstract class PulsarSourceReaderBase<OUT>
         extends SourceReaderBase<
-                PulsarMessage<OUT>, OUT, PulsarPartitionSplit, 
PulsarPartitionSplitState> {
+                Message<byte[]>, OUT, PulsarPartitionSplit, 
PulsarPartitionSplitState> {
 
     protected final SourceConfiguration sourceConfiguration;
     protected final PulsarClient pulsarClient;
     protected final PulsarAdmin pulsarAdmin;
 
     protected PulsarSourceReaderBase(
-            
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> 
elementsQueue,
-            PulsarFetcherManagerBase<OUT> splitFetcherManager,
+            
FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> 
elementsQueue,
+            PulsarFetcherManagerBase splitFetcherManager,
+            PulsarRecordEmitter<OUT> recordEmitter,
             SourceReaderContext context,
             SourceConfiguration sourceConfiguration,
             PulsarClient pulsarClient,
             PulsarAdmin pulsarAdmin) {
-        super(
-                elementsQueue,
-                splitFetcherManager,
-                new PulsarRecordEmitter<>(),
-                sourceConfiguration,
-                context);
+        super(elementsQueue, splitFetcherManager, recordEmitter, 
sourceConfiguration, context);
 
         this.sourceConfiguration = sourceConfiguration;
         this.pulsarClient = pulsarClient;
@@ -96,7 +92,7 @@ abstract class PulsarSourceReaderBase<OUT>
 
     protected void closeFinishedSplits(Set<String> finishedSplitIds) {
         for (String splitId : finishedSplitIds) {
-            ((PulsarFetcherManagerBase<OUT>) 
splitFetcherManager).closeFetcher(splitId);
+            ((PulsarFetcherManagerBase) 
splitFetcherManager).closeFetcher(splitId);
         }
     }
 }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
index 3bb4bc6..41a9b28 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
@@ -23,13 +23,14 @@ import 
org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import 
org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
 import 
org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarUnorderedFetcherManager;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import 
org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -63,8 +64,9 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
     private boolean started = false;
 
     public PulsarUnorderedSourceReader(
-            
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> 
elementsQueue,
-            Supplier<PulsarUnorderedPartitionSplitReader<OUT>> 
splitReaderSupplier,
+            
FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> 
elementsQueue,
+            Supplier<PulsarUnorderedPartitionSplitReader> splitReaderSupplier,
+            PulsarRecordEmitter<OUT> recordEmitter,
             SourceReaderContext context,
             SourceConfiguration sourceConfiguration,
             PulsarClient pulsarClient,
@@ -72,8 +74,9 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
             @Nullable TransactionCoordinatorClient coordinatorClient) {
         super(
                 elementsQueue,
-                new PulsarUnorderedFetcherManager<>(
+                new PulsarUnorderedFetcherManager(
                         elementsQueue, splitReaderSupplier::get, 
context.getConfiguration()),
+                recordEmitter,
                 context,
                 sourceConfiguration,
                 pulsarClient,
@@ -142,7 +145,7 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
     public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
         LOG.debug("Trigger the new transaction for downstream readers.");
         List<PulsarPartitionSplit> splits =
-                ((PulsarUnorderedFetcherManager<OUT>) 
splitFetcherManager).snapshotState();
+                ((PulsarUnorderedFetcherManager) 
splitFetcherManager).snapshotState();
 
         if (coordinatorClient != null) {
             // Snapshot the transaction status and commit it after checkpoint 
finishing.
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
index 316431a..6b02e0f 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.pulsar.source.reader.split;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
@@ -45,20 +44,17 @@ import static 
org.apache.flink.connector.pulsar.source.enumerator.cursor.Message
 /**
  * The split reader a given {@link PulsarPartitionSplit}, it would be closed 
once the {@link
  * PulsarOrderedSourceReader} is closed.
- *
- * @param <OUT> the type of the pulsar source message that would be serialized 
to downstream.
  */
 @Internal
-public class PulsarOrderedPartitionSplitReader<OUT> extends 
PulsarPartitionSplitReaderBase<OUT> {
+public class PulsarOrderedPartitionSplitReader extends 
PulsarPartitionSplitReaderBase {
     private static final Logger LOG =
             LoggerFactory.getLogger(PulsarOrderedPartitionSplitReader.class);
 
     public PulsarOrderedPartitionSplitReader(
             PulsarClient pulsarClient,
             PulsarAdmin pulsarAdmin,
-            SourceConfiguration sourceConfiguration,
-            PulsarDeserializationSchema<OUT> deserializationSchema) {
-        super(pulsarClient, pulsarAdmin, sourceConfiguration, 
deserializationSchema);
+            SourceConfiguration sourceConfiguration) {
+        super(pulsarClient, pulsarAdmin, sourceConfiguration);
     }
 
     @Override
@@ -70,9 +66,6 @@ public class PulsarOrderedPartitionSplitReader<OUT> extends 
PulsarPartitionSplit
     protected void finishedPollMessage(Message<byte[]> message) {
         // Nothing to do here.
         LOG.debug("Finished polling message {}", message);
-
-        // Release message
-        message.release();
     }
 
     @Override
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
index b884c57..08e9977 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
@@ -28,9 +28,6 @@ import 
org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.StopCondition;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
-import 
org.apache.flink.connector.pulsar.source.reader.message.PulsarMessageCollector;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.util.Preconditions;
 
@@ -60,19 +57,14 @@ import static 
org.apache.flink.connector.pulsar.source.config.PulsarSourceConfig
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.JOIN;
 import static org.apache.pulsar.client.api.KeySharedPolicy.stickyHashRange;
 
-/**
- * The common partition split reader.
- *
- * @param <OUT> the type of the pulsar source message that would be serialized 
to downstream.
- */
-abstract class PulsarPartitionSplitReaderBase<OUT>
-        implements SplitReader<PulsarMessage<OUT>, PulsarPartitionSplit> {
+/** The common partition split reader. */
+abstract class PulsarPartitionSplitReaderBase
+        implements SplitReader<Message<byte[]>, PulsarPartitionSplit> {
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarPartitionSplitReaderBase.class);
 
     protected final PulsarClient pulsarClient;
     protected final PulsarAdmin pulsarAdmin;
     protected final SourceConfiguration sourceConfiguration;
-    protected final PulsarDeserializationSchema<OUT> deserializationSchema;
 
     protected Consumer<byte[]> pulsarConsumer;
     protected PulsarPartitionSplit registeredSplit;
@@ -80,17 +72,15 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
     protected PulsarPartitionSplitReaderBase(
             PulsarClient pulsarClient,
             PulsarAdmin pulsarAdmin,
-            SourceConfiguration sourceConfiguration,
-            PulsarDeserializationSchema<OUT> deserializationSchema) {
+            SourceConfiguration sourceConfiguration) {
         this.pulsarClient = pulsarClient;
         this.pulsarAdmin = pulsarAdmin;
         this.sourceConfiguration = sourceConfiguration;
-        this.deserializationSchema = deserializationSchema;
     }
 
     @Override
-    public RecordsWithSplitIds<PulsarMessage<OUT>> fetch() throws IOException {
-        RecordsBySplits.Builder<PulsarMessage<OUT>> builder = new 
RecordsBySplits.Builder<>();
+    public RecordsWithSplitIds<Message<byte[]>> fetch() throws IOException {
+        RecordsBySplits.Builder<Message<byte[]>> builder = new 
RecordsBySplits.Builder<>();
 
         // Return when no split registered to this reader.
         if (pulsarConsumer == null || registeredSplit == null) {
@@ -99,10 +89,9 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
 
         StopCursor stopCursor = registeredSplit.getStopCursor();
         String splitId = registeredSplit.splitId();
-        PulsarMessageCollector<OUT> collector = new 
PulsarMessageCollector<>(splitId, builder);
         Deadline deadline = 
Deadline.fromNow(sourceConfiguration.getMaxFetchTime());
 
-        // Consume message from pulsar until it was woke up by flink reader.
+        // Consume messages from Pulsar until woken up by the Flink reader.
         for (int messageNum = 0;
                 messageNum < sourceConfiguration.getMaxFetchRecords() && 
deadline.hasTimeLeft();
                 messageNum++) {
@@ -116,11 +105,9 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
                 StopCondition condition = stopCursor.shouldStop(message);
 
                 if (condition == StopCondition.CONTINUE || condition == 
StopCondition.EXACTLY) {
-                    // Deserialize message.
-                    collector.setMessage(message);
-                    deserializationSchema.deserialize(message, collector);
-
-                    // Acknowledge message if need.
+                    // Collect original message.
+                    builder.add(splitId, message);
+                    // Acknowledge the message if needed.
                     finishedPollMessage(message);
                 }
 
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index 02fd8d9..f095c12 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.pulsar.source.reader.split;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
@@ -52,11 +51,9 @@ import static 
org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUt
 /**
  * The split reader a given {@link PulsarPartitionSplit}, it would be closed 
once the {@link
  * PulsarUnorderedSourceReader} is closed.
- *
- * @param <OUT> the type of the pulsar source message that would be serialized 
to downstream.
  */
 @Internal
-public class PulsarUnorderedPartitionSplitReader<OUT> extends 
PulsarPartitionSplitReaderBase<OUT> {
+public class PulsarUnorderedPartitionSplitReader extends 
PulsarPartitionSplitReaderBase {
     private static final Logger LOG =
             LoggerFactory.getLogger(PulsarUnorderedPartitionSplitReader.class);
 
@@ -68,9 +65,8 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends 
PulsarPartitionSpl
             PulsarClient pulsarClient,
             PulsarAdmin pulsarAdmin,
             SourceConfiguration sourceConfiguration,
-            PulsarDeserializationSchema<OUT> deserializationSchema,
             TransactionCoordinatorClient coordinatorClient) {
-        super(pulsarClient, pulsarAdmin, sourceConfiguration, 
deserializationSchema);
+        super(pulsarClient, pulsarAdmin, sourceConfiguration);
 
         this.coordinatorClient = coordinatorClient;
     }
@@ -111,9 +107,6 @@ public class PulsarUnorderedPartitionSplitReader<OUT> 
extends PulsarPartitionSpl
         if (sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
             sneakyClient(() -> pulsarConsumer.acknowledge(message));
         }
-
-        // Release message
-        message.release();
     }
 
     @Override
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index 831d9e0..0aee463 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -18,17 +18,16 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.cursor;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import 
org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
 import org.junit.jupiter.api.Test;
@@ -43,7 +42,6 @@ import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
-import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test different implementation of StopCursor. */
@@ -54,12 +52,9 @@ class StopCursorTest extends PulsarTestSuiteBase {
         String topicName = randomAlphanumeric(5);
         operator().createTopic(topicName, 2);
 
-        PulsarOrderedPartitionSplitReader<String> splitReader =
-                new PulsarOrderedPartitionSplitReader<>(
-                        operator().client(),
-                        operator().admin(),
-                        sourceConfig(),
-                        flinkSchema(new SimpleStringSchema()));
+        PulsarOrderedPartitionSplitReader splitReader =
+                new PulsarOrderedPartitionSplitReader(
+                        operator().client(), operator().admin(), 
sourceConfig());
         // send the first message and set the stopCursor to filter any late 
stopCursor
         operator()
                 .sendMessage(
@@ -77,7 +72,7 @@ class StopCursorTest extends PulsarTestSuiteBase {
         SplitsAddition<PulsarPartitionSplit> addition = new 
SplitsAddition<>(singletonList(split));
         splitReader.handleSplitsChanges(addition);
         // first fetch should have result
-        RecordsWithSplitIds<PulsarMessage<String>> firstResult = 
splitReader.fetch();
+        RecordsWithSplitIds<Message<byte[]>> firstResult = splitReader.fetch();
         assertThat(firstResult.nextSplit()).isNotNull();
         assertThat(firstResult.nextRecordFromSplit()).isNotNull();
         assertThat(firstResult.finishedSplits()).isEmpty();
@@ -87,7 +82,7 @@ class StopCursorTest extends PulsarTestSuiteBase {
                         topicNameWithPartition(topicName, 0),
                         Schema.STRING,
                         randomAlphanumeric(10));
-        RecordsWithSplitIds<PulsarMessage<String>> secondResult = 
splitReader.fetch();
+        RecordsWithSplitIds<Message<byte[]>> secondResult = 
splitReader.fetch();
         assertThat(secondResult.nextSplit()).isNull();
         assertThat(secondResult.finishedSplits()).isNotEmpty();
     }
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
index 3d58d5e..4d93ce9 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
@@ -38,7 +38,7 @@ class PulsarOrderedPartitionSplitReaderTest extends 
PulsarPartitionSplitReaderTe
 
     @TestTemplate
     void consumeMessageCreatedBeforeHandleSplitsChangesWithoutSeek(
-            PulsarPartitionSplitReaderBase<String> splitReader) {
+            PulsarPartitionSplitReaderBase splitReader) {
         String topicName = randomAlphabetic(10);
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
         handleSplit(splitReader, topicName, 0);
@@ -47,7 +47,7 @@ class PulsarOrderedPartitionSplitReaderTest extends 
PulsarPartitionSplitReaderTe
 
     @TestTemplate
     void 
consumeMessageCreatedBeforeHandleSplitsChangesAndUseLatestStartCursorWithoutSeek(
-            PulsarPartitionSplitReaderBase<String> splitReader) {
+            PulsarPartitionSplitReaderBase splitReader) {
         String topicName = randomAlphabetic(10);
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
         handleSplit(splitReader, topicName, 0, MessageId.latest);
@@ -56,7 +56,7 @@ class PulsarOrderedPartitionSplitReaderTest extends 
PulsarPartitionSplitReaderTe
 
     @TestTemplate
     void 
consumeMessageCreatedBeforeHandleSplitsChangesAndUseEarliestStartCursorWithoutSeek(
-            PulsarPartitionSplitReaderBase<String> splitReader) {
+            PulsarPartitionSplitReaderBase splitReader) {
         String topicName = randomAlphabetic(10);
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
         handleSplit(splitReader, topicName, 0, MessageId.earliest);
@@ -65,7 +65,7 @@ class PulsarOrderedPartitionSplitReaderTest extends 
PulsarPartitionSplitReaderTe
 
     @TestTemplate
     void 
consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithoutSeek(
-            PulsarPartitionSplitReaderBase<String> splitReader) {
+            PulsarPartitionSplitReaderBase splitReader) {
         String topicName = randomAlphabetic(10);
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
         MessageIdImpl lastMessageId =
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
index e63424a..4631798 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.connector.pulsar.source.reader.split;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -26,13 +25,13 @@ import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 import 
org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -65,7 +64,6 @@ import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
-import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
 import static 
org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.isAssignableFromParameterContext;
 import static 
org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY;
 import static 
org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_TEST_RESOURCE_NAMESPACE;
@@ -100,12 +98,12 @@ abstract class PulsarPartitionSplitReaderTestBase extends 
PulsarTestSuiteBase {
     }
 
     protected void handleSplit(
-            PulsarPartitionSplitReaderBase<String> reader, String topicName, 
int partitionId) {
+            PulsarPartitionSplitReaderBase reader, String topicName, int 
partitionId) {
         handleSplit(reader, topicName, partitionId, null);
     }
 
     protected void handleSplit(
-            PulsarPartitionSplitReaderBase<String> reader,
+            PulsarPartitionSplitReaderBase reader,
             String topicName,
             int partitionId,
             MessageId startPosition) {
@@ -117,12 +115,12 @@ abstract class PulsarPartitionSplitReaderTestBase extends 
PulsarTestSuiteBase {
     }
 
     private void seekStartPositionAndHandleSplit(
-            PulsarPartitionSplitReaderBase<String> reader, String topicName, 
int partitionId) {
+            PulsarPartitionSplitReaderBase reader, String topicName, int 
partitionId) {
         seekStartPositionAndHandleSplit(reader, topicName, partitionId, 
MessageId.latest);
     }
 
     private void seekStartPositionAndHandleSplit(
-            PulsarPartitionSplitReaderBase<String> reader,
+            PulsarPartitionSplitReaderBase reader,
             String topicName,
             int partitionId,
             MessageId startPosition) {
@@ -159,29 +157,29 @@ abstract class PulsarPartitionSplitReaderTestBase extends 
PulsarTestSuiteBase {
         reader.handleSplitsChanges(addition);
     }
 
-    private <T> PulsarMessage<T> 
fetchedMessage(PulsarPartitionSplitReaderBase<T> splitReader) {
+    private <T> Message<byte[]> fetchedMessage(PulsarPartitionSplitReaderBase 
splitReader) {
         return fetchedMessages(splitReader, 1, 
false).stream().findFirst().orElse(null);
     }
 
-    protected <T> List<PulsarMessage<T>> fetchedMessages(
-            PulsarPartitionSplitReaderBase<T> splitReader, int expectedCount, 
boolean verify) {
+    protected <T> List<Message<byte[]>> fetchedMessages(
+            PulsarPartitionSplitReaderBase splitReader, int expectedCount, 
boolean verify) {
         return fetchedMessages(
                 splitReader, expectedCount, verify, 
Boundedness.CONTINUOUS_UNBOUNDED);
     }
 
-    private <T> List<PulsarMessage<T>> fetchedMessages(
-            PulsarPartitionSplitReaderBase<T> splitReader,
+    private <T> List<Message<byte[]>> fetchedMessages(
+            PulsarPartitionSplitReaderBase splitReader,
             int expectedCount,
             boolean verify,
             Boundedness boundedness) {
-        List<PulsarMessage<T>> messages = new ArrayList<>(expectedCount);
+        List<Message<byte[]>> messages = new ArrayList<>(expectedCount);
         List<String> finishedSplits = new ArrayList<>();
         for (int i = 0; i < 3; ) {
             try {
-                RecordsWithSplitIds<PulsarMessage<T>> recordsBySplitIds = 
splitReader.fetch();
+                RecordsWithSplitIds<Message<byte[]>> recordsBySplitIds = 
splitReader.fetch();
                 if (recordsBySplitIds.nextSplit() != null) {
                     // Collect the records in this split.
-                    PulsarMessage<T> record;
+                    Message<byte[]> record;
                     while ((record = recordsBySplitIds.nextRecordFromSplit()) 
!= null) {
                         messages.add(record);
                     }
@@ -207,7 +205,7 @@ abstract class PulsarPartitionSplitReaderTestBase extends 
PulsarTestSuiteBase {
     }
 
     @TestTemplate
-    void pollMessageAfterTimeout(PulsarPartitionSplitReaderBase<String> 
splitReader)
+    void pollMessageAfterTimeout(PulsarPartitionSplitReaderBase splitReader)
             throws InterruptedException, TimeoutException {
         String topicName = randomAlphabetic(10);
 
@@ -215,7 +213,7 @@ abstract class PulsarPartitionSplitReaderTestBase extends 
PulsarTestSuiteBase {
         handleSplit(splitReader, topicName, 0, MessageId.latest);
 
         // Poll once with a null message
-        PulsarMessage<String> message1 = fetchedMessage(splitReader);
+        Message<byte[]> message1 = fetchedMessage(splitReader);
         assertThat(message1).isNull();
 
         // Send a message to pulsar
@@ -225,7 +223,7 @@ abstract class PulsarPartitionSplitReaderTestBase extends 
PulsarTestSuiteBase {
         // Poll this message again
         waitUtil(
                 () -> {
-                    PulsarMessage<String> message2 = 
fetchedMessage(splitReader);
+                    Message<byte[]> message2 = fetchedMessage(splitReader);
                     return message2 != null;
                 },
                 ofSeconds(Integer.MAX_VALUE),
@@ -234,7 +232,7 @@ abstract class PulsarPartitionSplitReaderTestBase extends 
PulsarTestSuiteBase {
 
     @TestTemplate
     void consumeMessageCreatedAfterHandleSplitChangesAndFetch(
-            PulsarPartitionSplitReaderBase<String> splitReader) {
+            PulsarPartitionSplitReaderBase splitReader) {
         String topicName = randomAlphabetic(10);
         handleSplit(splitReader, topicName, 0, MessageId.latest);
         operator().sendMessage(topicNameWithPartition(topicName, 0), STRING, 
randomAlphabetic(10));
@@ -243,7 +241,7 @@ abstract class PulsarPartitionSplitReaderTestBase extends 
PulsarTestSuiteBase {
 
     @TestTemplate
     void consumeMessageCreatedBeforeHandleSplitsChanges(
-            PulsarPartitionSplitReaderBase<String> splitReader) {
+            PulsarPartitionSplitReaderBase splitReader) {
         String topicName = randomAlphabetic(10);
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
         seekStartPositionAndHandleSplit(splitReader, topicName, 0);
@@ -252,7 +250,7 @@ abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
 
     @TestTemplate
     void consumeMessageCreatedBeforeHandleSplitsChangesAndResetToEarliestPosition(
-            PulsarPartitionSplitReaderBase<String> splitReader) {
+            PulsarPartitionSplitReaderBase splitReader) {
         String topicName = randomAlphabetic(10);
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
         seekStartPositionAndHandleSplit(splitReader, topicName, 0, MessageId.earliest);
@@ -261,7 +259,7 @@ abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
 
     @TestTemplate
     void consumeMessageCreatedBeforeHandleSplitsChangesAndResetToLatestPosition(
-            PulsarPartitionSplitReaderBase<String> splitReader) {
+            PulsarPartitionSplitReaderBase splitReader) {
         String topicName = randomAlphabetic(10);
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
         seekStartPositionAndHandleSplit(splitReader, topicName, 0, MessageId.latest);
@@ -270,7 +268,7 @@ abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
 
     @TestTemplate
     void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageIdCursor(
-            PulsarPartitionSplitReaderBase<String> splitReader) {
+            PulsarPartitionSplitReaderBase splitReader) {
 
         String topicName = randomAlphabetic(10);
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
@@ -296,7 +294,7 @@ abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
     }
 
     @TestTemplate
-    void emptyTopic(PulsarPartitionSplitReaderBase<String> splitReader) {
+    void emptyTopic(PulsarPartitionSplitReaderBase splitReader) {
         String topicName = randomAlphabetic(10);
         operator().createTopic(topicName, DEFAULT_PARTITIONS);
         seekStartPositionAndHandleSplit(splitReader, topicName, 0);
@@ -304,7 +302,7 @@ abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
     }
 
     @TestTemplate
-    void emptyTopicWithoutSeek(PulsarPartitionSplitReaderBase<String> splitReader) {
+    void emptyTopicWithoutSeek(PulsarPartitionSplitReaderBase splitReader) {
         String topicName = randomAlphabetic(10);
         operator().createTopic(topicName, DEFAULT_PARTITIONS);
         handleSplit(splitReader, topicName, 0);
@@ -312,8 +310,7 @@ abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
     }
 
     @TestTemplate
-    void wakeupSplitReaderShouldNotCauseException(
-            PulsarPartitionSplitReaderBase<String> splitReader) {
+    void wakeupSplitReaderShouldNotCauseException(PulsarPartitionSplitReaderBase splitReader) {
         handleSplit(splitReader, "non-exist", 0);
         AtomicReference<Throwable> error = new AtomicReference<>();
         Thread t =
@@ -336,25 +333,18 @@ abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
     }
 
     @TestTemplate
-    void assignNoSplits(PulsarPartitionSplitReaderBase<String> splitReader) {
+    void assignNoSplits(PulsarPartitionSplitReaderBase splitReader) {
         assertThat(fetchedMessage(splitReader)).isNull();
     }
 
     /** Create a split reader with max message 1, fetch timeout 1s. */
-    private PulsarPartitionSplitReaderBase<String> splitReader(SubscriptionType subscriptionType) {
+    private PulsarPartitionSplitReaderBase splitReader(SubscriptionType subscriptionType) {
         if (subscriptionType == SubscriptionType.Failover) {
-            return new PulsarOrderedPartitionSplitReader<>(
-                    operator().client(),
-                    operator().admin(),
-                    sourceConfig(),
-                    flinkSchema(new SimpleStringSchema()));
+            return new PulsarOrderedPartitionSplitReader(
+                    operator().client(), operator().admin(), sourceConfig());
         } else {
-            return new PulsarUnorderedPartitionSplitReader<>(
-                    operator().client(),
-                    operator().admin(),
-                    sourceConfig(),
-                    flinkSchema(new SimpleStringSchema()),
-                    null);
+            return new PulsarUnorderedPartitionSplitReader(
+                    operator().client(), operator().admin(), sourceConfig(), null);
         }
     }
 
@@ -382,9 +372,9 @@ abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
     public static class PulsarSplitReaderInvocationContext
             implements TestTemplateInvocationContext {
 
-        private final PulsarPartitionSplitReaderBase<?> splitReader;
+        private final PulsarPartitionSplitReaderBase splitReader;
 
-        public PulsarSplitReaderInvocationContext(PulsarPartitionSplitReaderBase<?> splitReader) {
+        public PulsarSplitReaderInvocationContext(PulsarPartitionSplitReaderBase splitReader) {
             this.splitReader = checkNotNull(splitReader);
         }
 

Reply via email to