Jackie-Jiang commented on code in PR #12812:
URL: https://github.com/apache/pinot/pull/12812#discussion_r1558333101


##########
pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java:
##########
@@ -45,93 +40,66 @@
 public class PulsarPartitionLevelConsumer extends 
PulsarPartitionLevelConnectionHandler
     implements PartitionGroupConsumer {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);
+  private final Reader<byte[]> _reader;
+  private MessageId _nextMessageId = null;
 
-  private final Reader _reader;
-
-  // TODO: Revisit the logic of using a separate executor to manage the 
request timeout. Currently it is not thread safe
-  private final ExecutorService _executorService = 
Executors.newSingleThreadExecutor();
-
-  public PulsarPartitionLevelConsumer(String clientId, StreamConfig 
streamConfig, int partitionId) {
+  public PulsarPartitionLevelConsumer(String clientId, StreamConfig 
streamConfig, int partition) {
     super(clientId, streamConfig);
+    String topicName = _config.getPulsarTopicName();
     try {
-      _reader = createReaderForPartition(partitionId);
+      List<String> partitions = 
_pulsarClient.getPartitionsForTopic(topicName).get();
+      _reader = 
_pulsarClient.newReader().topic(partitions.get(partition)).startMessageId(MessageId.earliest)
+          .startMessageIdInclusive().create();
     } catch (Exception e) {
-      throw new RuntimeException("Caught exception while creating Pulsar 
reader", e);
+      throw new RuntimeException(
+          String.format("Caught exception while creating Pulsar reader for 
topic: %s, partition: %d", topicName,
+              partition), e);
     }
-    LOGGER.info("Created Pulsar reader with topic: {}, partition: {}, initial 
message id: {}",
-        _config.getPulsarTopicName(), partitionId, 
_config.getInitialMessageId());
+    LOGGER.info("Created Pulsar reader for topic: {}, partition: {}", 
topicName, partition);
   }
 
-  /**
-   * Fetch records from the Pulsar stream between the start and end 
StreamPartitionMsgOffset
-   * Used {@link org.apache.pulsar.client.api.Reader} to read the messaged 
from pulsar partitioned topic
-   * The reader seeks to the startMsgOffset and starts reading records in a 
loop until endMsgOffset or timeout is
-   * reached.
-   */
   @Override
-  public PulsarMessageBatch fetchMessages(StreamPartitionMsgOffset 
startMsgOffset, int timeoutMs) {
-    MessageIdStreamOffset startOffset = (MessageIdStreamOffset) startMsgOffset;
+  public synchronized PulsarMessageBatch 
fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs) {
+    MessageId startMessageId = ((MessageIdStreamOffset) 
startOffset).getMessageId();
+    long endTimeMs = System.currentTimeMillis() + timeoutMs;
     List<BytesStreamMessage> messages = new ArrayList<>();
-    Future<PulsarMessageBatch> pulsarResultFuture = _executorService.submit(() 
-> fetchMessages(startOffset, messages));
-    try {
-      return pulsarResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
-    } catch (TimeoutException e) {
-      // The fetchMessages has thrown an exception. Most common cause is the 
timeout.
-      // We return the records fetched till now along with the next start 
offset.
-      pulsarResultFuture.cancel(true);
-    } catch (Exception e) {
-      LOGGER.warn("Error while fetching records from Pulsar", e);
+
+    // Seek to the start message id if necessary
+    // NOTE: Use Objects.equals() to check reference first for performance.
+    if (!Objects.equals(startMessageId, _nextMessageId)) {

Review Comment:
   When they are the same, we don't need to seek to the message id again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to