NIFI-10: Added FETCH Provenance Event and updated processors to use this new event type
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/51f56402 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/51f56402 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/51f56402 Branch: refs/heads/NIFI-655 Commit: 51f564024a2fbe7fbd08760635561f08619be0e4 Parents: aec32a2 Author: Mark Payne <marka...@hotmail.com> Authored: Thu Oct 15 17:00:20 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Mon Oct 26 14:58:50 2015 -0400 ---------------------------------------------------------------------- .../nifi/provenance/ProvenanceEventType.java | 26 ++- .../nifi/provenance/ProvenanceReporter.java | 37 ++++ .../nifi/util/MockProvenanceReporter.java | 35 ++- .../nifi/processors/aws/s3/FetchS3Object.java | 2 +- .../repository/StandardProvenanceReporter.java | 31 ++- .../nifi/processors/standard/InvokeHTTP.java | 211 +++++++++---------- .../processors/standard/TestInvokeHTTP.java | 54 ++--- 7 files changed, 258 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java index e5e47b7..188e8fc 100644 --- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java @@ -23,45 +23,66 @@ public enum ProvenanceEventType { * not received from a remote system or external process */ CREATE, + /** - * Indicates a provenance event for receiving data from an external process + * Indicates a provenance event for receiving data from an external process. This Event Type + * is expected to be the first event for a FlowFile. As such, a Processor that receives data + * from an external source and uses that data to replace the content of an existing FlowFile + * should use the {@link #FETCH} event type, rather than the RECEIVE event type. */ RECEIVE, + + /** + * Indicates that the contents of a FlowFile were overwritten using the contents of some + * external resource. This is similar to the {@link #RECEIVE} event but varies in that + * RECEIVE events are intended to be used as the event that introduces the FlowFile into + * the system, whereas FETCH is used to indicate that the contents of an existing FlowFile + * were overwritten. + */ + FETCH, + /** * Indicates a provenance event for sending data to an external process */ SEND, + /** * Indicates a provenance event for the conclusion of an object's life for * some reason other than object expiration */ DROP, + /** * Indicates a provenance event for the conclusion of an object's life due * to the fact that the object could not be processed in a timely manner */ EXPIRE, + /** * FORK is used to indicate that one or more FlowFile was derived from a * parent FlowFile. */ FORK, + /** * JOIN is used to indicate that a single FlowFile is derived from joining * together multiple parent FlowFiles. */ JOIN, + /** * CLONE is used to indicate that a FlowFile is an exact duplicate of its * parent FlowFile. */ CLONE, + /** * CONTENT_MODIFIED is used to indicate that a FlowFile's content was * modified in some way. When using this Event Type, it is advisable to * provide details about how the content is modified. */ CONTENT_MODIFIED, + /** * ATTRIBUTES_MODIFIED is used to indicate that a FlowFile's attributes were * modified in some way. This event is not needed when another event is @@ -69,17 +90,20 @@ public enum ProvenanceEventType { * FlowFile attributes. */ ATTRIBUTES_MODIFIED, + /** * ROUTE is used to show that a FlowFile was routed to a specified * {@link org.apache.nifi.processor.Relationship Relationship} and should provide * information about why the FlowFile was routed to this relationship. */ ROUTE, + /** * Indicates a provenance event for adding additional information such as a * new linkage to a new URI or UUID */ ADDINFO, + /** * Indicates a provenance event for replaying a FlowFile. The UUID of the * event will indicate the UUID of the original FlowFile that is being http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java index db589f8..0fd29fd 100644 --- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java @@ -124,6 +124,43 @@ public interface ProvenanceReporter { void receive(FlowFile flowFile, String transitUri, String sourceSystemFlowFileIdentifier, String details, long transmissionMillis); /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#FETCH FETCH} that indicates that the content of the given + * FlowFile was overwritten with the data received from an external source. + * + * @param flowFile the FlowFile whose content was replaced + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. + */ + void fetch(FlowFile flowFile, String transitUri); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#FETCH FETCH} that indicates that the content of the given + * FlowFile was overwritten with the data received from an external source. + * + * @param flowFile the FlowFile whose content was replaced + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. + * @param transmissionMillis the number of milliseconds taken to transfer the data + */ + void fetch(FlowFile flowFile, String transitUri, long transmissionMillis); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#FETCH FETCH} that indicates that the content of the given + * FlowFile was overwritten with the data received from an external source. + * + * @param flowFile the FlowFile whose content was replaced + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. + * @param details details about the event + * @param transmissionMillis the number of milliseconds taken to transfer + * the data + */ + void fetch(FlowFile flowFile, String transitUri, String details, long transmissionMillis); + + /** * Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND} * that indicates that a copy of the given FlowFile was sent to an external * destination. The external destination may be a remote system or may be a http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java index 8c9a320..8458715 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java @@ -124,7 +124,40 @@ public class MockProvenanceReporter implements ProvenanceReporter { try { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE) - .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build(); + .setTransitUri(transitUri) + .setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier) + .setEventDuration(transmissionMillis) + .setDetails(details) + .build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + @Override + public void fetch(final FlowFile flowFile, final String transitUri) { + fetch(flowFile, transitUri, -1L); + } + + @Override + public void fetch(final FlowFile flowFile, final String transitUri, final long transmissionMillis) { + fetch(flowFile, transitUri, null, transmissionMillis); + } + + @Override + public void fetch(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) { + verifyFlowFileKnown(flowFile); + + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.FETCH) + .setTransitUri(transitUri) + .setEventDuration(transmissionMillis) + .setDetails(details) + .build(); events.add(record); } catch (final Exception e) { logger.error("Failed to generate Provenance Event due to " + e); http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 131e671..bc6aeec 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -152,7 +152,7 @@ public class FetchS3Object extends AbstractS3Processor { session.transfer(flowFile, REL_SUCCESS); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis}); - session.getProvenanceReporter().receive(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis); + session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index 8852f42..8a89dbf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -124,7 +124,36 @@ public class StandardProvenanceReporter implements ProvenanceReporter { try { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE) - .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build(); + .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + @Override + public void fetch(final FlowFile flowFile, final String transitUri) { + fetch(flowFile, transitUri, -1L); + } + + @Override + public void fetch(final FlowFile flowFile, final String transitUri, final long transmissionMillis) { + fetch(flowFile, transitUri, null, transmissionMillis); + } + + @Override + public void fetch(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) { + verifyFlowFileKnown(flowFile); + + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.FETCH) + .setTransitUri(transitUri) + .setEventDuration(transmissionMillis) + .setDetails(details) + .build(); events.add(record); } catch (final Exception e) { logger.error("Failed to generate Provenance Event due to " + e); http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index c7be728..848652a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@ -89,7 +89,7 @@ import org.joda.time.format.DateTimeFormatter; @WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"), @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server")}) @DynamicProperty(name = "Trusted Hostname", value = "A hostname", description = "Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted " - + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.") + + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.") public final class InvokeHTTP extends AbstractProcessor { @Override @@ -170,76 +170,75 @@ public final class InvokeHTTP extends AbstractProcessor { // This set includes our strings defined above as well as some standard flowfile // attributes. public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN, - "uuid", "filename", "path" - ))); + STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN, + "uuid", "filename", "path"))); // properties public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder() - .name("HTTP Method") - .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).") - .required(true) - .defaultValue("GET") - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("HTTP Method") + .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).") + .required(true) + .defaultValue("GET") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder() - .name("Remote URL") - .description("Remote URL which will be connected to, including scheme, host, port, path.") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.URL_VALIDATOR) - .build(); + .name("Remote URL") + .description("Remote URL which will be connected to, including scheme, host, port, path.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder() - .name("Connection Timeout") - .description("Max wait time for connection to remote service.") - .required(true) - .defaultValue("5 secs") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); + .name("Connection Timeout") + .description("Max wait time for connection to remote service.") + .required(true) + .defaultValue("5 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder() - .name("Read Timeout") - .description("Max wait time for response from remote service.") - .required(true) - .defaultValue("15 secs") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); + .name("Read Timeout") + .description("Max wait time for response from remote service.") + .required(true) + .defaultValue("15 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder() - .name("Include Date Header") - .description("Include an RFC-2616 Date header in the request.") - .required(true) - .defaultValue("True") - .allowableValues("True", "False") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .build(); + .name("Include Date Header") + .description("Include an RFC-2616 Date header in the request.") + .required(true) + .defaultValue("True") + .allowableValues("True", "False") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder() - .name("Follow Redirects") - .description("Follow HTTP redirects issued by remote server.") - .required(true) - .defaultValue("True") - .allowableValues("True", "False") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .build(); + .name("Follow Redirects") + .description("Follow HTTP redirects issued by remote server.") + .required(true) + .defaultValue("True") + .allowableValues("True", "False") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder() - .name("Attributes to Send") - .description("Regular expression that defines which attributes to send as HTTP headers in the request. " - + "If not defined, no attributes are sent as headers.") - .required(false) - .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .build(); + .name("Attributes to Send") + .description("Regular expression that defines which attributes to send as HTTP headers in the request. " + + "If not defined, no attributes are sent as headers.") + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("SSL Context Service") - .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); + .name("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder() .name("Proxy Host") @@ -256,33 +255,33 @@ public final class InvokeHTTP extends AbstractProcessor { .build(); // Per RFC 7235, 2617, and 2616. - // basic-credentials = base64-user-pass - // base64-user-pass = userid ":" password - // userid = *<TEXT excluding ":"> - // password = *TEXT + // basic-credentials = base64-user-pass + // base64-user-pass = userid ":" password + // userid = *<TEXT excluding ":"> + // password = *TEXT // - // OCTET = <any 8-bit sequence of data> - // CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)> - // LWS = [CRLF] 1*( SP | HT ) - // TEXT = <any OCTET except CTLs but including LWS> + // OCTET = <any 8-bit sequence of data> + // CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)> + // LWS = [CRLF] 1*( SP | HT ) + // TEXT = <any OCTET except CTLs but including LWS> // // Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs. public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder() - .name("Basic Authentication Username") - .displayName("Basic Authentication Username") - .description("The username to be used by the client to authenticate against the Remote URL. Cannot include control characters (0-31), ':', or DEL (127).") - .required(false) - .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$"))) - .build(); + .name("Basic Authentication Username") + .displayName("Basic Authentication Username") + .description("The username to be used by the client to authenticate against the Remote URL. Cannot include control characters (0-31), ':', or DEL (127).") + .required(false) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$"))) + .build(); public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder() - .name("Basic Authentication Password") - .displayName("Basic Authentication Password") - .description("The password to be used by the client to authenticate against the Remote URL.") - .required(false) - .sensitive(true) - .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$"))) - .build(); + .name("Basic Authentication Password") + .displayName("Basic Authentication Password") + .description("The password to be used by the client to authenticate against the Remote URL.") + .required(false) + .sensitive(true) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$"))) + .build(); public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( PROP_METHOD, @@ -296,48 +295,46 @@ public final class InvokeHTTP extends AbstractProcessor { PROP_BASIC_AUTH_USERNAME, PROP_BASIC_AUTH_PASSWORD, PROP_PROXY_HOST, - PROP_PROXY_PORT - )); + PROP_PROXY_PORT)); // property to allow the hostname verifier to be overridden // this is a "hidden" property - it's configured using a dynamic user property public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder() - .name("Trusted Hostname") - .description("Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted " - + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .dynamic(true) - .build(); + .name("Trusted Hostname") + .description("Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted " + + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .build(); // relationships public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() - .name("Original") - .description("Original FlowFile will be routed upon success (2xx status codes).") - .build(); + .name("Original") + .description("Original FlowFile will be routed upon success (2xx status codes).") + .build(); public static final Relationship REL_SUCCESS_RESP = new Relationship.Builder() - .name("Response") - .description("Response FlowFile will be routed upon success (2xx status codes).") - .build(); + .name("Response") + .description("Response FlowFile will be routed upon success (2xx status codes).") + .build(); public static final Relationship REL_RETRY = new Relationship.Builder() - .name("Retry") - .description("FlowFile will be routed on any status code that can be retried (5xx status codes).") - .build(); + .name("Retry") + .description("FlowFile will be routed on any status code that can be retried (5xx status codes).") + .build(); public static final Relationship REL_NO_RETRY = new Relationship.Builder() - .name("No Retry") - .description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).") - .build(); + .name("No Retry") + .description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).") + .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("Failure") - .description("FlowFile will be routed on any type of connection failure, timeout or general exception.") - .build(); + .name("Failure") + .description("FlowFile will be routed on any type of connection failure, timeout or general exception.") + .build(); public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE - ))); + REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE))); } @@ -403,7 +400,7 @@ public final class InvokeHTTP extends AbstractProcessor { transfer(); } catch (final Exception e) { // log exception - logger.error("Routing to {} due to exception: {}", new Object[] { REL_FAILURE.getName(), e }, e); + logger.error("Routing to {} due to exception: {}", new Object[] {REL_FAILURE.getName(), e}, e); // penalize request = session.penalize(request); @@ -417,7 +414,7 @@ public final class InvokeHTTP extends AbstractProcessor { session.remove(response); } } catch (final Exception e1) { - logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] { e1 }, e1); + logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] {e1}, e1); } } } @@ -545,7 +542,7 @@ public final class InvokeHTTP extends AbstractProcessor { // emit provenance event final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().modifyContent(response, "Updated content with data received from " + conn.getURL().toExternalForm(), millis); + session.getProvenanceReporter().fetch(response, conn.getURL().toExternalForm(), millis); } } @@ -562,7 +559,7 @@ public final class InvokeHTTP extends AbstractProcessor { // log the status codes from the response logger.info("Request to {} returned status code {} for {}", - new Object[]{conn.getURL().toExternalForm(), statusCode, request}); + new Object[] {conn.getURL().toExternalForm(), statusCode, request}); // transfer to the correct relationship // 2xx -> SUCCESS @@ -660,12 +657,12 @@ public final class InvokeHTTP extends AbstractProcessor { private void logRequest() { logger.debug("\nRequest to remote service:\n\t{}\n{}", - new Object[]{conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())}); + new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())}); } private void logResponse() { logger.debug("\nResponse from remote service:\n\t{}\n{}", - new Object[]{conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())}); + new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())}); } private String getLogString(Map<String, List<String>> map) { @@ -753,7 +750,7 @@ public final class InvokeHTTP extends AbstractProcessor { return new BufferedInputStream(is); } catch (IOException e) { - logger.warn("Response stream threw an exception: {}", new Object[]{e}, e); + logger.warn("Response stream threw an exception: {}", new Object[] {e}, e); return null; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java index 46cacca..a4fd3d7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java @@ -149,8 +149,8 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_NO_RETRY, 0); runner.assertTransferCount(Config.REL_FAILURE, 0); - //expected in request status.code and status.message - //original flow file (+attributes)?????????? + // expected in request status.code and status.message + // original flow file (+attributes)?????????? final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0); bundle.assertAttributeEquals(Config.STATUS_CODE, "200"); bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); @@ -159,10 +159,10 @@ public class TestInvokeHTTP { Assert.assertEquals(expected, actual); bundle.assertAttributeEquals("Foo", "Bar"); - //expected in response - //status code, status message, all headers from server response --> ff attributes - //server response message body into payload of ff - //should not contain any original ff attributes + // expected in response + // status code, status message, all headers from server response --> ff attributes + // server response message body into payload of ff + // should not contain any original ff attributes final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0); bundle1.assertContentEquals("/status/200".getBytes("UTF-8")); bundle1.assertAttributeEquals(Config.STATUS_CODE, "200"); @@ -198,8 +198,8 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_NO_RETRY, 0); runner.assertTransferCount(Config.REL_FAILURE, 0); - //expected in request status.code and status.message - //original flow file (+attributes)?????????? + // expected in request status.code and status.message + // original flow file (+attributes)?????????? final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0); bundle.assertAttributeEquals(Config.STATUS_CODE, "200"); bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK"); @@ -208,10 +208,10 @@ public class TestInvokeHTTP { final String expected = "Hello"; Assert.assertEquals(expected, actual); - //expected in response - //status code, status message, all headers from server response --> ff attributes - //server response message body into payload of ff - //should not contain any original ff attributes + // expected in response + // status code, status message, all headers from server response --> ff attributes + // server response message body into payload of ff + // should not contain any original ff attributes final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0); final String bundle1Content = new String(bundle1.toByteArray(), StandardCharsets.UTF_8); assertTrue(bundle1Content.startsWith(expAuth)); // use startsWith instead of equals so we can ignore line endings @@ -223,17 +223,17 @@ public class TestInvokeHTTP { final List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents(); assertEquals(2, provEvents.size()); boolean forkEvent = false; - boolean contentModEvent = false; + boolean fetchEvent = false; for (final ProvenanceEventRecord event : provEvents) { if (event.getEventType() == ProvenanceEventType.FORK) { forkEvent = true; - } else if (event.getEventType() == ProvenanceEventType.CONTENT_MODIFIED) { - contentModEvent = true; + } else if (event.getEventType() == ProvenanceEventType.FETCH) { + fetchEvent = true; } } assertTrue(forkEvent); - assertTrue(contentModEvent); + assertTrue(fetchEvent); } @Test @@ -257,8 +257,8 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_NO_RETRY, 1); runner.assertTransferCount(Config.REL_FAILURE, 0); - //expected in request status.code and status.message - //original flow file (+attributes)?????????? + // expected in request status.code and status.message + // original flow file (+attributes)?????????? final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); bundle.assertAttributeEquals(Config.STATUS_CODE, "401"); bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Unauthorized"); @@ -286,7 +286,7 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_NO_RETRY, 0); runner.assertTransferCount(Config.REL_FAILURE, 0); - //expected in response + // expected in response final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); bundle.assertAttributeEquals(Config.STATUS_CODE, "500"); @@ -313,8 +313,8 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_RETRY, 0); runner.assertTransferCount(Config.REL_NO_RETRY, 1); runner.assertTransferCount(Config.REL_FAILURE, 0); - //getMyFlowFiles(); - //expected in response + // getMyFlowFiles(); + // expected in response final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); @@ -340,8 +340,8 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_RETRY, 0); runner.assertTransferCount(Config.REL_NO_RETRY, 1); runner.assertTransferCount(Config.REL_FAILURE, 0); - //getMyFlowFiles(); - //expected in response + // getMyFlowFiles(); + // expected in response final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); @@ -367,8 +367,8 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_RETRY, 0); runner.assertTransferCount(Config.REL_NO_RETRY, 1); runner.assertTransferCount(Config.REL_FAILURE, 0); - //getMyFlowFiles(); - //expected in response + // getMyFlowFiles(); + // expected in response final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); @@ -397,7 +397,7 @@ public class TestInvokeHTTP { runner.assertTransferCount(Config.REL_NO_RETRY, 1); runner.assertTransferCount(Config.REL_FAILURE, 0); - //expected in response + // expected in response final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); @@ -593,7 +593,7 @@ public class TestInvokeHTTP { @Override public void handle(String target, Request baseRequest, - HttpServletRequest request, HttpServletResponse response) + HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { baseRequest.setHandled(true);