markap14 commented on code in PR #11034:
URL: https://github.com/apache/nifi/pull/11034#discussion_r3102858886


##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##########
@@ -810,4 +810,751 @@ public String changePartitionName(String swapLocation, 
String newPartitionName)
             return swapLocation;
         }
     }
+
+    // 
=========================================================================
+    // Truncation Feature: Helpers
+    // 
=========================================================================
+
+    /**
+     * Creates a mock queue + connection + queueProvider wired together, 
suitable for runtime truncation tests.
+     * Returns [claimManager, queueProvider, queue].
+     */
+    private record RuntimeRepoContext(StandardResourceClaimManager 
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+    }
+
+    private RuntimeRepoContext createRuntimeRepoContext() {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final TestQueueProvider queueProvider = new TestQueueProvider();
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("1234");
+        
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+        when(queue.getIdentifier()).thenReturn("1234");
+        when(connection.getFlowFileQueue()).thenReturn(queue);
+        queueProvider.addConnection(connection);
+        return new RuntimeRepoContext(claimManager, queueProvider, queue);
+    }
+
+    private StandardContentClaim createClaim(final ResourceClaim rc, final 
long offset, final long length, final boolean truncationCandidate) {
+        final StandardContentClaim claim = new StandardContentClaim(rc, 
offset);
+        claim.setLength(length);
+        if (truncationCandidate) {
+            claim.setTruncationCandidate(true);
+        }
+        return claim;
+    }
+
+    private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository 
repo, final FlowFileQueue queue,
+                                         final ContentClaim claim) throws 
IOException {
+        final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+                .id(1L)
+                .addAttribute("uuid", UUID.randomUUID().toString())
+                .contentClaim(claim)
+                .build();
+
+        final StandardRepositoryRecord createRecord = new 
StandardRepositoryRecord(queue);
+        createRecord.setWorking(flowFile, false);
+        createRecord.setDestination(queue);
+        repo.updateRepository(List.of(createRecord));
+
+        final StandardRepositoryRecord deleteRecord = new 
StandardRepositoryRecord(queue, flowFile);
+        deleteRecord.markForDelete();
+        repo.updateRepository(List.of(deleteRecord));
+    }
+
+    /**
+     * Writes FlowFiles (one per claim) to a new repo, closes it, then 
recovers into a fresh repo
+     * and returns the recovered FlowFileRecords.
+     */
+    private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims) 
throws IOException {
+        final ResourceClaimManager writeClaimManager = new 
StandardResourceClaimManager();
+        final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+        final Connection writeConnection = Mockito.mock(Connection.class);
+        when(writeConnection.getIdentifier()).thenReturn("1234");
+        
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+        final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234", 
null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
+        when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+        writeQueueProvider.addConnection(writeConnection);
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(writeClaimManager);
+            repo.loadFlowFiles(writeQueueProvider);
+
+            final List<RepositoryRecord> records = new ArrayList<>();
+            for (int i = 0; i < claims.length; i++) {
+                final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+                        .id(i + 1L)
+                        .addAttribute("uuid", "11111111-1111-1111-1111-" + 
String.format("%012d", i + 1))
+                        .contentClaim(claims[i])
+                        .build();
+                final StandardRepositoryRecord rec = new 
StandardRepositoryRecord(writeQueue);
+                rec.setWorking(ff, false);
+                rec.setDestination(writeQueue);
+                records.add(rec);
+            }
+            repo.updateRepository(records);
+        }
+
+        // Recover
+        final List<FlowFileRecord> recovered = new ArrayList<>();
+        final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
+        when(recoveryQueue.getIdentifier()).thenReturn("1234");
+        doAnswer(invocation -> {
+            recovered.add((FlowFileRecord) invocation.getArguments()[0]);
+            return null;
+        }).when(recoveryQueue).put(any(FlowFileRecord.class));
+
+        final Connection recoveryConnection = Mockito.mock(Connection.class);
+        when(recoveryConnection.getIdentifier()).thenReturn("1234");
+        when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
+        final TestQueueProvider recoveryQueueProvider = new 
TestQueueProvider();
+        recoveryQueueProvider.addConnection(recoveryConnection);
+
+        try (final WriteAheadFlowFileRepository repo2 = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo2.initialize(new StandardResourceClaimManager());
+            repo2.loadFlowFiles(recoveryQueueProvider);
+        }
+
+        return recovered;
+    }
+
+    private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord> 
recovered, final long offset) {
+        return recovered.stream()
+                .filter(ff -> ff.getContentClaim() != null && 
ff.getContentClaim().getOffset() == offset)
+                .findFirst()
+                .orElse(null);
+    }
+
+    // 
=========================================================================
+    // Truncation Feature: Runtime Tests
+    // 
=========================================================================
+
+    @Test
+    public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue() 
throws IOException {
+        final RuntimeRepoContext context = createRuntimeRepoContext();
+        final ResourceClaim resourceClaim = 
context.claimManager().newResourceClaim("container", "section", "1", false, 
false);
+        context.claimManager().incrementClaimantCount(resourceClaim);
+        context.claimManager().incrementClaimantCount(resourceClaim); // count 
= 2 so that after delete decrement it stays > 0 (not destructable)
+        final StandardContentClaim contentClaim = createClaim(resourceClaim, 
1024L, 5_000_000L, true);

Review Comment:
   I find named constants distracting, and they significantly reduce code 
readability, so I avoid extracting literal values into them unless two 
conditions are met: (a) the value is repeated at least twice; and (b) changing 
one occurrence necessitates, or strongly suggests, changing the others.
   
   Having several constants throughout the code that happen to be the same is 
perfectly fine if they are entirely independent of one another. An analysis of 
the repeated values in these tests shows:
   
   `Offsets` (1024L, 2048L, 100L, (i + 1) * 1024L): arbitrary positions per 
test; not coupled across tests. These should be left as-is.
   
   5_000_000L: used in ~9 runtime tests, always paired with true (pre-marked 
candidate). All represent the same concept: "a claim length large enough to be 
a truncation candidate (> 1 MB threshold)." If the threshold ever changed, all 
of these must be updated together. These should be extracted into a constant.
   
   2_000_000L: used in ~5 recovery tests, but several of those tests also 
encode an adjacent offset (e.g. 2_000_100L = 100L + 2_000_000L). Extracting the 
length without also coupling the offsets would introduce inconsistency within 
those tests. These should be left as-is to avoid fragile cross-coupling.
   
   `new byte[100]`: appears in five tests as "a small amount of data for a 
non-truncatable claim", but each use is entirely independent, so these should 
be left as-is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to