This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 1c2469bf33 NIFI-13213 Added Validation for Swap File Names This closes 8812 1c2469bf33 is described below commit 1c2469bf33ffac1554dd418aadbfb1c7f2e98357 Author: exceptionfactory <exceptionfact...@apache.org> AuthorDate: Thu May 9 16:20:56 2024 -0500 NIFI-13213 Added Validation for Swap File Names This closes 8812 Signed-off-by: Joseph Witt <joew...@apache.org> --- .../nifi/controller/FileSystemSwapManager.java | 24 ++++++++++-- .../nifi/controller/TestFileSystemSwapManager.java | 45 +++++++++++++++------- 2 files changed, 52 insertions(+), 17 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index b01bdc9f79..2b56cedb7b 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -65,6 +65,7 @@ import java.util.List; import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -79,6 +80,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap"); private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap\\.part"); + private static final Pattern UUID_PATTERN = Pattern.compile("([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})"); public static final String EVENT_CATEGORY = "Swap FlowFiles"; private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class); @@ -133,11 +135,11 @@ public class FileSystemSwapManager implements FlowFileSwapManager { return null; } - final String swapFilePrefix = System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString(); - final String swapFileBaseName = partitionName == null ? swapFilePrefix : swapFilePrefix + "." + partitionName; - final String swapFileName = swapFileBaseName + ".swap"; + final String swapFileName = getSwapFileName(flowFileQueue.getIdentifier(), partitionName); + final Path storageDirectoryPath = storageDirectory.toPath(); + final Path swapFilePath = storageDirectoryPath.resolve(swapFileName).toAbsolutePath(); - final File swapFile = new File(storageDirectory, swapFileName); + final File swapFile = swapFilePath.toFile(); final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part"); final String swapLocation = swapFile.getAbsolutePath(); @@ -482,4 +484,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager { logger.debug("Changed Partition for Swap File by renaming from {} to {}", swapLocation, newPartitionName); return newFile.getAbsolutePath(); } + + private String getSwapFileName(final String flowFileQueueIdentifier, final String partitionName) { + final UUID identifier; + final Matcher identifierMatcher = UUID_PATTERN.matcher(flowFileQueueIdentifier); + if (identifierMatcher.find()) { + identifier = UUID.fromString(identifierMatcher.group(1)); + } else { + throw new IllegalArgumentException("FlowFile Queue Identifier [%s] not valid".formatted(flowFileQueueIdentifier)); + } + + final String swapFilePrefix = System.currentTimeMillis() + "-" + identifier + "-" + UUID.randomUUID(); + final String swapFileBaseName = partitionName == null ? swapFilePrefix : swapFilePrefix + "." + partitionName; + return swapFileBaseName + ".swap"; + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index a00b2c01f8..cc91b405ae 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -27,7 +27,6 @@ import org.apache.nifi.events.EventReporter; import org.apache.nifi.stream.io.StreamUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import org.mockito.Mockito; import java.io.BufferedInputStream; import java.io.DataInputStream; @@ -41,24 +40,44 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestFileSystemSwapManager { + @Test + public void testFlowFileQueueIdentifierNotValid() { + final String identifier = "invalid-identifier"; + + final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class); + when(flowFileQueue.getIdentifier()).thenReturn(identifier); + final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class); + final FileSystemSwapManager swapManager = createSwapManager(flowFileRepo); + final List<FlowFileRecord> flowFileRecords = Collections.singletonList(new MockFlowFileRecord(0)); + + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> swapManager.swapOut(flowFileRecords, flowFileQueue, "partition-1")); + + assertTrue(exception.getMessage().contains(identifier)); + } + @Test public void testBackwardCompatible() throws IOException { - try (final InputStream fis = new FileInputStream(new File("src/test/resources/old-swap-file.swap")); - final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) { + try (final InputStream fis = new FileInputStream("src/test/resources/old-swap-file.swap"); + final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) { - final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); + final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class); when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); final FileSystemSwapManager swapManager = createSwapManager(); @@ -76,11 +95,11 @@ public class TestFileSystemSwapManager { @Test public void testFailureOnRepoSwapOut() throws IOException { - final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); + final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class); when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); - final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class); - Mockito.doThrow(new IOException("Intentional IOException for unit test")) + final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class); + doThrow(new IOException("Intentional IOException for unit test")) .when(flowFileRepo).swapFlowFilesOut(any(), any(), any()); final FileSystemSwapManager swapManager = createSwapManager(flowFileRepo); @@ -96,7 +115,7 @@ public class TestFileSystemSwapManager { @Test public void testSwapFileUnknownToRepoNotSwappedIn() throws IOException { - final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); + final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class); when(flowFileQueue.getIdentifier()).thenReturn(""); final File targetDir = new File("target/swap"); @@ -111,7 +130,7 @@ public class TestFileSystemSwapManager { final FileSystemSwapManager swapManager = new FileSystemSwapManager(Paths.get("target")); final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager(); - final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class); + final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class); swapManager.initialize(new SwapManagerInitializationContext() { @Override @@ -134,7 +153,7 @@ public class TestFileSystemSwapManager { final List<String> recoveredLocations = swapManager.recoverSwapLocations(flowFileQueue, null); assertEquals(1, recoveredLocations.size()); - final String firstLocation = recoveredLocations.get(0); + final String firstLocation = recoveredLocations.getFirst(); final SwapContents emptyContents = swapManager.swapIn(firstLocation, flowFileQueue); assertEquals(0, emptyContents.getFlowFiles().size()); @@ -144,8 +163,8 @@ public class TestFileSystemSwapManager { assertEquals(10000, contents.getFlowFiles().size()); } - private FileSystemSwapManager createSwapManager() throws IOException { - final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class); + private FileSystemSwapManager createSwapManager() { + final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class); return createSwapManager(flowFileRepo); } @@ -175,7 +194,7 @@ public class TestFileSystemSwapManager { return swapManager; } - public class NopResourceClaimManager implements ResourceClaimManager { + public static class NopResourceClaimManager implements ResourceClaimManager { @Override public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable) { return null;