This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 6caffca NIFI-9348 NIFI-7863 This closes #5495. Added temporary suffix and fixed [NIFI-7863] creation of the directories 6caffca is described below commit 6caffca811bcebcd1d83561cb5a036a80206ea95 Author: Gabriel Barbu <gabriel.ba...@nagarro.com> AuthorDate: Fri Oct 29 16:03:42 2021 +0300 NIFI-9348 NIFI-7863 This closes #5495. Added temporary suffix and fixed [NIFI-7863] creation of the directories Signed-off-by: Joe Witt <joew...@apache.org> --- .../org/apache/nifi/processors/smb/PutSmbFile.java | 136 ++++++++++++++++----- .../apache/nifi/processors/smb/GetSmbFileTest.java | 4 - .../apache/nifi/processors/smb/PutSmbFileTest.java | 101 +++++++++++++++ 3 files changed, 205 insertions(+), 36 deletions(-) diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java index 1d8a9e0..2accbbf 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java @@ -51,6 +51,7 @@ import java.net.URI; import com.hierynomus.smbj.SMBClient; import com.hierynomus.smbj.connection.Connection; import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.share.DiskEntry; import com.hierynomus.smbj.share.DiskShare; import com.hierynomus.smbj.session.Session; import com.hierynomus.msfscc.FileAttributes; @@ -148,6 +149,12 @@ public class PutSmbFile extends AbstractProcessor { .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .defaultValue("100") .build(); + public static final PropertyDescriptor RENAME_SUFFIX = new PropertyDescriptor.Builder() + .name("Temporary Suffix") + .description("A temporary suffix which will be apended to the filename while it's transfering. After the transfer is complete, the suffix will be removed.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("Files that have been successfully written to the output network path are transferred to this relationship") @@ -178,6 +185,7 @@ public class PutSmbFile extends AbstractProcessor { descriptors.add(SHARE_ACCESS); descriptors.add(CONFLICT_RESOLUTION); descriptors.add(BATCH_SIZE); + descriptors.add(RENAME_SUFFIX); this.descriptors = Collections.unmodifiableList(descriptors); final Set<Relationship> relationships = new HashSet<Relationship>(); @@ -236,6 +244,29 @@ public class PutSmbFile extends AbstractProcessor { this.smbClient = smbClient; } + private void createMissingDirectoriesRecursevly(ComponentLog logger, DiskShare share, String pathToCreate) { + List<String> paths = new ArrayList<>(); + + java.io.File file = new java.io.File(pathToCreate); + paths.add(file.getPath()); + + while (file.getParent() != null) { + String parent = file.getParent(); + paths.add(parent); + file = new java.io.File(parent); + } + + Collections.reverse(paths); + for (String path : paths) { + if (!share.folderExists(path)) { + logger.debug("Creating folder {}", new Object[]{path}); + share.mkdir(path); + } else { + logger.debug("Folder already exists {}. Moving on", new Object[]{path}); + } + } + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); @@ -268,33 +299,40 @@ public class PutSmbFile extends AbstractProcessor { DiskShare share = (DiskShare) smbSession.connectShare(shareName)) { for (FlowFile flowFile : flowFiles) { - String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); - final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); - final long sendStart = System.nanoTime(); - String fullPath; - - if (directory == null) { - directory = ""; - fullPath = filename; - } else { - fullPath = directory + "\\" + filename; + final long processingStartTime = System.nanoTime(); - // missing directory handling - if (context.getProperty(CREATE_DIRS).asBoolean() && !share.folderExists(directory)) { - logger.debug("Creating folder {}", new Object[]{directory}); - share.mkdir(directory); - } + final String destinationDirectory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); + final String destinationFilename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + + String destinationFullPath; + + // build destination path for the flowfile + if (destinationDirectory == null || destinationDirectory.trim().isEmpty()) { + destinationFullPath = destinationFilename; + } else { + destinationFullPath = new java.io.File(destinationDirectory, destinationFilename).getPath(); } - final URI uri = new URI("smb", hostname, "/" + fullPath.replace('\\', '/'), null); + // handle missing directory + final String destinationFileParentDirectory = new java.io.File(destinationFullPath).getParent(); + final Boolean createMissingDirectories = context.getProperty(CREATE_DIRS).asBoolean(); + if (!createMissingDirectories && !share.folderExists(destinationFileParentDirectory)) { + flowFile = session.penalize(flowFile); + logger.warn( + "Penalizing {} and routing to failure as configured because the destination directory ({}) doesn't exist", + new Object[]{ flowFile, destinationFileParentDirectory }); + session.transfer(flowFile, REL_FAILURE); + continue; + } else if (!share.folderExists(destinationFileParentDirectory)) { + createMissingDirectoriesRecursevly(logger, share, destinationFileParentDirectory); + } - // replace strategy handling - SMB2CreateDisposition createDisposition = SMB2CreateDisposition.FILE_OVERWRITE_IF; + // handle conflict resolution final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); - if (!conflictResolution.equals(REPLACE_RESOLUTION) && share.fileExists(fullPath)) { + if (share.fileExists(destinationFullPath)) { if (conflictResolution.equals(IGNORE_RESOLUTION)) { session.transfer(flowFile, REL_SUCCESS); - logger.info("Transferring {} to success because file with same name already exists", new Object[]{flowFile}); + logger.info("Transferring {} to success as configured because file with same name already exists", new Object[]{flowFile}); continue; } else if (conflictResolution.equals(FAIL_RESOLUTION)) { flowFile = session.penalize(flowFile); @@ -304,27 +342,61 @@ public class PutSmbFile extends AbstractProcessor { } } + // handle temporary suffix + final String renameSuffixValue = context.getProperty(RENAME_SUFFIX).getValue(); + final Boolean renameSuffix = renameSuffixValue != null && !renameSuffixValue.trim().isEmpty(); + String finalDestinationFullPath = destinationFullPath; + if (renameSuffix) { + finalDestinationFullPath += renameSuffixValue; + } - try (File f = share.openFile( - fullPath, + // handle the transfer + try ( + File shareDestinationFile = share.openFile( + finalDestinationFullPath, EnumSet.of(AccessMask.GENERIC_WRITE), EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL), sharedAccess, - createDisposition, + SMB2CreateDisposition.FILE_OVERWRITE_IF, EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH)); - OutputStream os = f.getOutputStream()) { - - session.exportTo(flowFile, os); - - final long sendNanos = System.nanoTime() - sendStart; - final long sendMillis = TimeUnit.MILLISECONDS.convert(sendNanos, TimeUnit.NANOSECONDS); - session.getProvenanceReporter().send(flowFile, uri.toString(), sendMillis); - session.transfer(flowFile, REL_SUCCESS); + OutputStream shareDestinationFileOutputStream = shareDestinationFile.getOutputStream()) { + session.exportTo(flowFile, shareDestinationFileOutputStream); } catch (Exception e) { flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); - logger.error("Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e}); + logger.error("Cannot transfer the file. Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e}); + continue; + } + + // handle the rename + if (renameSuffix) { + try(DiskEntry fileDiskEntry = share.open( + finalDestinationFullPath, + EnumSet.of(AccessMask.DELETE, AccessMask.GENERIC_WRITE), + EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL), + sharedAccess, + SMB2CreateDisposition.FILE_OPEN, + EnumSet.of(SMB2CreateOptions.FILE_WRITE_THROUGH))) { + + // normalize path slashes for the network share + destinationFullPath = destinationFullPath.replace("/", "\\"); + + // rename the file on the share and replace it in case it exists + fileDiskEntry.rename(destinationFullPath, true); + } catch (Exception e) { + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + logger.error("Cannot rename the file. Penalizing {} and routing to 'failure' because of error {}", new Object[]{flowFile, e}); + continue; + } } + + // handle the success + final URI provenanceUri = new URI("smb", hostname, "/" + destinationFullPath.replace('\\', '/'), null); + final long processingTimeInNano = System.nanoTime() - processingStartTime; + final long processingTimeInMilli = TimeUnit.MILLISECONDS.convert(processingTimeInNano, TimeUnit.NANOSECONDS); + session.getProvenanceReporter().send(flowFile, provenanceUri.toString(), processingTimeInMilli); + session.transfer(flowFile, REL_SUCCESS); } } catch (Exception e) { session.transfer(flowFiles, REL_FAILURE); diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/GetSmbFileTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/GetSmbFileTest.java index 8c1f6fe..83e6e03 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/GetSmbFileTest.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/GetSmbFileTest.java @@ -37,7 +37,6 @@ import org.junit.jupiter.api.Test; import org.mockito.MockitoAnnotations; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; @@ -60,7 +59,6 @@ public class GetSmbFileTest { private Connection connection; private Session session; private DiskShare diskShare; - private ByteArrayOutputStream baOutputStream; private final static String HOSTNAME = "host"; private final static String SHARE = "share"; @@ -75,8 +73,6 @@ public class GetSmbFileTest { session = mock(Session.class); diskShare = mock(DiskShare.class); - baOutputStream = new ByteArrayOutputStream(); - when(smbClient.connect(any(String.class))).thenReturn(connection); when(connection.authenticate(any(AuthenticationContext.class))).thenReturn(session); when(session.connectShare(SHARE)).thenReturn(diskShare); diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java index d50eb6a..e045fa6 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java @@ -22,6 +22,7 @@ import com.hierynomus.smbj.SMBClient; import com.hierynomus.smbj.auth.AuthenticationContext; import com.hierynomus.smbj.connection.Connection; import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.share.DiskEntry; import com.hierynomus.smbj.share.DiskShare; import com.hierynomus.smbj.share.File; import org.apache.nifi.util.TestRunner; @@ -42,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -55,6 +57,7 @@ public class PutSmbFileTest { private Connection connection; private Session session; private DiskShare diskShare; + private DiskEntry diskEntry; private File smbfile; private ByteArrayOutputStream baOutputStream; @@ -75,6 +78,7 @@ public class PutSmbFileTest { connection = mock(Connection.class); session = mock(Session.class); diskShare = mock(DiskShare.class); + diskEntry = mock(DiskEntry.class); smbfile = mock(File.class); baOutputStream = new ByteArrayOutputStream(); @@ -89,6 +93,14 @@ public class PutSmbFileTest { any(SMB2CreateDisposition.class), anySet() )).thenReturn(smbfile); + when(diskShare.open( + any(String.class), + anySet(), + anySet(), + anySet(), + any(SMB2CreateDisposition.class), + anySet() + )).thenReturn(diskEntry); when(smbfile.getOutputStream()).thenReturn(baOutputStream); testRunner.setProperty(PutSmbFile.HOSTNAME, HOSTNAME); @@ -172,8 +184,25 @@ public class PutSmbFileTest { } @Test + public void testDirectoriesCreatedWhenDontExists() throws IOException { + final String directory = new java.io.File("a\\b\\c\\b\\e").getPath(); + final int count = directory.split(java.util.regex.Pattern.quote(java.io.File.separator)).length; + when(diskShare.folderExists(DIRECTORY)).thenReturn(false); + + testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true"); + testRunner.setProperty(PutSmbFile.DIRECTORY, directory); + testRunner.enqueue("data"); + testRunner.run(); + + verify(diskShare, times(count)).mkdir( + any(String.class) + ); + } + + @Test public void testFileShareNone() throws IOException { testRunner.setProperty(PutSmbFile.SHARE_ACCESS, PutSmbFile.SHARE_ACCESS_NONE); + testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true"); Set<SMB2ShareAccess> shareAccessSet = testOpenFileShareAccess(); assertTrue(shareAccessSet.isEmpty()); } @@ -181,6 +210,7 @@ public class PutSmbFileTest { @Test public void testFileShareRead() throws IOException { testRunner.setProperty(PutSmbFile.SHARE_ACCESS, PutSmbFile.SHARE_ACCESS_READ); + testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true"); Set<SMB2ShareAccess> shareAccessSet = testOpenFileShareAccess(); assertTrue(shareAccessSet.contains(SMB2ShareAccess.FILE_SHARE_READ)); } @@ -188,6 +218,7 @@ public class PutSmbFileTest { @Test public void testFileShareReadWriteDelete() throws IOException { testRunner.setProperty(PutSmbFile.SHARE_ACCESS, PutSmbFile.SHARE_ACCESS_READWRITEDELETE); + testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true"); Set<SMB2ShareAccess> shareAccessSet = testOpenFileShareAccess(); assertTrue(shareAccessSet.contains(SMB2ShareAccess.FILE_SHARE_READ)); assertTrue(shareAccessSet.contains(SMB2ShareAccess.FILE_SHARE_WRITE)); @@ -209,6 +240,76 @@ public class PutSmbFileTest { } @Test + public void testTemporarySuffixIsUnset() throws IOException { + testRunner.enqueue("data"); + testRunner.run(); + + verify(diskShare, never()).open( + any(String.class), + anySet(), + anySet(), + anySet(), + any(SMB2CreateDisposition.class), + anySet() + ); + } + + @Test + public void testTemporarySuffixIsSet() throws IOException { + final String suffix = ".test"; + + testRunner.setProperty(PutSmbFile.RENAME_SUFFIX, suffix); + testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true"); + testRunner.enqueue("data"); + testRunner.run(); + + ArgumentCaptor<String> filename = ArgumentCaptor.forClass(String.class); + + verify(diskShare, times(1)).open( + filename.capture(), + anySet(), + anySet(), + anySet(), + any(SMB2CreateDisposition.class), + anySet() + ); + + assertTrue(filename.getValue().endsWith(suffix), "Suffix is not present"); + } + + @Test + public void testTemporarySuffixIsSetRenameIsCalled() throws IOException { + final String suffix = ".test"; + + testRunner.setProperty(PutSmbFile.RENAME_SUFFIX, suffix); + testRunner.setProperty(PutSmbFile.CREATE_DIRS, "true"); + testRunner.enqueue("data"); + testRunner.run(); + + ArgumentCaptor<String> initialFilename = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> finalFilename = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<Boolean> replace = ArgumentCaptor.forClass(Boolean.class); + + verify(diskShare, times(1)).open( + initialFilename.capture(), + anySet(), + anySet(), + anySet(), + any(SMB2CreateDisposition.class), + anySet() + ); + + verify(diskEntry, times(1)).rename( + finalFilename.capture(), + replace.capture() + ); + + assertTrue(initialFilename.getValue().endsWith(suffix), "Suffix is not present and it should be"); + assertTrue(!finalFilename.getValue().endsWith(suffix), "Suffix is present and it shouldn't be"); + assertTrue(replace.getValue(), "Replace flag shold be true"); + } + + @Test public void testConnectionError() throws IOException { String emsg = "mock connection exception"; when(smbClient.connect(any(String.class))).thenThrow(new IOException(emsg));