si-sun commented on a change in pull request #4169:
URL: https://github.com/apache/nifi/pull/4169#discussion_r464252831



##########
File path: 
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
##########
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import com.hierynomus.msdtyp.AccessMask;
+import com.hierynomus.mserref.NtStatus;
+import com.hierynomus.msfscc.FileAttributes;
+import com.hierynomus.msfscc.fileinformation.FileAllInformation;
+import com.hierynomus.msfscc.fileinformation.FileBasicInformation;
+import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation;
+import com.hierynomus.mssmb2.SMB2CreateDisposition;
+import com.hierynomus.mssmb2.SMB2CreateOptions;
+import com.hierynomus.mssmb2.SMB2ShareAccess;
+import com.hierynomus.mssmb2.SMBApiException;
+import com.hierynomus.smbj.SMBClient;
+import com.hierynomus.smbj.auth.AuthenticationContext;
+import com.hierynomus.smbj.connection.Connection;
+import com.hierynomus.smbj.session.Session;
+import com.hierynomus.smbj.share.DiskShare;
+import com.hierynomus.smbj.share.File;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.InputStream;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.ListIterator;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.ArrayList;
+import java.util.Locale;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+@TriggerWhenEmpty
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"samba, smb, cifs, files, get"})
+@CapabilityDescription("Reads file from a samba network location to FlowFiles. 
" +
+    "Use this processor instead of a cifs mounts if share access control is 
important.")
+@SeeAlso({PutSmbFile.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename 
is set to the name of the file on the network share"),
+        @WritesAttribute(attribute = "path", description = "The path is set to 
the relative path of the file's network share name. For example, "
+                + "if the input is set to \\\\hostname\\share\\tmp, files 
picked up from \\tmp will have the path attribute set to tmp"),
+        @WritesAttribute(attribute = "file.creationTime", description = "The 
date and time that the file was created. May not work on all file systems"),
+        @WritesAttribute(attribute = "file.lastModifiedTime", description = 
"The date and time that the file was last modified. May not work on all "
+                + "file systems"),
+        @WritesAttribute(attribute = "file.lastAccessTime", description = "The 
date and time that the file was last accessed. May not work on all "
+                + "file systems"),
+        @WritesAttribute(attribute = "absolute.path", description = "The full 
path from where a file was picked up. This includes "
+                + "the hostname and the share name")})
+public class GetSmbFile extends AbstractProcessor {
+    public static final String SHARE_ACCESS_NONE = "none";
+    public static final String SHARE_ACCESS_READ = "read";
+    public static final String SHARE_ACCESS_READDELETE = "read, delete";
+    public static final String SHARE_ACCESS_READWRITEDELETE = "read, write, 
delete";
+
+
+    public static final PropertyDescriptor HOSTNAME = new 
PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The network host to which files should be written.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor SHARE = new 
PropertyDescriptor.Builder()
+            .name("Share")
+            .description("The network share to which files should be written.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor DIRECTORY = new 
PropertyDescriptor.Builder()
+            .name("Directory")
+            .description("The network folder to which files should be written. 
You may use expression language.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor DOMAIN = new 
PropertyDescriptor.Builder()
+            .name("Domain")
+            .description("The domain use for authentication")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username use for authentication")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password use for authentication")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor SHARE_ACCESS = new 
PropertyDescriptor.Builder()
+            .name("Share Access Strategy")
+            .description("Indicates which shared access are granted on the 
file during the read. " +
+                "None is the most restrictive, but the safest setting to 
prevent corruption.")
+            .required(true)
+            .defaultValue(SHARE_ACCESS_NONE)
+            .allowableValues(SHARE_ACCESS_NONE, SHARE_ACCESS_READ, 
SHARE_ACCESS_READDELETE, SHARE_ACCESS_READWRITEDELETE)
+            .build();
+    public static final PropertyDescriptor RECURSE = new 
PropertyDescriptor.Builder()
+            .name("Recurse Subdirectories")
+            .description("Indicates whether or not to pull files from 
subdirectories")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+    public static final PropertyDescriptor KEEP_SOURCE_FILE = new 
PropertyDescriptor.Builder()
+            .name("Keep Source File")
+            .description("If true, the file is not deleted after it has been 
copied to the Content Repository; "
+                    + "this causes the file to be picked up continually and is 
useful for testing purposes.  "
+                    + "If not keeping original NiFi will need write 
permissions on the directory it is pulling "
+                    + "from otherwise it will ignore the file.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+    public static final PropertyDescriptor FILE_FILTER = new 
PropertyDescriptor.Builder()
+            .name("File Filter")
+            .description("Only files whose names match the given regular 
expression will be picked up")
+            .required(false)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PATH_FILTER = new 
PropertyDescriptor.Builder()
+            .name("Path Filter")
+            .description("When " + RECURSE.getName() + " is true, then only 
subdirectories whose path matches the given regular expression will be scanned")
+            .required(false)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new 
PropertyDescriptor.Builder()
+            .name("Ignore Hidden Files")
+            .description("Indicates whether or not hidden files should be 
ignored")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor POLLING_INTERVAL = new 
PropertyDescriptor.Builder()
+            .name("Polling Interval")
+            .description("Indicates how long to wait before performing a 
directory listing")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 sec")
+            .build();
+    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The maximum number of files to pull in each 
iteration")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10")
+            .build();
+
+    public static final String FILE_CREATION_TIME_ATTRIBUTE = 
"file.creationTime";
+    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = 
"file.lastModifiedTime";
+    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = 
"file.lastAccessTime";
+
+    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ssZ";
+    final static DateFormat dateFormatter = new 
SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
+
+    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("All files are routed to 
success").build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+
+
+    private final BlockingQueue<String> fileQueue = new 
LinkedBlockingQueue<>();
+    private final Set<String> inProcess = new HashSet<>();    // guarded by 
queueLock
+    private final Set<String> recentlyProcessed = new HashSet<>();    // 
guarded by queueLock
+    private final Lock queueLock = new ReentrantLock();
+
+    private final Lock listingLock = new ReentrantLock();
+
+    private final AtomicLong queueLastUpdated = new AtomicLong(0L);
+
+    private SMBClient smbClient = null;

Review comment:
       @joewitt it's the same answer as above. We're calling connect in the 
onTrigger function. This [connect method is in a synchronized 
block](https://github.com/hierynomus/smbj/blob/a8c74679c977946ff99a68cd19e69c6b20cf93d6/src/main/java/com/hierynomus/smbj/SMBClient.java#L96),
 so we don't need to add the `@TriggerSerially` annotation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to