http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index d67ed09..6b906c2 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@ -121,7 +121,7 @@ import com.sun.jersey.api.client.ClientResponse.Status; @SupportsBatching @Tags({"http", "https", "remote", "copy", "archive"}) @CapabilityDescription("Performs an HTTP Post with the content of the FlowFile") -@ReadsAttribute(attribute="mime.type", description="If not sending data as a FlowFile, the mime.type attribute will be used to set the HTTP Header for Content-Type") +@ReadsAttribute(attribute = "mime.type", description = "If not sending data as a FlowFile, the mime.type attribute will be used to set the HTTP Header for Content-Type") public class PostHTTP extends AbstractProcessor { public static final String CONTENT_TYPE = "Content-Type"; @@ -143,7 +143,8 @@ public class PostHTTP extends AbstractProcessor { public static final PropertyDescriptor URL = new PropertyDescriptor.Builder() .name("URL") - .description("The URL to POST to. The first part of the URL must be static. However, the path of the URL may be defined using the Attribute Expression Language. For example, https://${hostname} is not valid, but https://1.1.1.1:8080/files/${nf.file.name} is valid.") + .description("The URL to POST to. The first part of the URL must be static. However, the path of the URL may be defined using the Attribute Expression Language. " + + "For example, https://${hostname} is not valid, but https://1.1.1.1:8080/files/${nf.file.name} is valid.") .required(true) .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("https?\\://.*"))) .addValidator(StandardValidators.URL_VALIDATOR) @@ -210,7 +211,9 @@ public class PostHTTP extends AbstractProcessor { .build(); public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() .name("Max Batch Size") - .description("If the Send as FlowFile property is true, specifies the max data size for a batch of FlowFiles to send in a single HTTP POST. If not specified, each FlowFile will be sent separately. If the Send as FlowFile property is false, this property is ignored") + .description("If the Send as FlowFile property is true, specifies the max data size for a batch of FlowFiles to send in a single " + + "HTTP POST. If not specified, each FlowFile will be sent separately. If the Send as FlowFile property is false, this " + + "property is ignored") .required(false) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .defaultValue("100 MB") @@ -230,8 +233,14 @@ public class PostHTTP extends AbstractProcessor { .identifiesControllerService(SSLContextService.class) .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Files that are successfully send will be transferred to success").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Files that fail to send will transferred to failure").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Files that are successfully send will be transferred to success") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Files that fail to send will transferred to failure") + .build(); private Set<Relationship> relationships; private List<PropertyDescriptor> properties; @@ -281,9 +290,7 @@ public class PostHTTP extends AbstractProcessor { if (context.getProperty(URL).getValue().startsWith("https") && context.getProperty(SSL_CONTEXT_SERVICE).getValue() == null) { results.add(new ValidationResult.Builder() .explanation("URL is set to HTTPS protocol but no SSLContext has been specified") - .valid(false) - .subject("SSL Context") - .build()); + .valid(false).subject("SSL Context").build()); } return results; @@ -325,7 +332,7 @@ public class PostHTTP extends AbstractProcessor { final PoolingHttpClientConnectionManager conMan; final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - if ( sslContextService == null ) { + if (sslContextService == null) { conMan = new PoolingHttpClientConnectionManager(); } else { final SSLContext sslContext; @@ -334,16 +341,14 @@ public class PostHTTP extends AbstractProcessor { } catch (final Exception e) { throw new ProcessException(e); } - - final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[] { "TLSv1" }, null, - SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER); - - final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create() - .register("https", sslsf).build(); - + + final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[]{"TLSv1"}, null, SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER); + + final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create().register("https", sslsf).build(); + conMan = new PoolingHttpClientConnectionManager(socketFactoryRegistry); } - + conMan.setDefaultMaxPerRoute(context.getMaxConcurrentTasks()); conMan.setMaxTotal(context.getMaxConcurrentTasks()); config = new Config(conMan); @@ -351,15 +356,13 @@ public class PostHTTP extends AbstractProcessor { return (existingConfig == null) ? config : existingConfig; } - - - private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException, - CertificateException, KeyManagementException, UnrecoverableKeyException - { + + private SSLContext createSSLContext(final SSLContextService service) + throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, KeyManagementException, UnrecoverableKeyException { SSLContextBuilder builder = SSLContexts.custom(); final String trustFilename = service.getTrustStoreFile(); - if ( trustFilename != null ) { - final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType()); + if (trustFilename != null) { + final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType()); try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) { truststore.load(in, service.getTrustStorePassword().toCharArray()); } @@ -367,14 +370,14 @@ public class PostHTTP extends AbstractProcessor { } final String keyFilename = service.getKeyStoreFile(); - if ( keyFilename != null ) { - final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType()); + if (keyFilename != null) { + final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType()); try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) { keystore.load(in, service.getKeyStorePassword().toCharArray()); } builder = builder.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()); } - + SSLContext sslContext = builder.build(); return sslContext; } @@ -391,7 +394,7 @@ public class PostHTTP extends AbstractProcessor { requestConfigBuilder.setRedirectsEnabled(false); requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); final RequestConfig requestConfig = requestConfigBuilder.build(); - + final StreamThrottler throttler = throttlerRef.get(); final ProcessorLog logger = getLogger(); @@ -415,8 +418,9 @@ public class PostHTTP extends AbstractProcessor { try { new java.net.URL(url); } catch (final MalformedURLException e) { - logger.error("After substituting attribute values for {}, URL is {}; this is not a valid URL, so routing to failure", - new Object[]{flowFile, url}); + logger. + error("After substituting attribute values for {}, URL is {}; this is not a valid URL, so routing to failure", + new Object[]{flowFile, url}); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); continue; @@ -434,36 +438,37 @@ public class PostHTTP extends AbstractProcessor { if (client == null || destinationAccepts == null) { final Config config = getConfig(url, context); final HttpClientConnectionManager conMan = config.getConnectionManager(); - + final HttpClientBuilder clientBuilder = HttpClientBuilder.create(); clientBuilder.setConnectionManager(conMan); clientBuilder.setUserAgent(userAgent); - clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() { - @Override - public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { - HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext); - ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class); - if ( !conn.isOpen() ) { - return; - } - - SSLSession sslSession = conn.getSSLSession(); - - if ( sslSession != null ) { - final X509Certificate[] certChain = sslSession.getPeerCertificateChain(); - if (certChain == null || certChain.length == 0) { - throw new SSLPeerUnverifiedException("No certificates found"); + clientBuilder. + addInterceptorFirst(new HttpResponseInterceptor() { + @Override + public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { + HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext); + ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class); + if (!conn.isOpen()) { + return; + } + + SSLSession sslSession = conn.getSSLSession(); + + if (sslSession != null) { + final X509Certificate[] certChain = sslSession.getPeerCertificateChain(); + if (certChain == null || certChain.length == 0) { + throw new SSLPeerUnverifiedException("No certificates found"); + } + + final X509Certificate cert = certChain[0]; + dnHolder.set(cert.getSubjectDN().getName().trim()); + } } + }); - final X509Certificate cert = certChain[0]; - dnHolder.set(cert.getSubjectDN().getName().trim()); - } - } - }); - clientBuilder.disableAutomaticRetries(); clientBuilder.disableContentCompression(); - + final String username = context.getProperty(USERNAME).getValue(); final String password = context.getProperty(PASSWORD).getValue(); // set the credentials if appropriate @@ -473,7 +478,7 @@ public class PostHTTP extends AbstractProcessor { credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username)); } else { credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); - }; + } clientBuilder.setDefaultCredentialsProvider(credentialsProvider); } client = clientBuilder.build(); @@ -492,7 +497,8 @@ public class PostHTTP extends AbstractProcessor { } catch (IOException e) { flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); - logger.error("Unable to communicate with destination {} to determine whether or not it can accept flowfiles/gzip; routing {} to failure due to {}", new Object[]{url, flowFile, e}); + logger.error("Unable to communicate with destination {} to determine whether or not it can accept " + + "flowfiles/gzip; routing {} to failure due to {}", new Object[]{url, flowFile, e}); context.yield(); return; } @@ -580,7 +586,7 @@ public class PostHTTP extends AbstractProcessor { entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean()); post.setEntity(entity); post.setConfig(requestConfig); - + final String contentType; if (sendAsFlowFile) { if (accepts.isFlowFileV3Accepted()) { @@ -590,7 +596,8 @@ public class PostHTTP extends AbstractProcessor { } else if (accepts.isFlowFileV1Accepted()) { contentType = APPLICATION_FLOW_FILE_V1; } else { - logger.error("Cannot send data to {} because the destination does not accept FlowFiles and this processor is configured to deliver FlowFiles; rolling back session", new Object[]{url}); + logger.error("Cannot send data to {} because the destination does not accept FlowFiles and this processor is " + + "configured to deliver FlowFiles; rolling back session", new Object[]{url}); session.rollback(); context.yield(); return; @@ -646,11 +653,11 @@ public class PostHTTP extends AbstractProcessor { } return; } finally { - if ( response != null ) { + if (response != null) { try { response.close(); } catch (IOException e) { - getLogger().warn("Failed to close HTTP Response due to {}", new Object[] {e}); + getLogger().warn("Failed to close HTTP Response due to {}", new Object[]{e}); } } } @@ -676,7 +683,8 @@ public class PostHTTP extends AbstractProcessor { if (holdUri == null) { for (FlowFile flowFile : toSend) { flowFile = session.penalize(flowFile); - logger.error("Failed to Post {} to {}: sent content and received status code {}:{} but no Hold URI", new Object[]{flowFile, url, responseCode, responseReason}); + logger.error("Failed to Post {} to {}: sent content and received status code {}:{} but no Hold URI", + new Object[]{flowFile, url, responseCode, responseReason}); session.transfer(flowFile, REL_FAILURE); } return; @@ -687,7 +695,9 @@ public class PostHTTP extends AbstractProcessor { if (responseCode == HttpServletResponse.SC_SERVICE_UNAVAILABLE) { for (FlowFile flowFile : toSend) { flowFile = session.penalize(flowFile); - logger.error("Failed to Post {} to {}: response code was {}:{}; will yield processing, since the destination is temporarily unavailable", new Object[]{flowFile, url, responseCode, responseReason}); + logger.error("Failed to Post {} to {}: response code was {}:{}; will yield processing, " + + "since the destination is temporarily unavailable", + new Object[]{flowFile, url, responseCode, responseReason}); session.transfer(flowFile, REL_FAILURE); } context.yield(); @@ -697,14 +707,15 @@ public class PostHTTP extends AbstractProcessor { if (responseCode >= 300) { for (FlowFile flowFile : toSend) { flowFile = session.penalize(flowFile); - logger.error("Failed to Post {} to {}: response code was {}:{}", new Object[]{flowFile, url, responseCode, responseReason}); + logger.error("Failed to Post {} to {}: response code was {}:{}", + new Object[]{flowFile, url, responseCode, responseReason}); session.transfer(flowFile, REL_FAILURE); } return; } - logger.info("Successfully Posted {} to {} in {} at a rate of {}", new Object[]{ - flowFileDescription, url, FormatUtils.formatMinutesSeconds(uploadMillis, TimeUnit.MILLISECONDS), uploadDataRate}); + logger.info("Successfully Posted {} to {} in {} at a rate of {}", + new Object[]{flowFileDescription, url, FormatUtils.formatMinutesSeconds(uploadMillis, TimeUnit.MILLISECONDS), uploadDataRate}); for (final FlowFile flowFile : toSend) { session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + dnHolder.get(), uploadMillis, true); @@ -759,8 +770,7 @@ public class PostHTTP extends AbstractProcessor { return; } - logger.info("Successfully Posted {} to {} in {} milliseconds at a rate of {}", - new Object[]{flowFileDescription, url, uploadMillis, uploadDataRate}); + logger.info("Successfully Posted {} to {} in {} milliseconds at a rate of {}", new Object[]{flowFileDescription, url, uploadMillis, uploadDataRate}); for (FlowFile flowFile : toSend) { session.getProvenanceReporter().send(flowFile, url); @@ -773,7 +783,8 @@ public class PostHTTP extends AbstractProcessor { if (!isScheduled()) { context.yield(); - logger.warn("Failed to delete Hold that destination placed on {}; Processor has been stopped so routing FlowFile(s) to failure", new Object[]{flowFileDescription}); + logger. + warn("Failed to delete Hold that destination placed on {}; Processor has been stopped so routing FlowFile(s) to failure", new Object[]{flowFileDescription}); for (FlowFile flowFile : toSend) { flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); @@ -783,7 +794,6 @@ public class PostHTTP extends AbstractProcessor { } } - private DestinationAccepts getDestinationAcceptance(final HttpClient client, final String uri, final ProcessorLog logger, final String transactionId) throws IOException { final HttpHead head = new HttpHead(uri); head.addHeader(TRANSACTION_ID_HEADER, transactionId); @@ -856,7 +866,8 @@ public class PostHTTP extends AbstractProcessor { return new DestinationAccepts(acceptsFlowFileV3, acceptsFlowFileV2, acceptsFlowFileV1, acceptsGzip, protocolVersion); } else { - logger.warn("Unable to communicate with destination; when attempting to perform an HTTP HEAD, got unexpected response code of " + statusCode + ": " + response.getStatusLine().getReasonPhrase()); + logger.warn("Unable to communicate with destination; when attempting to perform an HTTP HEAD, got unexpected response code of " + + statusCode + ": " + response.getStatusLine().getReasonPhrase()); return new DestinationAccepts(false, false, false, false, null); } } @@ -869,8 +880,7 @@ public class PostHTTP extends AbstractProcessor { private final boolean gzip; private final Integer protocolVersion; - public DestinationAccepts(final boolean flowFileV3, final boolean flowFileV2, final boolean flowFileV1, - final boolean gzip, final Integer protocolVersion) { + public DestinationAccepts(final boolean flowFileV3, final boolean flowFileV2, final boolean flowFileV1, final boolean gzip, final Integer protocolVersion) { this.flowFileV3 = flowFileV3; this.flowFileV2 = flowFileV2; this.flowFileV1 = flowFileV1;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java index 144dd5b..8cad06f 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java @@ -74,7 +74,7 @@ public class PutEmail extends AbstractProcessor { .name("SMTP Hostname") .description("The hostname of the SMTP host") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); public static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder() @@ -82,142 +82,151 @@ public class PutEmail extends AbstractProcessor { .description("The Port used for SMTP communications") .required(true) .defaultValue("25") - .expressionLanguageSupported(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.PORT_VALIDATOR) .build(); public static final PropertyDescriptor SMTP_USERNAME = new PropertyDescriptor.Builder() .name("SMTP Username") .description("Username for the SMTP account") - .expressionLanguageSupported(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(false) .build(); public static final PropertyDescriptor SMTP_PASSWORD = new PropertyDescriptor.Builder() .name("SMTP Password") .description("Password for the SMTP account") - .expressionLanguageSupported(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(false) .sensitive(true) .build(); public static final PropertyDescriptor SMTP_AUTH = new PropertyDescriptor.Builder() - .name("SMTP Auth") - .description("Flag indicating whether authentication should be used") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .defaultValue("true") - .build(); - public static final PropertyDescriptor SMTP_TLS = new PropertyDescriptor.Builder() - .name("SMTP TLS") - .description("Flag indicating whether TLS should be enabled") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .defaultValue("false") - .build(); - public static final PropertyDescriptor SMTP_SOCKET_FACTORY = new PropertyDescriptor.Builder() - .name("SMTP Socket Factory") - .description("Socket Factory to use for SMTP Connection") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("javax.net.ssl.SSLSocketFactory") - .build(); - public static final PropertyDescriptor HEADER_XMAILER = new PropertyDescriptor.Builder() - .name("SMTP X-Mailer Header") - .description("X-Mailer used in the header of the outgoing email") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("NiFi") - .build(); - public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder() - .name("Content Type") - .description("Mime Type used to interpret the contents of the email, such as text/plain or text/html") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("text/plain") - .build(); - public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder() - .name("From") - .description("Specifies the Email address to use as the sender") + .name("SMTP Auth") + .description("Flag indicating whether authentication should be used") .required(true) .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor TO = new PropertyDescriptor.Builder() - .name("To") - .description("The recipients to include in the To-Line of the email") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor CC = new PropertyDescriptor.Builder() - .name("CC") - .description("The recipients to include in the CC-Line of the email") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor BCC = new PropertyDescriptor.Builder() - .name("BCC") - .description("The recipients to include in the BCC-Line of the email") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") .build(); - public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder() - .name("Subject") - .description("The email subject") + public static final PropertyDescriptor SMTP_TLS = new PropertyDescriptor.Builder() + .name("SMTP TLS") + .description("Flag indicating whether TLS should be enabled") .required(true) .expressionLanguageSupported(true) - .defaultValue("Message from NiFi") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("false") .build(); - public static final PropertyDescriptor MESSAGE = new PropertyDescriptor.Builder() - .name("Message") - .description("The body of the email message") + public static final PropertyDescriptor SMTP_SOCKET_FACTORY = new PropertyDescriptor.Builder() + .name("SMTP Socket Factory") + .description("Socket Factory to use for SMTP Connection") .required(true) .expressionLanguageSupported(true) - .defaultValue("") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("javax.net.ssl.SSLSocketFactory") .build(); - public static final PropertyDescriptor ATTACH_FILE = new PropertyDescriptor.Builder() - .name("Attach File") - .description("Specifies whether or not the FlowFile content should be attached to the email") - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); - public static final PropertyDescriptor INCLUDE_ALL_ATTRIBUTES = new PropertyDescriptor.Builder() - .name("Include All Attributes In Message") - .description("Specifies whether or not all FlowFile attributes should be recorded in the body of the email message") - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are successfully sent will be routed to this relationship").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that fail to send will be routed to this relationship").build(); + public static final PropertyDescriptor HEADER_XMAILER = new PropertyDescriptor.Builder(). + name("SMTP X-Mailer Header"). + description("X-Mailer used in the header of the outgoing email"). + required(true). + expressionLanguageSupported(true). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + defaultValue("NiFi"). + build(); + public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder(). + name("Content Type"). + description("Mime Type used to interpret the contents of the email, such as text/plain or text/html"). + required(true). + expressionLanguageSupported(true). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + defaultValue("text/plain"). + build(); + public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder(). + name("From"). + description("Specifies the Email address to use as the sender"). + required(true). + expressionLanguageSupported(true). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + build(); + public static final PropertyDescriptor TO = new PropertyDescriptor.Builder() + .name("To"). + description("The recipients to include in the To-Line of the email"). + required(false). + expressionLanguageSupported(true). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + build(); + public static final PropertyDescriptor CC = new PropertyDescriptor.Builder() + .name("CC"). + description("The recipients to include in the CC-Line of the email"). + required(false). + expressionLanguageSupported(true). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + build(); + public static final PropertyDescriptor BCC = new PropertyDescriptor.Builder(). + name("BCC"). + description("The recipients to include in the BCC-Line of the email"). + required(false). + expressionLanguageSupported(true). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + build(); + public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder(). + name("Subject"). + description("The email subject"). + required(true). + expressionLanguageSupported(true). + defaultValue("Message from NiFi"). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + build(); + public static final PropertyDescriptor MESSAGE = new PropertyDescriptor.Builder(). + name("Message"). + description("The body of the email message"). + required(true). + expressionLanguageSupported(true). + defaultValue(""). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + build(); + public static final PropertyDescriptor ATTACH_FILE = new PropertyDescriptor.Builder(). + name("Attach File"). + description("Specifies whether or not the FlowFile content should be attached to the email"). + required(true). + allowableValues("true", "false"). + defaultValue("false"). + build(); + public static final PropertyDescriptor INCLUDE_ALL_ATTRIBUTES = new PropertyDescriptor.Builder(). + name("Include All Attributes In Message"). + description("Specifies whether or not all FlowFile attributes should be recorded in the body of the email message"). + required(true). + allowableValues("true", "false"). + defaultValue("false"). + build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder(). + name("success"). + description("FlowFiles that are successfully sent will be routed to this relationship"). + build(); + public static final Relationship REL_FAILURE = new Relationship.Builder(). + name("failure"). + description("FlowFiles that fail to send will be routed to this relationship"). + build(); private List<PropertyDescriptor> properties; private Set<Relationship> relationships; - + /** - * Mapping of the mail properties to the NiFi PropertyDescriptors that will be evaluated at runtime + * Mapping of the mail properties to the NiFi PropertyDescriptors that will + * be evaluated at runtime */ - private static Map<String, PropertyDescriptor> propertyToContext = new HashMap<String, PropertyDescriptor>(); + private static final Map<String, PropertyDescriptor> propertyToContext = new HashMap<>(); + static { - propertyToContext.put("mail.smtp.host", SMTP_HOSTNAME); - propertyToContext.put("mail.smtp.port", SMTP_PORT); - propertyToContext.put("mail.smtp.socketFactory.port", SMTP_PORT); - propertyToContext.put("mail.smtp.socketFactory.class", SMTP_SOCKET_FACTORY); - propertyToContext.put("mail.smtp.auth", SMTP_AUTH); - propertyToContext.put("mail.smtp.starttls.enable", SMTP_TLS); - propertyToContext.put("mail.smtp.user", SMTP_USERNAME); - propertyToContext.put("mail.smtp.password", SMTP_PASSWORD); + propertyToContext.put("mail.smtp.host", SMTP_HOSTNAME); + propertyToContext.put("mail.smtp.port", SMTP_PORT); + propertyToContext.put("mail.smtp.socketFactory.port", SMTP_PORT); + propertyToContext. + put("mail.smtp.socketFactory.class", SMTP_SOCKET_FACTORY); + propertyToContext.put("mail.smtp.auth", SMTP_AUTH); + propertyToContext.put("mail.smtp.starttls.enable", SMTP_TLS); + propertyToContext.put("mail.smtp.user", SMTP_USERNAME); + propertyToContext.put("mail.smtp.password", SMTP_PASSWORD); } @Override @@ -260,14 +269,21 @@ public class PutEmail extends AbstractProcessor { @Override protected Collection<ValidationResult> customValidate(final ValidationContext context) { - final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context)); + final List<ValidationResult> errors = new ArrayList<>(super. + customValidate(context)); - final String to = context.getProperty(TO).getValue(); - final String cc = context.getProperty(CC).getValue(); - final String bcc = context.getProperty(BCC).getValue(); + final String to = context.getProperty(TO). + getValue(); + final String cc = context.getProperty(CC). + getValue(); + final String bcc = context.getProperty(BCC). + getValue(); if (to == null && cc == null && bcc == null) { - errors.add(new ValidationResult.Builder().subject("To, CC, BCC").valid(false).explanation("Must specify at least one To/CC/BCC address").build()); + errors.add(new ValidationResult.Builder().subject("To, CC, BCC"). + valid(false). + explanation("Must specify at least one To/CC/BCC address"). + build()); } return errors; @@ -280,142 +296,187 @@ public class PutEmail extends AbstractProcessor { return; } - final Properties properties = this.getMailPropertiesFromFlowFile(context, flowFile); - + final Properties properties = this. + getMailPropertiesFromFlowFile(context, flowFile); + final Session mailSession = this.createMailSession(properties); - + final Message message = new MimeMessage(mailSession); final ProcessorLog logger = getLogger(); try { - message.setFrom(InternetAddress.parse(context.getProperty(FROM).evaluateAttributeExpressions(flowFile).getValue())[0]); + message.setFrom(InternetAddress.parse(context.getProperty(FROM). + evaluateAttributeExpressions(flowFile). + getValue())[0]); - final InternetAddress[] toAddresses = toInetAddresses(context.getProperty(TO).evaluateAttributeExpressions(flowFile).getValue()); + final InternetAddress[] toAddresses = toInetAddresses(context. + getProperty(TO). + evaluateAttributeExpressions(flowFile). + getValue()); message.setRecipients(RecipientType.TO, toAddresses); - final InternetAddress[] ccAddresses = toInetAddresses(context.getProperty(CC).evaluateAttributeExpressions(flowFile).getValue()); + final InternetAddress[] ccAddresses = toInetAddresses(context. + getProperty(CC). + evaluateAttributeExpressions(flowFile). + getValue()); message.setRecipients(RecipientType.CC, ccAddresses); - final InternetAddress[] bccAddresses = toInetAddresses(context.getProperty(BCC).evaluateAttributeExpressions(flowFile).getValue()); + final InternetAddress[] bccAddresses = toInetAddresses(context. + getProperty(BCC). + evaluateAttributeExpressions(flowFile). + getValue()); message.setRecipients(RecipientType.BCC, bccAddresses); - message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions(flowFile).getValue()); - message.setSubject(context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue()); - String messageText = context.getProperty(MESSAGE).evaluateAttributeExpressions(flowFile).getValue(); + message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER). + evaluateAttributeExpressions(flowFile). + getValue()); + message.setSubject(context.getProperty(SUBJECT). + evaluateAttributeExpressions(flowFile). + getValue()); + String messageText = context.getProperty(MESSAGE). + evaluateAttributeExpressions(flowFile). + getValue(); - if (context.getProperty(INCLUDE_ALL_ATTRIBUTES).asBoolean()) { + if (context.getProperty(INCLUDE_ALL_ATTRIBUTES). + asBoolean()) { messageText = formatAttributes(flowFile, messageText); } - - String contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue(); + + String contentType = context.getProperty(CONTENT_TYPE). + evaluateAttributeExpressions(flowFile). + getValue(); message.setContent(messageText, contentType); message.setSentDate(new Date()); - - if (context.getProperty(ATTACH_FILE).asBoolean()) { + + if (context.getProperty(ATTACH_FILE). + asBoolean()) { final MimeBodyPart mimeText = new PreencodedMimeBodyPart("base64"); - mimeText.setDataHandler(new DataHandler(new ByteArrayDataSource(Base64.encodeBase64(messageText.getBytes("UTF-8")), "text/plain; charset=\"utf-8\""))); + mimeText. + setDataHandler(new DataHandler(new ByteArrayDataSource(Base64. + encodeBase64(messageText. + getBytes("UTF-8")), "text/plain; charset=\"utf-8\""))); final MimeBodyPart mimeFile = new MimeBodyPart(); session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream stream) throws IOException { try { - mimeFile.setDataHandler(new DataHandler(new ByteArrayDataSource(stream, "application/octet-stream"))); + mimeFile. + setDataHandler(new DataHandler(new ByteArrayDataSource(stream, "application/octet-stream"))); } catch (final Exception e) { throw new IOException(e); } } }); - mimeFile.setFileName(flowFile.getAttribute(CoreAttributes.FILENAME.key())); + mimeFile.setFileName(flowFile. + getAttribute(CoreAttributes.FILENAME.key())); MimeMultipart multipart = new MimeMultipart(); multipart.addBodyPart(mimeText); multipart.addBodyPart(mimeFile); message.setContent(multipart); } - + Transport.send(message); - session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString()); + session.getProvenanceReporter(). + send(flowFile, "mailto:" + message.getAllRecipients()[0]. + toString()); session.transfer(flowFile, REL_SUCCESS); - logger.info("Sent email as a result of receiving {}", new Object[]{flowFile}); + logger. + info("Sent email as a result of receiving {}", new Object[]{flowFile}); } catch (final ProcessException | MessagingException | IOException e) { context.yield(); - logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e}); + logger. + error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e}); session.transfer(flowFile, REL_FAILURE); } } /** - * Based on the input properties, determine whether an authenticate or unauthenticated session - * should be used. If authenticated, creates a Password Authenticator for use in sending the email. - * - * @param properties - * @return + * Based on the input properties, determine whether an authenticate or + * unauthenticated session should be used. If authenticated, creates a + * Password Authenticator for use in sending the email. + * + * @param properties mail properties + * @return session */ - private Session createMailSession(final Properties properties) { - String authValue = properties.getProperty("mail.smtp.auth"); + private Session createMailSession(final Properties properties) { + String authValue = properties.getProperty("mail.smtp.auth"); Boolean auth = Boolean.valueOf(authValue); - + /* * Conditionally create a password authenticator if the 'auth' parameter is set. */ - final Session mailSession = auth ? Session.getInstance(properties, new Authenticator() { - @Override - public PasswordAuthentication getPasswordAuthentication() { - String username = properties.getProperty("mail.smtp.user"), - password = properties.getProperty("mail.smtp.password"); - return new PasswordAuthentication(username, password); - } - }) : Session.getInstance(properties); // without auth - return mailSession; - } + final Session mailSession = auth ? Session. + getInstance(properties, new Authenticator() { + @Override + public PasswordAuthentication getPasswordAuthentication() { + String username = properties. + getProperty("mail.smtp.user"), + password = properties.getProperty("mail.smtp.password"); + return new PasswordAuthentication(username, password); + } + }) : Session.getInstance(properties); // without auth + return mailSession; + } /** - * Uses the mapping of javax.mail properties to NiFi PropertyDescriptors to build - * the required Properties object to be used for sending this email - * - * @param context - * @param flowFile - * @return + * Uses the mapping of javax.mail properties to NiFi PropertyDescriptors to + * build the required Properties object to be used for sending this email + * + * @param context context + * @param flowFile flowFile + * @return mail properties */ private Properties getMailPropertiesFromFlowFile(final ProcessContext context, final FlowFile flowFile) { final Properties properties = new Properties(); - + final ProcessorLog logger = this.getLogger(); - - for(Entry<String, PropertyDescriptor> entry : propertyToContext.entrySet()) { - - // Evaluate the property descriptor against the flow file - String flowFileValue = context.getProperty(entry.getValue()).evaluateAttributeExpressions(flowFile).getValue(); - - String property = entry.getKey(); - - logger.debug("Evaluated Mail Property: {} with Value: {}", new Object[]{property, flowFileValue}); - - // Nullable values are not allowed, so filter out - if(null != flowFileValue) { - properties.setProperty(property, flowFileValue); - } - + + for (Entry<String, PropertyDescriptor> entry : propertyToContext. + entrySet()) { + + // Evaluate the property descriptor against the flow file + String flowFileValue = context.getProperty(entry.getValue()). + evaluateAttributeExpressions(flowFile). + getValue(); + + String property = entry.getKey(); + + logger. + debug("Evaluated Mail Property: {} with Value: {}", new Object[]{property, flowFileValue}); + + // Nullable values are not allowed, so filter out + if (null != flowFileValue) { + properties.setProperty(property, flowFileValue); + } + } - + return properties; - - } - public static final String BODY_SEPARATOR = "\n\n--------------------------------------------------\n"; + } + + public static final String BODY_SEPARATOR = "\n\n--------------------------------------------------\n"; private static String formatAttributes(final FlowFile flowFile, final String messagePrepend) { StringBuilder message = new StringBuilder(messagePrepend); message.append(BODY_SEPARATOR); message.append("\nStandard FlowFile Metadata:"); - message.append(String.format("\n\t%1$s = '%2$s'", "id", flowFile.getId())); - message.append(String.format("\n\t%1$s = '%2$s'", "entryDate", new Date(flowFile.getEntryDate()))); - message.append(String.format("\n\t%1$s = '%2$s'", "fileSize", flowFile.getSize())); + message.append(String. + format("\n\t%1$s = '%2$s'", "id", flowFile.getId())); + message.append(String. + format("\n\t%1$s = '%2$s'", "entryDate", new Date(flowFile. + getEntryDate()))); + message.append(String.format("\n\t%1$s = '%2$s'", "fileSize", flowFile. + getSize())); message.append("\nFlowFile Attributes:"); - for (Entry<String, String> attribute : flowFile.getAttributes().entrySet()) { - message.append(String.format("\n\t%1$s = '%2$s'", attribute.getKey(), attribute.getValue())); + for (Entry<String, String> attribute : flowFile.getAttributes(). + entrySet()) { + message.append(String. + format("\n\t%1$s = '%2$s'", attribute.getKey(), attribute. + getValue())); } message.append("\n"); return message.toString(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java index 6e75661..6786bf0 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java @@ -44,20 +44,23 @@ import org.apache.nifi.processors.standard.util.FTPTransfer; @Tags({"remote", "copy", "egress", "put", "ftp", "archive", "files"}) @CapabilityDescription("Sends FlowFiles to an FTP Server") @SeeAlso(GetFTP.class) -@DynamicProperties({@DynamicProperty(name="pre.cmd._____", value="Not used", description="The command specified in the key will be executed before doing a put. You may add these optional properties " + - " to send any commands to the FTP server before the file is actually transferred (before the put command)." + - " This option is only available for the PutFTP processor, as only FTP has this functionality. This is" + - " essentially the same as sending quote commands to an FTP server from the command line. While this is the same as sending a quote command, it is very important that" + - " you leave off the ."), - @DynamicProperty(name="post.cmd._____", value="Not used", description="The command specified in the key will be executed after doing a put. You may add these optional properties " + - " to send any commands to the FTP server before the file is actually transferred (before the put command)." + - " This option is only available for the PutFTP processor, as only FTP has this functionality. This is" + - " essentially the same as sending quote commands to an FTP server from the command line. While this is the same as sending a quote command, it is very important that" + - " you leave off the .")}) +@DynamicProperties({ + @DynamicProperty(name = "pre.cmd._____", value = "Not used", description = "The command specified in the key will be executed before doing a put. You may add these optional properties " + + " to send any commands to the FTP server before the file is actually transferred (before the put command)." + + " This option is only available for the PutFTP processor, as only FTP has this functionality. This is" + + " essentially the same as sending quote commands to an FTP server from the command line. While this is the same as sending a quote command, it is very important that" + + " you leave off the ."), + @DynamicProperty(name = "post.cmd._____", value = "Not used", description = "The command specified in the key will be executed after doing a put. You may add these optional properties " + + " to send any commands to the FTP server before the file is actually transferred (before the put command)." + + " This option is only available for the PutFTP processor, as only FTP has this functionality. This is" + + " essentially the same as sending quote commands to an FTP server from the command line. While this is the same as sending a quote command, it is very important that" + + " you leave off the .")}) public class PutFTP extends PutFileTransfer<FTPTransfer> { - private static final Pattern PRE_SEND_CMD_PATTERN = Pattern.compile("^pre\\.cmd\\.(\\d+)$"); - private static final Pattern POST_SEND_CMD_PATTERN = Pattern.compile("^post\\.cmd\\.(\\d+)$"); + private static final Pattern PRE_SEND_CMD_PATTERN = Pattern. + compile("^pre\\.cmd\\.(\\d+)$"); + private static final Pattern POST_SEND_CMD_PATTERN = Pattern. + compile("^post\\.cmd\\.(\\d+)$"); private final AtomicReference<List<PropertyDescriptor>> preSendDescriptorRef = new AtomicReference<>(); private final AtomicReference<List<PropertyDescriptor>> postSendDescriptorRef = new AtomicReference<>(); @@ -90,7 +93,7 @@ public class PutFTP extends PutFileTransfer<FTPTransfer> { properties.add(FTPTransfer.PROXY_PORT); properties.add(FTPTransfer.HTTP_PROXY_USERNAME); properties.add(FTPTransfer.HTTP_PROXY_PASSWORD); - + this.properties = Collections.unmodifiableList(properties); } @@ -101,12 +104,14 @@ public class PutFTP extends PutFileTransfer<FTPTransfer> { @Override protected void beforePut(final FlowFile flowFile, final ProcessContext context, final FTPTransfer transfer) throws IOException { - transfer.sendCommands(getCommands(preSendDescriptorRef.get(), context, flowFile), flowFile); + transfer. + sendCommands(getCommands(preSendDescriptorRef.get(), context, flowFile), flowFile); } @Override protected void afterPut(final FlowFile flowFile, final ProcessContext context, final FTPTransfer transfer) throws IOException { - transfer.sendCommands(getCommands(postSendDescriptorRef.get(), context, flowFile), flowFile); + transfer. + sendCommands(getCommands(postSendDescriptorRef.get(), context, flowFile), flowFile); } @Override @@ -117,10 +122,10 @@ public class PutFTP extends PutFileTransfer<FTPTransfer> { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .dynamic(true) - .build(); + .name(propertyDescriptorName). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + dynamic(true). + build(); } @OnScheduled @@ -128,7 +133,8 @@ public class PutFTP extends PutFileTransfer<FTPTransfer> { final Map<Integer, PropertyDescriptor> preDescriptors = new TreeMap<>(); final Map<Integer, PropertyDescriptor> postDescriptors = new TreeMap<>(); - for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + for (final PropertyDescriptor descriptor : context.getProperties(). + keySet()) { final String name = descriptor.getName(); final Matcher preMatcher = PRE_SEND_CMD_PATTERN.matcher(name); if (preMatcher.matches()) { @@ -143,8 +149,10 @@ public class PutFTP extends PutFileTransfer<FTPTransfer> { } } - final List<PropertyDescriptor> preDescriptorList = new ArrayList<>(preDescriptors.values()); - final List<PropertyDescriptor> postDescriptorList = new ArrayList<>(postDescriptors.values()); + final List<PropertyDescriptor> preDescriptorList = new ArrayList<>(preDescriptors. + values()); + final List<PropertyDescriptor> postDescriptorList = new ArrayList<>(postDescriptors. + values()); this.preSendDescriptorRef.set(preDescriptorList); this.postSendDescriptorRef.set(postDescriptorList); } @@ -152,7 +160,9 @@ public class PutFTP extends PutFileTransfer<FTPTransfer> { private List<String> getCommands(final List<PropertyDescriptor> descriptors, final ProcessContext context, final FlowFile flowFile) { final List<String> cmds = new ArrayList<>(); for (final PropertyDescriptor descriptor : descriptors) { - cmds.add(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue()); + cmds.add(context.getProperty(descriptor). + evaluateAttributeExpressions(flowFile). + getValue()); } return cmds; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java index ba78133..ce03491 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java @@ -64,65 +64,76 @@ public class PutFile extends AbstractProcessor { public static final String FILE_MODIFY_DATE_ATTRIBUTE = "file.lastModifiedTime"; public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; - public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() - .name("Directory") - .description("The directory to which files should be written. You may use expression language such as /aa/bb/${path}") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor MAX_DESTINATION_FILES = new PropertyDescriptor.Builder() - .name("Maximum File Count") - .description("Specifies the maximum number of files that can exist in the output directory") - .required(false) - .addValidator(StandardValidators.INTEGER_VALIDATOR) - .build(); - public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() - .name("Conflict Resolution Strategy") - .description("Indicates what should happen when a file with the same name already exists in the output directory") - .required(true) - .defaultValue(FAIL_RESOLUTION) - .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION) - .build(); - public static final PropertyDescriptor CHANGE_LAST_MODIFIED_TIME = new PropertyDescriptor.Builder() - .name("Last Modified Time") - .description("Sets the lastModifiedTime on the output file to the value of this attribute. Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor CHANGE_PERMISSIONS = new PropertyDescriptor.Builder() - .name("Permissions") - .description("Sets the permissions on the output file to the value of this attribute. Format must be either UNIX rwxrwxrwx with a - in place of denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). You may also use expression language such as ${file.permissions}.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor CHANGE_OWNER = new PropertyDescriptor.Builder() - .name("Owner") - .description("Sets the owner on the output file to the value of this attribute. You may also use expression language such as ${file.owner}.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor CHANGE_GROUP = new PropertyDescriptor.Builder() - .name("Group") - .description("Sets the group on the output file to the value of this attribute. You may also use expression language such as ${file.group}.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor CREATE_DIRS = new PropertyDescriptor.Builder() - .name("Create Missing Directories") - .description("If true, then missing destination directories will be created. If false, flowfiles are penalized and sent to failure.") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder(). + name("Directory"). + description("The directory to which files should be written. You may use expression language such as /aa/bb/${path}"). + required(true). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + expressionLanguageSupported(true). + build(); + public static final PropertyDescriptor MAX_DESTINATION_FILES = new PropertyDescriptor.Builder(). + name("Maximum File Count"). + description("Specifies the maximum number of files that can exist in the output directory"). + required(false). + addValidator(StandardValidators.INTEGER_VALIDATOR). + build(); + public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder(). + name("Conflict Resolution Strategy"). + description("Indicates what should happen when a file with the same name already exists in the output directory"). + required(true). + defaultValue(FAIL_RESOLUTION). + allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION). + build(); + public static final PropertyDescriptor CHANGE_LAST_MODIFIED_TIME = new PropertyDescriptor.Builder(). + name("Last Modified Time"). + description("Sets the lastModifiedTime on the output file to the value of this attribute. Format must be yyyy-MM-dd'T'HH:mm:ssZ. " + + "You may also use expression language such as ${file.lastModifiedTime}."). + required(false). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + expressionLanguageSupported(true). + build(); + public static final PropertyDescriptor CHANGE_PERMISSIONS = new PropertyDescriptor.Builder(). + name("Permissions"). + description("Sets the permissions on the output file to the value of this attribute. Format must be either UNIX rwxrwxrwx with a - in " + + "place of denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). You may also use expression language such as " + + "${file.permissions}."). + required(false). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + expressionLanguageSupported(true). + build(); + public static final PropertyDescriptor CHANGE_OWNER = new PropertyDescriptor.Builder(). + name("Owner"). + description("Sets the owner on the output file to the value of this attribute. You may also use expression language such as " + + "${file.owner}."). + required(false). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + expressionLanguageSupported(true). + build(); + public static final PropertyDescriptor CHANGE_GROUP = new PropertyDescriptor.Builder(). + name("Group"). + description("Sets the group on the output file to the value of this attribute. You may also use expression language such " + + "as ${file.group}."). + required(false). + addValidator(StandardValidators.NON_EMPTY_VALIDATOR). + expressionLanguageSupported(true). + build(); + public static final PropertyDescriptor CREATE_DIRS = new PropertyDescriptor.Builder(). + name("Create Missing Directories"). + description("If true, then missing destination directories will be created. If false, flowfiles are penalized and sent to failure."). + required(true). + allowableValues("true", "false"). + defaultValue("true"). + build(); public static final int MAX_FILE_LOCK_ATTEMPTS = 10; - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Files that have been successfully written to the output directory are transferred to this relationship").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Files that could not be written to the output directory for some reason are transferred to this relationship").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder(). + name("success"). + description("Files that have been successfully written to the output directory are transferred to this relationship"). + build(); + public static final Relationship REL_FAILURE = new Relationship.Builder(). + name("failure"). + description("Files that could not be written to the output directory for some reason are transferred to this relationship"). + build(); private List<PropertyDescriptor> properties; private Set<Relationship> relationships; @@ -166,24 +177,35 @@ public class PutFile extends AbstractProcessor { } final StopWatch stopWatch = new StopWatch(true); - final Path configuredRootDirPath = Paths.get(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue()); - final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue(); - final Integer maxDestinationFiles = context.getProperty(MAX_DESTINATION_FILES).asInteger(); + final Path configuredRootDirPath = Paths.get(context. + getProperty(DIRECTORY). + evaluateAttributeExpressions(flowFile). + getValue()); + final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION). + getValue(); + final Integer maxDestinationFiles = context. + getProperty(MAX_DESTINATION_FILES). + asInteger(); final ProcessorLog logger = getLogger(); Path tempDotCopyFile = null; try { final Path rootDirPath = configuredRootDirPath; - final Path tempCopyFile = rootDirPath.resolve("." + flowFile.getAttribute(CoreAttributes.FILENAME.key())); - final Path copyFile = rootDirPath.resolve(flowFile.getAttribute(CoreAttributes.FILENAME.key())); + final Path tempCopyFile = rootDirPath.resolve("." + flowFile. + getAttribute(CoreAttributes.FILENAME.key())); + final Path copyFile = rootDirPath.resolve(flowFile. + getAttribute(CoreAttributes.FILENAME.key())); if (!Files.exists(rootDirPath)) { - if (context.getProperty(CREATE_DIRS).asBoolean()) { + if (context.getProperty(CREATE_DIRS). + asBoolean()) { Files.createDirectories(rootDirPath); } else { flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); - logger.error("Penalizing {} and routing to 'failure' because the output directory {} does not exist and Processor is configured not to create missing directories", new Object[]{flowFile, rootDirPath}); + logger. + error("Penalizing {} and routing to 'failure' because the output directory {} does not exist and Processor is " + + "configured not to create missing directories", new Object[]{flowFile, rootDirPath}); return; } } @@ -194,11 +216,14 @@ public class PutFile extends AbstractProcessor { final Path finalCopyFileDir = finalCopyFile.getParent(); if (Files.exists(finalCopyFileDir) && maxDestinationFiles != null) { // check if too many files already - final int numFiles = finalCopyFileDir.toFile().list().length; + final int numFiles = finalCopyFileDir.toFile(). + list().length; if (numFiles >= maxDestinationFiles) { flowFile = session.penalize(flowFile); - logger.info("Penalizing {} and routing to 'failure' because the output directory {} has {} files, which exceeds the configured maximum number of files", new Object[]{flowFile, finalCopyFileDir, numFiles}); + logger. + info("Penalizing {} and routing to 'failure' because the output directory {} has {} files, which exceeds the " + + "configured maximum number of files", new Object[]{flowFile, finalCopyFileDir, numFiles}); session.transfer(flowFile, REL_FAILURE); return; } @@ -208,15 +233,18 @@ public class PutFile extends AbstractProcessor { switch (conflictResponse) { case REPLACE_RESOLUTION: Files.delete(finalCopyFile); - logger.info("Deleted {} as configured in order to replace with the contents of {}", new Object[]{finalCopyFile, flowFile}); + logger. + info("Deleted {} as configured in order to replace with the contents of {}", new Object[]{finalCopyFile, flowFile}); break; case IGNORE_RESOLUTION: session.transfer(flowFile, REL_SUCCESS); - logger.info("Transferring {} to success because file with same name already exists", new Object[]{flowFile}); + logger. + info("Transferring {} to success because file with same name already exists", new Object[]{flowFile}); return; case FAIL_RESOLUTION: flowFile = session.penalize(flowFile); - logger.info("Penalizing {} and routing to failure as configured because file with the same name already exists", new Object[]{flowFile}); + logger. + info("Penalizing {} and routing to failure as configured because file with the same name already exists", new Object[]{flowFile}); session.transfer(flowFile, REL_FAILURE); return; default: @@ -226,53 +254,82 @@ public class PutFile extends AbstractProcessor { session.exportTo(flowFile, dotCopyFile, false); - final String lastModifiedTime = context.getProperty(CHANGE_LAST_MODIFIED_TIME).evaluateAttributeExpressions(flowFile).getValue(); - if (lastModifiedTime != null && !lastModifiedTime.trim().isEmpty()) { + final String lastModifiedTime = context. + getProperty(CHANGE_LAST_MODIFIED_TIME). + evaluateAttributeExpressions(flowFile). + getValue(); + if (lastModifiedTime != null && !lastModifiedTime.trim(). + isEmpty()) { try { final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US); - final Date fileModifyTime = formatter.parse(lastModifiedTime); - dotCopyFile.toFile().setLastModified(fileModifyTime.getTime()); + final Date fileModifyTime = formatter. + parse(lastModifiedTime); + dotCopyFile.toFile(). + setLastModified(fileModifyTime.getTime()); } catch (Exception e) { - logger.warn("Could not set file lastModifiedTime to {} because {}", new Object[]{lastModifiedTime, e}); + logger. + warn("Could not set file lastModifiedTime to {} because {}", new Object[]{lastModifiedTime, e}); } } - final String permissions = context.getProperty(CHANGE_PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue(); - if (permissions != null && !permissions.trim().isEmpty()) { + final String permissions = context.getProperty(CHANGE_PERMISSIONS). + evaluateAttributeExpressions(flowFile). + getValue(); + if (permissions != null && !permissions.trim(). + isEmpty()) { try { String perms = stringPermissions(permissions); if (!perms.isEmpty()) { - Files.setPosixFilePermissions(dotCopyFile, PosixFilePermissions.fromString(perms)); + Files. + setPosixFilePermissions(dotCopyFile, PosixFilePermissions. + fromString(perms)); } } catch (Exception e) { - logger.warn("Could not set file permissions to {} because {}", new Object[]{permissions, e}); + logger. + warn("Could not set file permissions to {} because {}", new Object[]{permissions, e}); } } - final String owner = context.getProperty(CHANGE_OWNER).evaluateAttributeExpressions(flowFile).getValue(); - if (owner != null && !owner.trim().isEmpty()) { + final String owner = context.getProperty(CHANGE_OWNER). + evaluateAttributeExpressions(flowFile). + getValue(); + if (owner != null && !owner.trim(). + isEmpty()) { try { - UserPrincipalLookupService lookupService = dotCopyFile.getFileSystem().getUserPrincipalLookupService(); - Files.setOwner(dotCopyFile, lookupService.lookupPrincipalByName(owner)); + UserPrincipalLookupService lookupService = dotCopyFile. + getFileSystem(). + getUserPrincipalLookupService(); + Files.setOwner(dotCopyFile, lookupService. + lookupPrincipalByName(owner)); } catch (Exception e) { - logger.warn("Could not set file owner to {} because {}", new Object[]{owner, e}); + logger. + warn("Could not set file owner to {} because {}", new Object[]{owner, e}); } } - final String group = context.getProperty(CHANGE_GROUP).evaluateAttributeExpressions(flowFile).getValue(); - if (group != null && !group.trim().isEmpty()) { + final String group = context.getProperty(CHANGE_GROUP). + evaluateAttributeExpressions(flowFile). + getValue(); + if (group != null && !group.trim(). + isEmpty()) { try { - UserPrincipalLookupService lookupService = dotCopyFile.getFileSystem().getUserPrincipalLookupService(); - PosixFileAttributeView view = Files.getFileAttributeView(dotCopyFile, PosixFileAttributeView.class); - view.setGroup(lookupService.lookupPrincipalByGroupName(group)); + UserPrincipalLookupService lookupService = dotCopyFile. + getFileSystem(). + getUserPrincipalLookupService(); + PosixFileAttributeView view = Files. + getFileAttributeView(dotCopyFile, PosixFileAttributeView.class); + view.setGroup(lookupService. + lookupPrincipalByGroupName(group)); } catch (Exception e) { - logger.warn("Could not set file group to {} because {}", new Object[]{group, e}); + logger. + warn("Could not set file group to {} because {}", new Object[]{group, e}); } } boolean renamed = false; for (int i = 0; i < 10; i++) { // try rename up to 10 times. - if (dotCopyFile.toFile().renameTo(finalCopyFile.toFile())) { + if (dotCopyFile.toFile(). + renameTo(finalCopyFile.toFile())) { renamed = true; break;// rename was successful } @@ -280,27 +337,36 @@ public class PutFile extends AbstractProcessor { } if (!renamed) { - if (Files.exists(dotCopyFile) && dotCopyFile.toFile().delete()) { - logger.debug("Deleted dot copy file {}", new Object[]{dotCopyFile}); + if (Files.exists(dotCopyFile) && dotCopyFile.toFile(). + delete()) { + logger. + debug("Deleted dot copy file {}", new Object[]{dotCopyFile}); } throw new ProcessException("Could not rename: " + dotCopyFile); } else { - logger.info("Produced copy of {} at location {}", new Object[]{flowFile, finalCopyFile}); + logger. + info("Produced copy of {} at location {}", new Object[]{flowFile, finalCopyFile}); } - session.getProvenanceReporter().send(flowFile, finalCopyFile.toFile().toURI().toString(), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.getProvenanceReporter(). + send(flowFile, finalCopyFile.toFile(). + toURI(). + toString(), stopWatch. + getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); } catch (final Throwable t) { if (tempDotCopyFile != null) { try { Files.deleteIfExists(tempDotCopyFile); } catch (final Exception e) { - logger.error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e}); + logger. + error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e}); } } flowFile = session.penalize(flowFile); - logger.error("Penalizing {} and transferring to failure due to {}", new Object[]{flowFile, t}); + logger. + error("Penalizing {} and transferring to failure due to {}", new Object[]{flowFile, t}); session.transfer(flowFile, REL_FAILURE); } } @@ -309,9 +375,11 @@ public class PutFile extends AbstractProcessor { String permissions = ""; final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$"); final Pattern numPattern = Pattern.compile("\\d+"); - if (rwxPattern.matcher(perms).matches()) { + if (rwxPattern.matcher(perms). + matches()) { permissions = perms; - } else if (numPattern.matcher(perms).matches()) { + } else if (numPattern.matcher(perms). + matches()) { try { int number = Integer.parseInt(perms, 8); StringBuilder permBuilder = new StringBuilder();