BewareMyPower commented on code in PR #25148:
URL: https://github.com/apache/pulsar/pull/25148#discussion_r2696593233


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java:
##########
@@ -457,6 +457,56 @@ public void run() {
         }
     }
 
+    @Test
+    public void testConcurrentResetCursorByTimestamp() throws Exception {

Review Comment:
   This test can pass even without the change of this PR.
   
   > fence reset-cursor-by-timestamp to avoid concurrent resets
   
   Is the fencing mechanism change necessary? Even if so, it would be better to 
open a separated PR for it.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java:
##########
@@ -428,7 +428,7 @@ public void testConcurrentResetCursor() throws Exception {
             messageIds.add(msgId);
         }
 
-        List<PulsarAdminException> exceptions = new ArrayList<>();
+        List<PulsarAdminException> exceptions = new 
java.util.concurrent.CopyOnWriteArrayList<>();

Review Comment:
   Same long type



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java:
##########
@@ -183,6 +185,59 @@ public void 
testPartialCachingWithMultipleEntriesInCacheWhilePartialReadFails()
         verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), 
eq(99L), any(), any(), any());
     }
 
+    @Test
+    public void testReadFromStorageRetriesWhenHandleClosed() {
+        RangeEntryCacheManagerImpl mockEntryCacheManager = 
mock(RangeEntryCacheManagerImpl.class);
+        ManagedLedgerFactoryMBeanImpl mlFactoryMBean = 
mock(ManagedLedgerFactoryMBeanImpl.class);
+        
when(mockEntryCacheManager.getMlFactoryMBean()).thenReturn(mlFactoryMBean);
+        ManagedLedgerImpl mockManagedLedger = mock(ManagedLedgerImpl.class);
+        ManagedLedgerMBeanImpl mockManagedLedgerMBean = 
mock(ManagedLedgerMBeanImpl.class);
+        when(mockManagedLedger.getMbean()).thenReturn(mockManagedLedgerMBean);
+        when(mockManagedLedger.getName()).thenReturn("testManagedLedger");
+        
when(mockManagedLedger.getExecutor()).thenReturn(mock(java.util.concurrent.ExecutorService.class));
+        
when(mockManagedLedger.getOptionalLedgerInfo(1L)).thenReturn(Optional.empty());
+        RangeCacheRemovalQueue mockRangeCacheRemovalQueue = 
mock(RangeCacheRemovalQueue.class);
+        when(mockRangeCacheRemovalQueue.addEntry(any())).thenReturn(true);
+        InflightReadsLimiter inflightReadsLimiter = 
mock(InflightReadsLimiter.class);
+        
when(mockEntryCacheManager.getInflightReadsLimiter()).thenReturn(inflightReadsLimiter);
+        doAnswer(invocation -> {
+            long permits = invocation.getArgument(0);
+            InflightReadsLimiter.Handle handle = new 
InflightReadsLimiter.Handle(permits, System.currentTimeMillis(),
+                    true);
+            return Optional.of(handle);
+        }).when(inflightReadsLimiter).acquire(anyLong(), any());
+
+        RangeEntryCacheImpl cache = new 
RangeEntryCacheImpl(mockEntryCacheManager, mockManagedLedger, false,
+                mockRangeCacheRemovalQueue, EntryLengthFunction.DEFAULT, 
mock(PendingReadsManager.class));
+
+        ReadHandle readHandle = mock(ReadHandle.class);
+        when(readHandle.getId()).thenReturn(1L);
+        
when(mockManagedLedger.reopenReadHandle(1L)).thenReturn(CompletableFuture.completedFuture(readHandle));
+
+        LedgerEntryImpl ledgerEntry = LedgerEntryImpl.create(1L, 0L, 1, 
Unpooled.wrappedBuffer(new byte[] {1}));
+        org.apache.bookkeeper.client.api.LedgerEntries ledgerEntries =
+                mock(org.apache.bookkeeper.client.api.LedgerEntries.class);
+        List<LedgerEntry> entryList = List.of((LedgerEntry) ledgerEntry);
+        when(ledgerEntries.iterator()).thenReturn(entryList.iterator());
+
+        java.util.concurrent.atomic.AtomicInteger readAttempts = new 
java.util.concurrent.atomic.AtomicInteger();

Review Comment:
   I see some types include full package path, could you just import them?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to