http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java index 1220b7c..8e80e91 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java @@ -66,17 +66,26 @@ import org.xml.sax.XMLReader; @CapabilityDescription("Splits an XML File into multiple separate FlowFiles, each comprising a child or descendant of the original root element") public class SplitXml extends AbstractProcessor { - public static final PropertyDescriptor SPLIT_DEPTH = new PropertyDescriptor.Builder() - .name("Split Depth") - .description("Indicates the XML-nesting depth to start splitting XML fragments. A depth of 1 means split the root's children, whereas a depth of 2 means split the root's children's children and so forth.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("1") - .build(); - - public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship").build(); - public static final Relationship REL_SPLIT = new Relationship.Builder().name("split").description("All segments of the original FlowFile will be routed to this relationship").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid XML), it will be routed to this relationship").build(); + public static final PropertyDescriptor SPLIT_DEPTH = new PropertyDescriptor.Builder(). + name("Split Depth"). + description("Indicates the XML-nesting depth to start splitting XML fragments. A depth of 1 means split the root's children, whereas a depth of 2 means split the root's children's children and so forth."). + required(true). + addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR). + defaultValue("1"). + build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder(). + name("original"). + description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to this relationship"). + build(); + public static final Relationship REL_SPLIT = new Relationship.Builder(). + name("split"). + description("All segments of the original FlowFile will be routed to this relationship"). + build(); + public static final Relationship REL_FAILURE = new Relationship.Builder(). + name("failure"). + description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid XML), it will be routed to this relationship"). + build(); private List<PropertyDescriptor> properties; private Set<Relationship> relationships; @@ -84,7 +93,8 @@ public class SplitXml extends AbstractProcessor { private static final String FEATURE_PREFIX = "http://xml.org/sax/features/"; public static final String ENABLE_NAMESPACES_FEATURE = FEATURE_PREFIX + "namespaces"; public static final String ENABLE_NAMESPACE_PREFIXES_FEATURE = FEATURE_PREFIX + "namespace-prefixes"; - private static final SAXParserFactory saxParserFactory = SAXParserFactory.newInstance(); + private static final SAXParserFactory saxParserFactory = SAXParserFactory. + newInstance(); static { saxParserFactory.setNamespaceAware(true); @@ -93,7 +103,8 @@ public class SplitXml extends AbstractProcessor { saxParserFactory.setFeature(ENABLE_NAMESPACE_PREFIXES_FEATURE, true); } catch (Exception e) { final Logger staticLogger = LoggerFactory.getLogger(SplitXml.class); - staticLogger.warn("Unable to configure SAX Parser to make namespaces available", e); + staticLogger. + warn("Unable to configure SAX Parser to make namespaces available", e); } } @@ -127,7 +138,8 @@ public class SplitXml extends AbstractProcessor { return; } - final int depth = context.getProperty(SPLIT_DEPTH).asInteger(); + final int depth = context.getProperty(SPLIT_DEPTH). + asInteger(); final ProcessorLog logger = getLogger(); final List<FlowFile> splits = new ArrayList<>(); @@ -157,7 +169,8 @@ public class SplitXml extends AbstractProcessor { reader.setContentHandler(parser); reader.parse(new InputSource(in)); } catch (final ParserConfigurationException | SAXException e) { - logger.error("Unable to parse {} due to {}", new Object[]{original, e}); + logger. + error("Unable to parse {} due to {}", new Object[]{original, e}); failed.set(true); } } @@ -170,7 +183,9 @@ public class SplitXml extends AbstractProcessor { } else { session.transfer(splits, REL_SPLIT); session.transfer(original, REL_ORIGINAL); - logger.info("Split {} into {} FlowFiles", new Object[]{original, splits.size()}); + logger. + info("Split {} into {} FlowFiles", new Object[]{original, splits. + size()}); } } @@ -232,7 +247,9 @@ public class SplitXml extends AbstractProcessor { // if we're at a level where we care about capturing text, then add the closing element if (newDepth >= splitDepth) { // Add the element end tag. - sb.append("</").append(qName).append(">"); + sb.append("</"). + append(qName). + append(">"); } // If we have now returned to level 1, we have finished processing @@ -284,8 +301,14 @@ public class SplitXml extends AbstractProcessor { int attCount = atts.getLength(); for (int i = 0; i < attCount; i++) { String attName = atts.getQName(i); - String attValue = StringEscapeUtils.escapeXml10(atts.getValue(i)); - sb.append(" ").append(attName).append("=").append("\"").append(attValue).append("\""); + String attValue = StringEscapeUtils.escapeXml10(atts. + getValue(i)); + sb.append(" "). + append(attName). + append("="). + append("\""). + append(attValue). + append("\""); } sb.append(">");
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java index ff37902..3451516 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java @@ -65,19 +65,25 @@ import org.apache.nifi.util.Tuple; @CapabilityDescription("Applies the provided XSLT file to the flowfile XML payload. A new FlowFile is created " + "with transformed content and is routed to the 'success' relationship. If the XSL transform " + "fails, the original FlowFile is routed to the 'failure' relationship") -@DynamicProperty(name="An XSLT transform parameter name", value="An XSLT transform parameter value", supportsExpressionLanguage=true, -description="These XSLT parameters are passed to the transformer") +@DynamicProperty(name = "An XSLT transform parameter name", value = "An XSLT transform parameter value", supportsExpressionLanguage = true, + description = "These XSLT parameters are passed to the transformer") public class TransformXml extends AbstractProcessor { - public static final PropertyDescriptor XSLT_FILE_NAME = new PropertyDescriptor.Builder() - .name("XSLT file name") - .description("Provides the name (including full path) of the XSLT file to apply to the flowfile XML content.") - .required(true) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("The FlowFile with transformed content will be routed to this relationship").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid XML), it will be routed to this relationship").build(); + public static final PropertyDescriptor XSLT_FILE_NAME = new PropertyDescriptor.Builder(). + name("XSLT file name"). + description("Provides the name (including full path) of the XSLT file to apply to the flowfile XML content."). + required(true). + addValidator(StandardValidators.FILE_EXISTS_VALIDATOR). + build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder(). + name("success"). + description("The FlowFile with transformed content will be routed to this relationship"). + build(); + public static final Relationship REL_FAILURE = new Relationship.Builder(). + name("failure"). + description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid XML), it will be routed to this relationship"). + build(); private List<PropertyDescriptor> properties; private Set<Relationship> relationships; @@ -107,12 +113,13 @@ public class TransformXml extends AbstractProcessor { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) - .required(false) - .dynamic(true) - .build(); + .name(propertyDescriptorName). + expressionLanguageSupported(true). + addValidator(StandardValidators. + createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)). + required(false). + dynamic(true). + build(); } @Override @@ -126,38 +133,52 @@ public class TransformXml extends AbstractProcessor { final StopWatch stopWatch = new StopWatch(true); try { - FlowFile transformed = session.write(original, new StreamCallback() { - @Override - public void process(final InputStream rawIn, final OutputStream out) throws IOException { - try (final InputStream in = new BufferedInputStream(rawIn)) { - - File stylesheet = new File(context.getProperty(XSLT_FILE_NAME).getValue()); - StreamSource styleSource = new StreamSource(stylesheet); - TransformerFactory tfactory = new net.sf.saxon.TransformerFactoryImpl(); - Transformer transformer = tfactory.newTransformer(styleSource); - - // pass all dynamic properties to the transformer - for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { - if (entry.getKey().isDynamic()) { - String value = context.newPropertyValue(entry.getValue()).evaluateAttributeExpressions(original).getValue(); - transformer.setParameter(entry.getKey().getName(), value); + FlowFile transformed = session. + write(original, new StreamCallback() { + @Override + public void process(final InputStream rawIn, final OutputStream out) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + + File stylesheet = new File(context. + getProperty(XSLT_FILE_NAME). + getValue()); + StreamSource styleSource = new StreamSource(stylesheet); + TransformerFactory tfactory = new net.sf.saxon.TransformerFactoryImpl(); + Transformer transformer = tfactory. + newTransformer(styleSource); + + // pass all dynamic properties to the transformer + for (final Map.Entry<PropertyDescriptor, String> entry : context. + getProperties(). + entrySet()) { + if (entry.getKey(). + isDynamic()) { + String value = context. + newPropertyValue(entry.getValue()). + evaluateAttributeExpressions(original). + getValue(); + transformer.setParameter(entry.getKey(). + getName(), value); + } + } + + // use a StreamSource with Saxon + StreamSource source = new StreamSource(in); + StreamResult result = new StreamResult(out); + transformer.transform(source, result); + } catch (final Exception e) { + throw new IOException(e); } } - - // use a StreamSource with Saxon - StreamSource source = new StreamSource(in); - StreamResult result = new StreamResult(out); - transformer.transform(source, result); - } catch (final Exception e) { - throw new IOException(e); - } - } - }); + }); session.transfer(transformed, REL_SUCCESS); - session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.getProvenanceReporter(). + modifyContent(transformed, stopWatch. + getElapsed(TimeUnit.MILLISECONDS)); logger.info("Transformed {}", new Object[]{original}); } catch (ProcessException e) { - logger.error("Unable to transform {} due to {}", new Object[]{original, e}); + logger. + error("Unable to transform {} due to {}", new Object[]{original, e}); session.transfer(original, REL_FAILURE); } } @@ -170,7 +191,8 @@ public class TransformXml extends AbstractProcessor { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) { final Tuple<String, ValidationResult> lastResult = this.cachedResult; - if (lastResult != null && lastResult.getKey().equals(input)) { + if (lastResult != null && lastResult.getKey(). + equals(input)) { return lastResult.getValue(); } else { String error = null; @@ -186,10 +208,11 @@ public class TransformXml extends AbstractProcessor { this.cachedResult = new Tuple<>(input, new ValidationResult.Builder() - .input(input) - .subject(subject) - .valid(error == null) - .explanation(error).build()); + .input(input). + subject(subject). + valid(error == null). + explanation(error). + build()); return this.cachedResult.getValue(); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index 34b0693..451ba57 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -71,11 +71,11 @@ import org.apache.nifi.util.ObjectHolder; @CapabilityDescription("Unpacks the content of FlowFiles that have been packaged with one of several different Packaging Formats, emitting one to many FlowFiles for each input FlowFile") @ReadsAttribute(attribute = "mime.type", description = "If the <Packaging Format> property is set to use mime.type attribute, this attribute is used to determine the FlowFile's MIME Type. In this case, if the attribute is set to application/tar, the TAR Packaging Format will be used. If the attribute is set to application/zip, the ZIP Packaging Format will be used. If the attribute is set to application/flowfile-v3 or application/flowfile-v2 or application/flowfile-v1, the appropriate FlowFile Packaging Format will be used. If this attribute is missing, the FlowFile will be routed to 'failure'. Otherwise, if the attribute's value is not one of those mentioned above, the FlowFile will be routed to 'success' without being unpacked") @WritesAttributes({ - @WritesAttribute(attribute = "mime.type", description = "If the FlowFile is successfully unpacked, its MIME Type is no longer known, so the mime.type attribute is set to application/octet-stream."), - @WritesAttribute(attribute = "fragment.identifier", description = "All unpacked FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), - @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the unpacked FlowFiles that were created from a single parent FlowFile"), - @WritesAttribute(attribute = "fragment.count", description = "The number of unpacked FlowFiles generated from the parent FlowFile"), - @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile. Extensions of .tar, .zip or .pkg are removed because the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile") }) + @WritesAttribute(attribute = "mime.type", description = "If the FlowFile is successfully unpacked, its MIME Type is no longer known, so the mime.type attribute is set to application/octet-stream."), + @WritesAttribute(attribute = "fragment.identifier", description = "All unpacked FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), + @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the unpacked FlowFiles that were created from a single parent FlowFile"), + @WritesAttribute(attribute = "fragment.count", description = "The number of unpacked FlowFiles generated from the parent FlowFile"), + @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile. Extensions of .tar, .zip or .pkg are removed because the MergeContent processor automatically adds those extensions if it is used to rebuild the original FlowFile")}) @SeeAlso(MergeContent.class) public class UnpackContent extends AbstractProcessor { @@ -94,17 +94,26 @@ public class UnpackContent extends AbstractProcessor { public static final String OCTET_STREAM = "application/octet-stream"; - public static final PropertyDescriptor PACKAGING_FORMAT = new PropertyDescriptor.Builder() - .name("Packaging Format") - .description("The Packaging Format used to create the file") - .required(true) - .allowableValues(AUTO_DETECT_FORMAT, TAR_FORMAT, ZIP_FORMAT, FLOWFILE_STREAM_FORMAT_V3, FLOWFILE_STREAM_FORMAT_V2, FLOWFILE_TAR_FORMAT) - .defaultValue(AUTO_DETECT_FORMAT) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Unpacked FlowFiles are sent to this relationship").build(); - public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile is sent to this relationship after it has been successfully unpacked").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("The original FlowFile is sent to this relationship when it cannot be unpacked for some reason").build(); + public static final PropertyDescriptor PACKAGING_FORMAT = new PropertyDescriptor.Builder(). + name("Packaging Format"). + description("The Packaging Format used to create the file"). + required(true). + allowableValues(AUTO_DETECT_FORMAT, TAR_FORMAT, ZIP_FORMAT, FLOWFILE_STREAM_FORMAT_V3, FLOWFILE_STREAM_FORMAT_V2, FLOWFILE_TAR_FORMAT). + defaultValue(AUTO_DETECT_FORMAT). + build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder(). + name("success"). + description("Unpacked FlowFiles are sent to this relationship"). + build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder(). + name("original"). + description("The original FlowFile is sent to this relationship after it has been successfully unpacked"). + build(); + public static final Relationship REL_FAILURE = new Relationship.Builder(). + name("failure"). + description("The original FlowFile is sent to this relationship when it cannot be unpacked for some reason"). + build(); private Set<Relationship> relationships; private List<PropertyDescriptor> properties; @@ -140,11 +149,15 @@ public class UnpackContent extends AbstractProcessor { } final ProcessorLog logger = getLogger(); - String packagingFormat = context.getProperty(PACKAGING_FORMAT).getValue().toLowerCase(); + String packagingFormat = context.getProperty(PACKAGING_FORMAT). + getValue(). + toLowerCase(); if (AUTO_DETECT_FORMAT.equals(packagingFormat)) { - final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); + final String mimeType = flowFile. + getAttribute(CoreAttributes.MIME_TYPE.key()); if (mimeType == null) { - logger.error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile}); + logger. + error("No mime.type attribute set for {}; routing to failure", new Object[]{flowFile}); session.transfer(flowFile, REL_FAILURE); return; } @@ -166,7 +179,8 @@ public class UnpackContent extends AbstractProcessor { packagingFormat = FLOWFILE_TAR_FORMAT; break; default: { - logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType}); + logger. + info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType}); session.transfer(flowFile, REL_SUCCESS); return; } @@ -197,14 +211,17 @@ public class UnpackContent extends AbstractProcessor { addFragmentAttrs = false; break; default: - throw new AssertionError("Packaging Format was " + context.getProperty(PACKAGING_FORMAT).getValue()); + throw new AssertionError("Packaging Format was " + context. + getProperty(PACKAGING_FORMAT). + getValue()); } final List<FlowFile> unpacked = new ArrayList<>(); try { unpacker.unpack(session, flowFile, unpacked); if (unpacked.isEmpty()) { - logger.error("Unable to unpack {} because it does not appear to have any entries; routing to failure", new Object[]{flowFile}); + logger. + error("Unable to unpack {} because it does not appear to have any entries; routing to failure", new Object[]{flowFile}); session.transfer(flowFile, REL_FAILURE); return; } @@ -214,10 +231,13 @@ public class UnpackContent extends AbstractProcessor { } session.transfer(unpacked, REL_SUCCESS); session.transfer(flowFile, REL_ORIGINAL); - session.getProvenanceReporter().fork(flowFile, unpacked); - logger.info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked}); + session.getProvenanceReporter(). + fork(flowFile, unpacked); + logger. + info("Unpacked {} into {} and transferred to success", new Object[]{flowFile, unpacked}); } catch (final ProcessException e) { - logger.error("Unable to unpack {} due to {}; routing to failure", new Object[]{flowFile, e}); + logger. + error("Unable to unpack {} due to {}; routing to failure", new Object[]{flowFile, e}); session.transfer(flowFile, REL_FAILURE); session.remove(unpacked); } @@ -232,7 +252,8 @@ public class UnpackContent extends AbstractProcessor { @Override public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) { - final String fragmentId = UUID.randomUUID().toString(); + final String fragmentId = UUID.randomUUID(). + toString(); session.read(source, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { @@ -247,28 +268,38 @@ public class UnpackContent extends AbstractProcessor { final Path filePath = file.toPath(); final String filePathString = filePath.getParent() + "/"; final Path absPath = filePath.toAbsolutePath(); - final String absPathString = absPath.getParent().toString() + "/"; + final String absPathString = absPath.getParent(). + toString() + "/"; FlowFile unpackedFile = session.create(source); try { final Map<String, String> attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), file.getName()); - attributes.put(CoreAttributes.PATH.key(), filePathString); - attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString); - attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); + attributes. + put(CoreAttributes.FILENAME.key(), file. + getName()); + attributes. + put(CoreAttributes.PATH.key(), filePathString); + attributes.put(CoreAttributes.ABSOLUTE_PATH. + key(), absPathString); + attributes. + put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); attributes.put(FRAGMENT_ID, fragmentId); - attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount)); + attributes.put(FRAGMENT_INDEX, String. + valueOf(++fragmentCount)); - unpackedFile = session.putAllAttributes(unpackedFile, attributes); + unpackedFile = session. + putAllAttributes(unpackedFile, attributes); final long fileSize = tarEntry.getSize(); - unpackedFile = session.write(unpackedFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - StreamUtils.copy(tarIn, out, fileSize); - } - }); + unpackedFile = session. + write(unpackedFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + StreamUtils. + copy(tarIn, out, fileSize); + } + }); } finally { unpacked.add(unpackedFile); } @@ -283,7 +314,8 @@ public class UnpackContent extends AbstractProcessor { @Override public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) { - final String fragmentId = UUID.randomUUID().toString(); + final String fragmentId = UUID.randomUUID(). + toString(); session.read(source, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { @@ -295,28 +327,39 @@ public class UnpackContent extends AbstractProcessor { continue; } final File file = new File(zipEntry.getName()); - final String parentDirectory = (file.getParent() == null) ? "/" : file.getParent(); - final Path absPath = file.toPath().toAbsolutePath(); - final String absPathString = absPath.getParent().toString() + "/"; + final String parentDirectory = (file.getParent() == null) ? "/" : file. + getParent(); + final Path absPath = file.toPath(). + toAbsolutePath(); + final String absPathString = absPath.getParent(). + toString() + "/"; FlowFile unpackedFile = session.create(source); try { final Map<String, String> attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), file.getName()); - attributes.put(CoreAttributes.PATH.key(), parentDirectory); - attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString); - attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); + attributes. + put(CoreAttributes.FILENAME.key(), file. + getName()); + attributes. + put(CoreAttributes.PATH.key(), parentDirectory); + attributes.put(CoreAttributes.ABSOLUTE_PATH. + key(), absPathString); + attributes. + put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); attributes.put(FRAGMENT_ID, fragmentId); - attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount)); - - unpackedFile = session.putAllAttributes(unpackedFile, attributes); - unpackedFile = session.write(unpackedFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - StreamUtils.copy(zipIn, out); - } - }); + attributes.put(FRAGMENT_INDEX, String. + valueOf(++fragmentCount)); + + unpackedFile = session. + putAllAttributes(unpackedFile, attributes); + unpackedFile = session. + write(unpackedFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + StreamUtils.copy(zipIn, out); + } + }); } finally { unpacked.add(unpackedFile); } @@ -345,20 +388,24 @@ public class UnpackContent extends AbstractProcessor { final ObjectHolder<Map<String, String>> attributesRef = new ObjectHolder<>(null); FlowFile unpackedFile = session.create(source); try { - unpackedFile = session.write(unpackedFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final OutputStream out = new BufferedOutputStream(rawOut)) { - final Map<String, String> attributes = unpackager.unpackageFlowFile(in, out); - if (attributes == null) { - throw new IOException("Failed to unpack " + source + ": stream had no Attributes"); + unpackedFile = session. + write(unpackedFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream out = new BufferedOutputStream(rawOut)) { + final Map<String, String> attributes = unpackager. + unpackageFlowFile(in, out); + if (attributes == null) { + throw new IOException("Failed to unpack " + source + ": stream had no Attributes"); + } + attributesRef. + set(attributes); + } } - attributesRef.set(attributes); - } - } - }); + }); - final Map<String, String> attributes = attributesRef.get(); + final Map<String, String> attributes = attributesRef. + get(); // Remove the UUID from the attributes because we don't want to use the same UUID for this FlowFile. // If we do, then we get into a weird situation if we use MergeContent to create a FlowFile Package @@ -366,16 +413,24 @@ public class UnpackContent extends AbstractProcessor { attributes.remove(CoreAttributes.UUID.key()); // maintain backward compatibility with legacy NiFi attribute names - mapAttributes(attributes, "nf.file.name", CoreAttributes.FILENAME.key()); - mapAttributes(attributes, "nf.file.path", CoreAttributes.PATH.key()); - mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE.key()); - mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE.key()); - - if (!attributes.containsKey(CoreAttributes.MIME_TYPE.key())) { - attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM); + mapAttributes(attributes, "nf.file.name", CoreAttributes.FILENAME. + key()); + mapAttributes(attributes, "nf.file.path", CoreAttributes.PATH. + key()); + mapAttributes(attributes, "content-encoding", CoreAttributes.MIME_TYPE. + key()); + mapAttributes(attributes, "content-type", CoreAttributes.MIME_TYPE. + key()); + + if (!attributes. + containsKey(CoreAttributes.MIME_TYPE. + key())) { + attributes.put(CoreAttributes.MIME_TYPE. + key(), OCTET_STREAM); } - unpackedFile = session.putAllAttributes(unpackedFile, attributes); + unpackedFile = session. + putAllAttributes(unpackedFile, attributes); } finally { unpacked.add(unpackedFile); } @@ -419,9 +474,12 @@ public class UnpackContent extends AbstractProcessor { } } - String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key()); - if (originalFilename.endsWith(".tar") || originalFilename.endsWith(".zip") || originalFilename.endsWith(".pkg")) { - originalFilename = originalFilename.substring(0, originalFilename.length() - 4); + String originalFilename = source.getAttribute(CoreAttributes.FILENAME. + key()); + if (originalFilename.endsWith(".tar") || originalFilename. + endsWith(".zip") || originalFilename.endsWith(".pkg")) { + originalFilename = originalFilename.substring(0, originalFilename. + length() - 4); } // second pass adds fragment attributes http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java index 4808a59..3f761d1 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateXml.java @@ -58,15 +58,21 @@ import org.xml.sax.SAXException; @CapabilityDescription("Validates the contents of FlowFiles against a user-specified XML Schema file") public class ValidateXml extends AbstractProcessor { - public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder() - .name("Schema File") - .description("The path to the Schema file that is to be used for validation") - .required(true) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .build(); - - public static final Relationship REL_VALID = new Relationship.Builder().name("valid").description("FlowFiles that are successfully validated against the schema are routed to this relationship").build(); - public static final Relationship REL_INVALID = new Relationship.Builder().name("invalid").description("FlowFiles that are not valid according to the specified schema are routed to this relationship").build(); + public static final PropertyDescriptor SCHEMA_FILE = new PropertyDescriptor.Builder(). + name("Schema File"). + description("The path to the Schema file that is to be used for validation"). + required(true). + addValidator(StandardValidators.FILE_EXISTS_VALIDATOR). + build(); + + public static final Relationship REL_VALID = new Relationship.Builder(). + name("valid"). + description("FlowFiles that are successfully validated against the schema are routed to this relationship"). + build(); + public static final Relationship REL_INVALID = new Relationship.Builder(). + name("invalid"). + description("FlowFiles that are not valid according to the specified schema are routed to this relationship"). + build(); private static final String SCHEMA_LANGUAGE = "http://www.w3.org/2001/XMLSchema"; @@ -99,8 +105,10 @@ public class ValidateXml extends AbstractProcessor { @OnScheduled public void parseSchema(final ProcessContext context) throws IOException, SAXException { try { - final File file = new File(context.getProperty(SCHEMA_FILE).getValue()); - final SchemaFactory schemaFactory = SchemaFactory.newInstance(SCHEMA_LANGUAGE); + final File file = new File(context.getProperty(SCHEMA_FILE). + getValue()); + final SchemaFactory schemaFactory = SchemaFactory. + newInstance(SCHEMA_LANGUAGE); final Schema schema = schemaFactory.newSchema(file); this.schemaRef.set(schema); } catch (final SAXException e) { @@ -128,18 +136,23 @@ public class ValidateXml extends AbstractProcessor { validator.validate(new StreamSource(in)); } catch (final IllegalArgumentException | SAXException e) { valid.set(false); - logger.debug("Failed to validate {} against schema due to {}", new Object[]{flowFile, e}); + logger. + debug("Failed to validate {} against schema due to {}", new Object[]{flowFile, e}); } } }); if (valid.get()) { - logger.info("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile}); - session.getProvenanceReporter().route(flowFile, REL_VALID); + logger. + info("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile}); + session.getProvenanceReporter(). + route(flowFile, REL_VALID); session.transfer(flowFile, REL_VALID); } else { - logger.info("Failed to validate {} against schema; routing to 'invalid'", new Object[]{flowFile}); - session.getProvenanceReporter().route(flowFile, REL_INVALID); + logger. + info("Failed to validate {} against schema; routing to 'invalid'", new Object[]{flowFile}); + session.getProvenanceReporter(). + route(flowFile, REL_INVALID); session.transfer(flowFile, REL_INVALID); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java index 51fcbc2..40c7e65 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java @@ -59,24 +59,33 @@ public class ContentAcknowledgmentServlet extends HttpServlet { @Override public void init(final ServletConfig config) throws ServletException { final ServletContext context = config.getServletContext(); - this.processor = (Processor) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESSOR); - this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); - this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); - this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); + this.processor = (Processor) context. + getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESSOR); + this.logger = (ProcessorLog) context. + getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); + this.authorizedPattern = (Pattern) context. + getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); + this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context. + getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); } @Override protected void doDelete(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { - final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); + final X509Certificate[] certs = (X509Certificate[]) request. + getAttribute("javax.servlet.request.X509Certificate"); String foundSubject = DEFAULT_FOUND_SUBJECT; if (certs != null && certs.length > 0) { for (final X509Certificate cert : certs) { - foundSubject = cert.getSubjectDN().getName(); - if (authorizedPattern.matcher(foundSubject).matches()) { + foundSubject = cert.getSubjectDN(). + getName(); + if (authorizedPattern.matcher(foundSubject). + matches()) { break; } else { - logger.warn(processor + " rejecting transfer attempt from " + foundSubject + " because the DN is not authorized"); - response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn"); + logger. + warn(processor + " rejecting transfer attempt from " + foundSubject + " because the DN is not authorized"); + response. + sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn"); return; } } @@ -92,7 +101,10 @@ public class ContentAcknowledgmentServlet extends HttpServlet { final String uuid = uri.substring(slashIndex + 1, questionIndex); final FlowFileEntryTimeWrapper timeWrapper = flowFileMap.remove(uuid); if (timeWrapper == null) { - logger.warn("received DELETE for HOLD with ID " + uuid + " from Remote Host: [" + request.getRemoteHost() + "] Port [" + request.getRemotePort() + "] SubjectDN [" + foundSubject + "], but no HOLD exists with that ID; sending response with Status Code 404"); + logger. + warn("received DELETE for HOLD with ID " + uuid + " from Remote Host: [" + request. + getRemoteHost() + "] Port [" + request. + getRemotePort() + "] SubjectDN [" + foundSubject + "], but no HOLD exists with that ID; sending response with Status Code 404"); response.sendError(HttpServletResponse.SC_NOT_FOUND); return; } @@ -100,7 +112,8 @@ public class ContentAcknowledgmentServlet extends HttpServlet { try { final Set<FlowFile> flowFiles = timeWrapper.getFlowFiles(); - final long transferTime = System.currentTimeMillis() - timeWrapper.getEntryTime(); + final long transferTime = System.currentTimeMillis() - timeWrapper. + getEntryTime(); long totalFlowFileSize = 0; for (final FlowFile flowFile : flowFiles) { totalFlowFileSize += flowFile.getSize(); @@ -111,10 +124,13 @@ public class ContentAcknowledgmentServlet extends HttpServlet { seconds = .00000001D; } final double bytesPerSecond = ((double) totalFlowFileSize / seconds); - final String transferRate = FormatUtils.formatDataSize(bytesPerSecond) + "/sec"; + final String transferRate = FormatUtils. + formatDataSize(bytesPerSecond) + "/sec"; - logger.info("received {} files/{} bytes from Remote Host: [{}] Port [{}] SubjectDN [{}] in {} milliseconds at a rate of {}; transferring to 'success': {}", - new Object[]{flowFiles.size(), totalFlowFileSize, request.getRemoteHost(), request.getRemotePort(), foundSubject, transferTime, transferRate, flowFiles}); + logger. + info("received {} files/{} bytes from Remote Host: [{}] Port [{}] SubjectDN [{}] in {} milliseconds at a rate of {}; transferring to 'success': {}", + new Object[]{flowFiles.size(), totalFlowFileSize, request. + getRemoteHost(), request.getRemotePort(), foundSubject, transferTime, transferRate, flowFiles}); final ProcessSession session = timeWrapper.getSession(); session.transfer(flowFiles, ListenHTTP.RELATIONSHIP_SUCCESS); @@ -123,9 +139,12 @@ public class ContentAcknowledgmentServlet extends HttpServlet { response.setStatus(HttpServletResponse.SC_OK); response.flushBuffer(); } catch (final Throwable t) { - timeWrapper.getSession().rollback(); - logger.error("received DELETE for HOLD with ID {} from Remote Host: [{}] Port [{}] SubjectDN [{}], but failed to process the request due to {}", - new Object[]{uuid, request.getRemoteHost(), request.getRemotePort(), foundSubject, t.toString()}); + timeWrapper.getSession(). + rollback(); + logger. + error("received DELETE for HOLD with ID {} from Remote Host: [{}] Port [{}] SubjectDN [{}], but failed to process the request due to {}", + new Object[]{uuid, request.getRemoteHost(), request. + getRemotePort(), foundSubject, t.toString()}); if (logger.isDebugEnabled()) { logger.error("", t); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index 1cf5f1f..7e2338a 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -103,13 +103,20 @@ public class ListenHTTPServlet extends HttpServlet { @Override public void init(final ServletConfig config) throws ServletException { final ServletContext context = config.getServletContext(); - this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); - this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER); - this.processContext = (ProcessContext) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER); - this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); - this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN); - this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); - this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); + this.logger = (ProcessorLog) context. + getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); + this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context. + getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER); + this.processContext = (ProcessContext) context. + getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER); + this.authorizedPattern = (Pattern) context. + getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); + this.headerPattern = (Pattern) context. + getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN); + this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context. + getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); + this.streamThrottler = (StreamThrottler) context. + getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); } @Override @@ -122,7 +129,7 @@ public class ListenHTTPServlet extends HttpServlet { @Override protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { final ProcessContext context = processContext; - + ProcessSessionFactory sessionFactory; do { sessionFactory = sessionFactoryHolder.get(); @@ -141,12 +148,15 @@ public class ListenHTTPServlet extends HttpServlet { try { final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE; if (n == 0 || !spaceAvailable.get()) { - if (context.getAvailableRelationships().isEmpty()) { + if (context.getAvailableRelationships(). + isEmpty()) { spaceAvailable.set(false); if (logger.isDebugEnabled()) { - logger.debug("Received request from " + request.getRemoteHost() + " but no space available; Indicating Service Unavailable"); + logger.debug("Received request from " + request. + getRemoteHost() + " but no space available; Indicating Service Unavailable"); } - response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + response. + sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); return; } else { spaceAvailable.set(true); @@ -154,24 +164,32 @@ public class ListenHTTPServlet extends HttpServlet { } response.setHeader("Content-Type", MediaType.TEXT_PLAIN); - final boolean contentGzipped = Boolean.parseBoolean(request.getHeader(GZIPPED_HEADER)); + final boolean contentGzipped = Boolean.parseBoolean(request. + getHeader(GZIPPED_HEADER)); - final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); + final X509Certificate[] certs = (X509Certificate[]) request. + getAttribute("javax.servlet.request.X509Certificate"); foundSubject = DEFAULT_FOUND_SUBJECT; if (certs != null && certs.length > 0) { for (final X509Certificate cert : certs) { - foundSubject = cert.getSubjectDN().getName(); - if (authorizedPattern.matcher(foundSubject).matches()) { + foundSubject = cert.getSubjectDN(). + getName(); + if (authorizedPattern.matcher(foundSubject). + matches()) { break; } else { - logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + request.getRemoteHost()); - response.sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn"); + logger. + warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + request. + getRemoteHost()); + response. + sendError(HttpServletResponse.SC_FORBIDDEN, "not allowed based on dn"); return; } } } - final String destinationVersion = request.getHeader(PROTOCOL_VERSION_HEADER); + final String destinationVersion = request. + getHeader(PROTOCOL_VERSION_HEADER); Integer protocolVersion = null; if (destinationVersion != null) { try { @@ -182,15 +200,19 @@ public class ListenHTTPServlet extends HttpServlet { } final boolean destinationIsLegacyNiFi = (protocolVersion == null); - final boolean createHold = Boolean.parseBoolean(request.getHeader(FLOWFILE_CONFIRMATION_HEADER)); + final boolean createHold = Boolean.parseBoolean(request. + getHeader(FLOWFILE_CONFIRMATION_HEADER)); final String contentType = request.getContentType(); - final InputStream unthrottled = contentGzipped ? new GZIPInputStream(request.getInputStream()) : request.getInputStream(); + final InputStream unthrottled = contentGzipped ? new GZIPInputStream(request. + getInputStream()) : request.getInputStream(); - final InputStream in = (streamThrottler == null) ? unthrottled : streamThrottler.newThrottledInputStream(unthrottled); + final InputStream in = (streamThrottler == null) ? unthrottled : streamThrottler. + newThrottledInputStream(unthrottled); if (logger.isDebugEnabled()) { - logger.debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped); + logger. + debug("Received request from " + request.getRemoteHost() + ", createHold=" + createHold + ", content-type=" + contentType + ", gzip=" + contentGzipped); } final AtomicBoolean hasMoreData = new AtomicBoolean(false); @@ -219,16 +241,21 @@ public class ListenHTTPServlet extends HttpServlet { IOUtils.copy(in, bos); hasMoreData.set(false); } else { - attributes.putAll(unpackager.unpackageFlowFile(in, bos)); + attributes.putAll(unpackager. + unpackageFlowFile(in, bos)); if (destinationIsLegacyNiFi) { if (attributes.containsKey("nf.file.name")) { // for backward compatibility with old nifi... - attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name")); + attributes.put(CoreAttributes.FILENAME. + key(), attributes. + remove("nf.file.name")); } if (attributes.containsKey("nf.file.path")) { - attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path")); + attributes. + put(CoreAttributes.PATH.key(), attributes. + remove("nf.file.path")); } } @@ -242,36 +269,45 @@ public class ListenHTTPServlet extends HttpServlet { }); final long transferNanos = System.nanoTime() - startNanos; - final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); + final long transferMillis = TimeUnit.MILLISECONDS. + convert(transferNanos, TimeUnit.NANOSECONDS); // put metadata on flowfile - final String nameVal = request.getHeader(CoreAttributes.FILENAME.key()); + final String nameVal = request. + getHeader(CoreAttributes.FILENAME.key()); if (StringUtils.isNotBlank(nameVal)) { attributes.put(CoreAttributes.FILENAME.key(), nameVal); } - + // put arbitrary headers on flow file - for(Enumeration<String> headerEnum = request.getHeaderNames(); - headerEnum.hasMoreElements(); ) { - String headerName = headerEnum.nextElement(); - if (headerPattern != null && headerPattern.matcher(headerName).matches()) { - String headerValue = request.getHeader(headerName); - attributes.put(headerName, headerValue); - } + for (Enumeration<String> headerEnum = request.getHeaderNames(); + headerEnum.hasMoreElements();) { + String headerName = headerEnum.nextElement(); + if (headerPattern != null && headerPattern. + matcher(headerName). + matches()) { + String headerValue = request.getHeader(headerName); + attributes.put(headerName, headerValue); + } } - String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key()); + String sourceSystemFlowFileIdentifier = attributes. + get(CoreAttributes.UUID.key()); if (sourceSystemFlowFileIdentifier != null) { sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier; // If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's // identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event - attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID(). + toString()); } flowFile = session.putAllAttributes(flowFile, attributes); - session.getProvenanceReporter().receive(flowFile, request.getRequestURL().toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis); - flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject); + session.getProvenanceReporter(). + receive(flowFile, request.getRequestURL(). + toString(), sourceSystemFlowFileIdentifier, "Remote DN=" + foundSubject, transferMillis); + flowFile = session. + putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject); flowFileSet.add(flowFile); if (holdUuid == null) { @@ -280,34 +316,45 @@ public class ListenHTTPServlet extends HttpServlet { } while (hasMoreData.get()); if (createHold) { - String uuid = (holdUuid == null) ? UUID.randomUUID().toString() : holdUuid; + String uuid = (holdUuid == null) ? UUID.randomUUID(). + toString() : holdUuid; if (flowFileMap.containsKey(uuid)) { - uuid = UUID.randomUUID().toString(); + uuid = UUID.randomUUID(). + toString(); } - final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis()); + final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System. + currentTimeMillis()); FlowFileEntryTimeWrapper previousWrapper; do { previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper); if (previousWrapper != null) { - uuid = UUID.randomUUID().toString(); + uuid = UUID.randomUUID(). + toString(); } } while (previousWrapper != null); response.setStatus(HttpServletResponse.SC_SEE_OTHER); final String ackUri = ListenHTTP.URI + "/holds/" + uuid; response.addHeader(LOCATION_HEADER_NAME, ackUri); - response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE); - response.getOutputStream().write(ackUri.getBytes("UTF-8")); + response. + addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE); + response.getOutputStream(). + write(ackUri.getBytes("UTF-8")); if (logger.isDebugEnabled()) { - logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}", - new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet.size(), uuid}); + logger. + debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}]; placed hold on these {} files with ID {}", + new Object[]{flowFileSet, request. + getRemoteHost(), request.getRemotePort(), foundSubject, flowFileSet. + size(), uuid}); } } else { response.setStatus(HttpServletResponse.SC_OK); - logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}", - new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, flowFile}); + logger. + info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}]; transferring to 'success' {}", + new Object[]{request.getRemoteHost(), request. + getRemotePort(), foundSubject, flowFile}); session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS); session.commit(); @@ -315,11 +362,16 @@ public class ListenHTTPServlet extends HttpServlet { } catch (final Throwable t) { session.rollback(); if (flowFile == null) { - logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, t}); + logger. + error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{request. + getRemoteHost(), foundSubject, t}); } else { - logger.error("Unable to receive file {} from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{flowFile, request.getRemoteHost(), foundSubject, t}); + logger. + error("Unable to receive file {} from Remote Host: [{}] SubjectDN [{}] due to {}", new Object[]{flowFile, request. + getRemoteHost(), foundSubject, t}); } - response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString()); + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t. + toString()); } } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/Bin.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/Bin.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/Bin.java index 45bd8ed..aa5cdc3 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/Bin.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/Bin.java @@ -117,7 +117,8 @@ public class Bin { * @return true if added; false otherwise */ public boolean offer(final FlowFile flowFile, final ProcessSession session) { - if (((size + flowFile.getSize()) > maximumSizeBytes) || (binContents.size() >= maximumEntries)) { + if (((size + flowFile.getSize()) > maximumSizeBytes) || (binContents. + size() >= maximumEntries)) { successiveFailedOfferings++; return false; } @@ -143,7 +144,8 @@ public class Bin { if (value == null) { return null; } - if (!intPattern.matcher(value).matches()) { + if (!intPattern.matcher(value). + matches()) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/BinManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/BinManager.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/BinManager.java index 722efae..eeadfa6 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/BinManager.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/BinManager.java @@ -60,8 +60,10 @@ public class BinManager { try { for (final List<Bin> binList : groupBinMap.values()) { for (final Bin bin : binList) { - for (final FlowFileSessionWrapper wrapper : bin.getContents()) { - wrapper.getSession().rollback(); + for (final FlowFileSessionWrapper wrapper : bin. + getContents()) { + wrapper.getSession(). + rollback(); } } } @@ -126,7 +128,8 @@ public class BinManager { final List<Bin> currentBins = groupBinMap.get(groupIdentifier); if (currentBins == null) { // this is a new group we need to register final List<Bin> bins = new ArrayList<>(); - final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), maxEntries.get(), fileCountAttribute.get()); + final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries. + get(), maxEntries.get(), fileCountAttribute.get()); bins.add(bin); groupBinMap.put(groupIdentifier, bins); binCount++; @@ -140,7 +143,8 @@ public class BinManager { } //if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one - final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), maxEntries.get(), fileCountAttribute.get()); + final Bin bin = new Bin(minSizeBytes.get(), currentMaxSizeBytes, minEntries. + get(), maxEntries.get(), fileCountAttribute.get()); currentBins.add(bin); binCount++; return bin.offer(flowFile, session); @@ -157,7 +161,7 @@ public class BinManager { * @param relaxFullnessConstraint if false will require bins to be full * before considered ready; if true bins only have to meet their minimum * size criteria or be 'old' and then they'll be considered ready - * @return + * @return */ public Collection<Bin> removeReadyBins(boolean relaxFullnessConstraint) { final Map<String, List<Bin>> newGroupMap = new HashMap<>(); @@ -165,10 +169,12 @@ public class BinManager { wLock.lock(); try { - for (final Map.Entry<String, List<Bin>> group : groupBinMap.entrySet()) { + for (final Map.Entry<String, List<Bin>> group : groupBinMap. + entrySet()) { final List<Bin> remainingBins = new ArrayList<>(); for (final Bin bin : group.getValue()) { - if (relaxFullnessConstraint && (bin.isFullEnough() || bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS))) { //relaxed check + if (relaxFullnessConstraint && (bin.isFullEnough() || bin. + isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS))) { //relaxed check readyBins.add(bin); } else if (!relaxFullnessConstraint && bin.isFull()) { //strict check readyBins.add(bin); @@ -195,7 +201,8 @@ public class BinManager { Bin oldestBin = null; String oldestBinGroup = null; - for (final Map.Entry<String, List<Bin>> group : groupBinMap.entrySet()) { + for (final Map.Entry<String, List<Bin>> group : groupBinMap. + entrySet()) { for (final Bin bin : group.getValue()) { if (oldestBin == null || bin.isOlderThan(oldestBin)) { oldestBin = bin; @@ -228,7 +235,8 @@ public class BinManager { try { for (final List<Bin> bins : groupBinMap.values()) { for (final Bin bin : bins) { - if (bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS)) { + if (bin. + isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS)) { return true; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java index bf28003..3131f40 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/DocumentReaderCallback.java @@ -52,7 +52,8 @@ public class DocumentReaderCallback implements InputStreamCallback { @Override public void process(final InputStream stream) throws IOException { try { - DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilderFactory factory = DocumentBuilderFactory. + newInstance(); factory.setNamespaceAware(isNamespaceAware); DocumentBuilder builder = factory.newDocumentBuilder(); document = builder.parse(stream);