markap14 commented on code in PR #11034:
URL: https://github.com/apache/nifi/pull/11034#discussion_r3102858886
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##########
@@ -810,4 +810,751 @@ public String changePartitionName(String swapLocation,
String newPartitionName)
return swapLocation;
}
}
+
+ //
=========================================================================
+ // Truncation Feature: Helpers
+ //
=========================================================================
+
+ /**
+ * Creates a mock queue + connection + queueProvider wired together,
suitable for runtime truncation tests.
+ * Returns [claimManager, queueProvider, queue].
+ */
+ private record RuntimeRepoContext(StandardResourceClaimManager
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+ }
+
+ private RuntimeRepoContext createRuntimeRepoContext() {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final TestQueueProvider queueProvider = new TestQueueProvider();
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+ final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+ when(queue.getIdentifier()).thenReturn("1234");
+ when(connection.getFlowFileQueue()).thenReturn(queue);
+ queueProvider.addConnection(connection);
+ return new RuntimeRepoContext(claimManager, queueProvider, queue);
+ }
+
+ private StandardContentClaim createClaim(final ResourceClaim rc, final
long offset, final long length, final boolean truncationCandidate) {
+ final StandardContentClaim claim = new StandardContentClaim(rc,
offset);
+ claim.setLength(length);
+ if (truncationCandidate) {
+ claim.setTruncationCandidate(true);
+ }
+ return claim;
+ }
+
+ private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository
repo, final FlowFileQueue queue,
+ final ContentClaim claim) throws
IOException {
+ final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", UUID.randomUUID().toString())
+ .contentClaim(claim)
+ .build();
+
+ final StandardRepositoryRecord createRecord = new
StandardRepositoryRecord(queue);
+ createRecord.setWorking(flowFile, false);
+ createRecord.setDestination(queue);
+ repo.updateRepository(List.of(createRecord));
+
+ final StandardRepositoryRecord deleteRecord = new
StandardRepositoryRecord(queue, flowFile);
+ deleteRecord.markForDelete();
+ repo.updateRepository(List.of(deleteRecord));
+ }
+
+ /**
+ * Writes FlowFiles (one per claim) to a new repo, closes it, then
recovers into a fresh repo
+ * and returns the recovered FlowFileRecords.
+ */
+ private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims)
throws IOException {
+ final ResourceClaimManager writeClaimManager = new
StandardResourceClaimManager();
+ final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+ final Connection writeConnection = Mockito.mock(Connection.class);
+ when(writeConnection.getIdentifier()).thenReturn("1234");
+
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+ final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+ final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234",
null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
+ when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+ writeQueueProvider.addConnection(writeConnection);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(writeClaimManager);
+ repo.loadFlowFiles(writeQueueProvider);
+
+ final List<RepositoryRecord> records = new ArrayList<>();
+ for (int i = 0; i < claims.length; i++) {
+ final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+ .id(i + 1L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-" +
String.format("%012d", i + 1))
+ .contentClaim(claims[i])
+ .build();
+ final StandardRepositoryRecord rec = new
StandardRepositoryRecord(writeQueue);
+ rec.setWorking(ff, false);
+ rec.setDestination(writeQueue);
+ records.add(rec);
+ }
+ repo.updateRepository(records);
+ }
+
+ // Recover
+ final List<FlowFileRecord> recovered = new ArrayList<>();
+ final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
+ when(recoveryQueue.getIdentifier()).thenReturn("1234");
+ doAnswer(invocation -> {
+ recovered.add((FlowFileRecord) invocation.getArguments()[0]);
+ return null;
+ }).when(recoveryQueue).put(any(FlowFileRecord.class));
+
+ final Connection recoveryConnection = Mockito.mock(Connection.class);
+ when(recoveryConnection.getIdentifier()).thenReturn("1234");
+ when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
+ final TestQueueProvider recoveryQueueProvider = new
TestQueueProvider();
+ recoveryQueueProvider.addConnection(recoveryConnection);
+
+ try (final WriteAheadFlowFileRepository repo2 = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo2.initialize(new StandardResourceClaimManager());
+ repo2.loadFlowFiles(recoveryQueueProvider);
+ }
+
+ return recovered;
+ }
+
+ private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord>
recovered, final long offset) {
+ return recovered.stream()
+ .filter(ff -> ff.getContentClaim() != null &&
ff.getContentClaim().getOffset() == offset)
+ .findFirst()
+ .orElse(null);
+ }
+
+ //
=========================================================================
+ // Truncation Feature: Runtime Tests
+ //
=========================================================================
+
+ @Test
+ public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue()
throws IOException {
+ final RuntimeRepoContext context = createRuntimeRepoContext();
+ final ResourceClaim resourceClaim =
context.claimManager().newResourceClaim("container", "section", "1", false,
false);
+ context.claimManager().incrementClaimantCount(resourceClaim);
+ context.claimManager().incrementClaimantCount(resourceClaim); // count
= 2 so that after delete decrement it stays > 0 (not destructable)
+ final StandardContentClaim contentClaim = createClaim(resourceClaim,
1024L, 5_000_000L, true);
Review Comment:
I find named constants distracting, and they significantly reduce code
readability, so I avoid extracting literal values into them unless two
conditions are met: (a) the value is repeated at least twice; and (b) changing
one occurrence necessitates, or strongly suggests, changing the others.
Having several values throughout the code that happen to be the same is
perfectly fine if they are entirely independent of one another. An analysis of
the repeated values in these tests shows:
`Offsets` (1024L, 2048L, 100L, (i + 1) * 1024L): arbitrary positions per
test; not coupled across tests. These should be left as-is.
5_000_000L: used in ~9 runtime tests, always paired with true (pre-marked
candidate). All represent the same concept: "a claim length large enough to be
a truncation candidate (> 1 MB threshold)." If the threshold ever changed, all
of these must be updated together. These should be extracted into a constant.
2_000_000L: used in ~5 recovery tests, but several of those tests also
encode an adjacent offset (e.g. 2_000_100L = 100L + 2_000_000L). Extracting the
length without also coupling the offsets would introduce inconsistency within
those tests. These should be left as-is to avoid fragile cross-coupling.
`new byte[100]`: appears in five tests as "a small amount of data for a
non-truncatable claim", but each use is entirely independent. These should be
left as-is.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]