http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/expression/ExpressionLanguageCompiler.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/expression/ExpressionLanguageCompiler.java b/nifi-api/src/main/java/org/apache/nifi/expression/ExpressionLanguageCompiler.java
new file mode 100644
index 0000000..9383d27
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/expression/ExpressionLanguageCompiler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.expression;
+
+import org.apache.nifi.expression.AttributeExpression.ResultType;
+
+public interface ExpressionLanguageCompiler {
+
+    /**
+     * Compiles the given Attribute Expression string into an
+     * AttributeExpression that can be evaluated
+     *
+     * @param expression the Attribute Expression to be compiled
+     * @return the compiled expression, which can be evaluated
+     * @throws IllegalArgumentException if the given expression is not valid
+     */
+    AttributeExpression compile(String expression) throws IllegalArgumentException;
+
+    /**
+     * Indicates whether or not the given string is a valid Attribute
+     * Expression.
+     *
+     * @param expression the string to check
+     * @return true if the given string is a valid Attribute Expression; false otherwise
+     */
+    boolean isValidExpression(String expression);
+
+    /**
+     * Attempts to validate the given expression and returns <code>null</code>
+     * if the expression is syntactically valid or a String indicating why the
+     * expression is invalid otherwise.
+     *
+     * @param expression to validate
+     * @param allowSurroundingCharacters if <code>true</code> allows characters
+     * to surround the Expression, otherwise the expression must be exactly
+     * equal to a valid Expression. E.g., <code>/${path}</code> is valid if and
+     * only if <code>allowSurroundingCharacters</code> is true
+     *
+     * @return a String indicating the reason that the expression is not
+     * syntactically correct, or <code>null</code> if the expression is
+     * syntactically correct
+     */
+    String validateExpression(String expression, boolean allowSurroundingCharacters);
+
+    /**
+     * Returns the ResultType that will be returned by the given Expression
+     *
+     * @param expression the Expression whose ResultType is to be determined
+     * @return result type for the given expression
+     * @throws IllegalArgumentException if the given Expression is not a valid
+     * Expression Language Expression; the message of this Exception will
+     * indicate the problem if the expression is not syntactically valid.
+     */
+    ResultType getResultType(String expression) throws IllegalArgumentException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
new file mode 100644
index 0000000..0e2c19d
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowfile;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * <p>
+ * A flow file is a logical notion of an item in a flow with its associated
+ * attributes and identity which can be used as a reference for its actual
+ * content.</p>
+ *
+ * <b>All FlowFile implementations must be Immutable - Thread safe.</b>
+ */
+public interface FlowFile extends Comparable<FlowFile> {
+
+    /**
+     * @return the unique identifier for this flow file
+     */
+    long getId();
+
+    /**
+     * @return the date at which the flow file entered the flow
+     */
+    long getEntryDate();
+
+    /**
+     * @return the date at which the origin of this FlowFile entered the flow.
+     * For example, if FlowFile Z were derived from FlowFile Y and FlowFile Y
+     * was derived from FlowFile X, this date would be the entryDate (see
+     * {@link #getEntryDate()}) of FlowFile X.
+     */
+    long getLineageStartDate();
+
+    /**
+     * @return the time at which the FlowFile was most recently added to a
+     * FlowFile queue, or {@code null} if the FlowFile has never been enqueued.
+     * This value will always be populated before it is passed to a
+     * {@link FlowFilePrioritizer}
+     */
+    Long getLastQueueDate();
+
+    /**
+     * <p>
+     * If a FlowFile is derived from multiple "parent" FlowFiles, all of the
+     * parents' Lineage Identifiers will be in the set.
+     * </p>
+     *
+     * @return a set of identifiers that are unique to this FlowFile's lineage.
+     * If FlowFile X is derived from FlowFile Y, both FlowFiles will have the
+     * same value for the Lineage Claim ID.
+     */
+    Set<String> getLineageIdentifiers();
+
+    /**
+     * @return true if flow file is currently penalized; false otherwise
+     */
+    boolean isPenalized();
+
+    /**
+     * Obtains the attribute value for the given key
+     *
+     * @param key of the attribute
+     * @return value if found; null otherwise
+     */
+    String getAttribute(String key);
+
+    /**
+     * @return size of flow file contents in bytes
+     */
+    long getSize();
+
+    /**
+     * @return an unmodifiable map of the flow file attributes
+     */
+    Map<String, String> getAttributes();
+
+    public static class KeyValidator {
+
+        public static String validateKey(final String key) {
+            // We used to validate the key by disallowing a handful of keywords, but this requirement no longer exists.
+            // Therefore this method only rejects a null key or one that is empty after trim(), and returns the key unchanged.
+            if (key == null) {
+                throw new IllegalArgumentException("Invalid attribute key: null");
+            }
+            if (key.trim().isEmpty()) {
+                throw new IllegalArgumentException("Invalid attribute key: <Empty String>");
+            }
+            return key;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFilePrioritizer.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFilePrioritizer.java b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFilePrioritizer.java
new file mode 100644
index 0000000..684f454
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFilePrioritizer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowfile;
+
+import java.util.Comparator;
+
+/**
+ * Provides a mechanism to prioritize flow file objects based on their
+ * attributes. The actual flow file content will not be available for comparison
+ * so if features of that content are necessary for prioritization it should be
+ * extracted to be used as an attribute of the flow file.
+ *
+ */
+public interface FlowFilePrioritizer extends Comparator<FlowFile> {
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/logging/ComponentLog.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/logging/ComponentLog.java b/nifi-api/src/main/java/org/apache/nifi/logging/ComponentLog.java
new file mode 100644
index 0000000..b4b3c6a
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/logging/ComponentLog.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+/**
+ * <p>
+ * The ComponentLog provides a mechanism to ensure that all NiFi components are
+ * logging and reporting information in a consistent way. When messages are
+ * logged to the ComponentLog, each message has the following characteristics:
+ * </p>
+ *
+ * <ul>
+ * <li>
+ * The <code>toString()</code> of the component is automatically prepended to
+ * the message so that it is clear which component is providing the information.
+ * This is important, since a single component may have many different instances
+ * within the same NiFi instance.
+ * </li>
+ * <li>
+ * If the last value in an Object[] argument that is passed to the logger is a
+ * Throwable, then the logged message will include a <code>toString()</code> of
+ * the Throwable; in addition, if the component's logger is set to DEBUG level
+ * via the logback configuration, the stack trace will also be logged. This
+ * provides a mechanism to easily enable stacktraces in the logs when they are
+ * desired without filling the logs with unneeded stack traces for messages that
+ * end up occurring often.
+ * </li>
+ * <li>
+ * Any message that is logged with a Severity level that meets or exceeds the
+ * configured Bulletin Level for that component will also cause a Bulletin to be
+ * generated, so that the message is visible in the UI, allowing Dataflow
+ * Managers to understand that a problem exists and what the issue is.
+ * </li>
+ * </ul>
+ *
+ */
+public interface ComponentLog {
+
+    void warn(String msg, Throwable t);
+
+    void warn(String msg, Object[] os);
+
+    void warn(String msg, Object[] os, Throwable t);
+
+    void warn(String msg);
+
+    void trace(String msg, Throwable t);
+
+    void trace(String msg, Object[] os);
+
+    void trace(String msg);
+
+    void trace(String msg, Object[] os, Throwable t);
+
+    boolean isWarnEnabled();
+
+    boolean isTraceEnabled();
+
+    boolean isInfoEnabled();
+
+    boolean isErrorEnabled();
+
+    boolean isDebugEnabled();
+
+    void info(String msg, Throwable t);
+
+    void info(String msg, Object[] os);
+
+    void info(String msg);
+
+    void info(String msg, Object[] os, Throwable t);
+
+    String getName();
+
+    void error(String msg, Throwable t);
+
+    void error(String msg, Object[] os);
+
+    void error(String msg);
+
+    void error(String msg, Object[] os, Throwable t);
+
+    void debug(String msg, Throwable t);
+
+    void debug(String msg, Object[] os);
+
+    void debug(String msg, Object[] os, Throwable t);
+
+    void debug(String msg);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/logging/LogLevel.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/logging/LogLevel.java b/nifi-api/src/main/java/org/apache/nifi/logging/LogLevel.java
new file mode 100644
index 0000000..0c9b64b
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/logging/LogLevel.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+public enum LogLevel {
+
+    TRACE,
+    DEBUG,
+    INFO,
+    WARN,
+    ERROR,
+    FATAL;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/logging/ProcessorLog.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/logging/ProcessorLog.java b/nifi-api/src/main/java/org/apache/nifi/logging/ProcessorLog.java
new file mode 100644
index 0000000..a90ee26
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/logging/ProcessorLog.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+/**
+ * The ProcessorLog is an extension of ComponentLog but provides no additional
+ * functionality. It exists because ProcessorLog was created first, but when
+ * Controller Services and Reporting Tasks began to be used more heavily, loggers
+ * were needed for them as well. We did not want to return a ProcessorLog to a
+ * ControllerService or a ReportingTask, so all of the methods were moved to a
+ * higher interface named ComponentLog. However, we kept the ProcessorLog
+ * interface around in order to maintain backward compatibility.
+ */
+public interface ProcessorLog extends ComponentLog {
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/processor/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/AbstractProcessor.java b/nifi-api/src/main/java/org/apache/nifi/processor/AbstractProcessor.java
new file mode 100644
index 0000000..7ff568e
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/AbstractProcessor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor;
+
+import org.apache.nifi.processor.exception.ProcessException;
+
+public abstract class AbstractProcessor extends AbstractSessionFactoryProcessor {
+
+    @Override
+    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        final ProcessSession session = sessionFactory.createSession();
+        try {
+            onTrigger(context, session);
+            session.commit();
+        } catch (final Throwable t) {
+            getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
+            session.rollback(true);
+            throw t;
+        }
+    }
+
+    public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java b/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java
new file mode 100644
index 0000000..2695dcd
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/AbstractSessionFactoryProcessor.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.logging.ProcessorLog;
+
+/**
+ * <p>
+ * Provides a standard partial implementation of a {@link Processor}. This
+ * implementation provides default behavior and various convenience hooks for
+ * processing.</p>
+ *
+ * <p>
+ * Implementation/Design note: This class follows the open/closed principle in a
+ * fairly strict manner meaning that subclasses are free to customize behavior
+ * in specifically designed points exclusively. If greater flexibility is
+ * necessary then it is still possible to simply implement the {@link Processor}
+ * interface.</p>
+ *
+ * <p>
+ * Thread safe</p>
+ *
+ */
+public abstract class AbstractSessionFactoryProcessor extends AbstractConfigurableComponent implements Processor {
+
+    private String identifier;
+    private ProcessorLog logger;
+    private volatile boolean scheduled = false;
+    private ControllerServiceLookup serviceLookup;
+    private String description;
+
+    @Override
+    public final void initialize(final ProcessorInitializationContext context) {
+        identifier = context.getIdentifier();
+        logger = context.getLogger();
+        serviceLookup = context.getControllerServiceLookup();
+        init(context);
+
+        description = getClass().getSimpleName() + "[id=" + identifier + "]";
+    }
+
+    /**
+     * @return the {@link ControllerServiceLookup} obtained from the context passed to
+     * {@link #initialize(ProcessorInitializationContext)}, or null if not yet initialized
+     */
+    protected final ControllerServiceLookup getControllerServiceLookup() {
+        return serviceLookup;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.emptySet();
+    }
+
+    protected final ProcessorLog getLogger() {
+        return logger;
+    }
+
+    /**
+     * Provides subclasses the ability to perform initialization logic
+     *
+     * @param context in which to perform initialization
+     */
+    protected void init(final ProcessorInitializationContext context) {
+        // Provided for subclasses to override
+    }
+
+    /**
+     * @return <code>true</code> if the processor is scheduled to run,
+     * <code>false</code> otherwise
+     */
+    protected final boolean isScheduled() {
+        return scheduled;
+    }
+
+    @OnScheduled
+    public final void updateScheduledTrue() {
+        scheduled = true;
+    }
+
+    @OnUnscheduled
+    public final void updateScheduledFalse() {
+        scheduled = false;
+    }
+
+    @Override
+    public final String getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public String toString() {
+        return description;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/processor/DataUnit.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/DataUnit.java b/nifi-api/src/main/java/org/apache/nifi/processor/DataUnit.java new file mode 100644 index 0000000..4980b97 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/processor/DataUnit.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public enum DataUnit { + + /** + * Bytes + */ + B { + @Override + public double toB(double value) { + return value; + } + + @Override + public double toKB(double value) { + return value / POWERS[1]; + } + + @Override + public double toMB(double value) { + return value / POWERS[2]; + } + + @Override + public double toGB(double value) { + return value / POWERS[3]; + } + + @Override + public double toTB(double value) { + return value / POWERS[4]; + } + + @Override + public double convert(double sourceSize, DataUnit sourceUnit) { + return sourceUnit.toB(sourceSize); + } + }, + /** + * Kilobytes + */ + KB { + @Override + public double toB(double value) { + return value * POWERS[1]; + } + + @Override + public double toKB(double value) { + return value; + } + + @Override + public double toMB(double value) { + return value / POWERS[1]; + } + + @Override + public double toGB(double value) { + return value / POWERS[2]; + } + + @Override + public double toTB(double value) { + return value / POWERS[3]; + } + + @Override + public double convert(double sourceSize, DataUnit sourceUnit) { + return sourceUnit.toKB(sourceSize); + } + }, + /** + * Megabytes + */ + MB { + @Override + public double toB(double value) { + return value * POWERS[2]; + } + + @Override + public double toKB(double value) { + return value * POWERS[1]; + } + + @Override + public double toMB(double value) { + return value; + } + + @Override + public double toGB(double value) { + return value / POWERS[1]; + } + + @Override + public double toTB(double value) { + return value / POWERS[2]; + } + + @Override + public double convert(double sourceSize, DataUnit sourceUnit) { + return sourceUnit.toMB(sourceSize); + } + }, + /** + * Gigabytes + */ + GB { + @Override + public double toB(double value) { + return value * POWERS[3]; + } + + @Override + public double toKB(double value) { + return value * POWERS[2]; 
+ } + + @Override + public double toMB(double value) { + return value * POWERS[1]; + } + + @Override + public double toGB(double value) { + return value; + } + + @Override + public double toTB(double value) { + return value / POWERS[1]; + } + + @Override + public double convert(double sourceSize, DataUnit sourceUnit) { + return sourceUnit.toGB(sourceSize); + } + }, + /** + * Terabytes + */ + TB { + @Override + public double toB(double value) { + return value * POWERS[4]; + } + + @Override + public double toKB(double value) { + return value * POWERS[3]; + } + + @Override + public double toMB(double value) { + return value * POWERS[2]; + } + + @Override + public double toGB(double value) { + return value * POWERS[1]; + } + + @Override + public double toTB(double value) { + return value; + } + + @Override + public double convert(double sourceSize, DataUnit sourceUnit) { + return sourceUnit.toTB(sourceSize); + } + }; + + public double convert(final double sourceSize, final DataUnit sourceUnit) { + throw new AbstractMethodError(); + } + + public double toB(double size) { + throw new AbstractMethodError(); + } + + public double toKB(double size) { + throw new AbstractMethodError(); + } + + public double toMB(double size) { + throw new AbstractMethodError(); + } + + public double toGB(double size) { + throw new AbstractMethodError(); + } + + public double toTB(double size) { + throw new AbstractMethodError(); + } + + public static final double[] POWERS = {1, + 1024D, + 1024 * 1024D, + 1024 * 1024 * 1024D, + 1024 * 1024 * 1024 * 1024D}; + + public static final String DATA_SIZE_REGEX = "(\\d+(?:\\.\\d+)?)\\s*(B|KB|MB|GB|TB)"; + public static final Pattern DATA_SIZE_PATTERN = Pattern.compile(DATA_SIZE_REGEX); + + public static Double parseDataSize(final String value, final DataUnit units) { + if (value == null) { + return null; + } + + final Matcher matcher = DATA_SIZE_PATTERN.matcher(value.toUpperCase()); + if (!matcher.find()) { + throw new 
IllegalArgumentException("Invalid data size: " + value); + } + + final String sizeValue = matcher.group(1); + final String unitValue = matcher.group(2); + + final DataUnit sourceUnit = DataUnit.valueOf(unitValue); + final double size = Double.parseDouble(sizeValue); + return units.convert(size, sourceUnit); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/processor/FlowFileFilter.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/FlowFileFilter.java b/nifi-api/src/main/java/org/apache/nifi/processor/FlowFileFilter.java new file mode 100644 index 0000000..3bd6546 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/processor/FlowFileFilter.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor; + +import org.apache.nifi.flowfile.FlowFile; + +/** + * <p> + * FlowFileFilter provides a mechanism for selectively choosing which FlowFiles + * to obtain from a Processor's incoming connections. + * </p> + * + * <p> + * Implementations of this interface need not be thread-safe. 
+ * </p> + */ +public interface FlowFileFilter { + + /** + * Indicates whether or not the given FlowFile should be selected and + * whether or not the Processor is interested in filtering additional + * FlowFiles + * + * @param flowFile to apply the filter to + * @return true if the given FlowFile should be selected and + * if Processor is interested in filtering additional + * FlowFiles + */ + FlowFileFilterResult filter(FlowFile flowFile); + + /** + * Provides a result type to indicate whether or not a FlowFile should be + * selected + */ + public static enum FlowFileFilterResult { + + /** + * Indicates that a FlowFile should be returned to the Processor to be + * processed and that additional FlowFiles should be processed by this + * filter. + */ + ACCEPT_AND_CONTINUE(true, true), + /** + * Indicates that a FlowFile should be returned to the Processor to be + * processed and that this is the last FlowFile that should be processed + * by this filter. + */ + ACCEPT_AND_TERMINATE(false, true), + /** + * Indicates that a FlowFile should not be processed by the Processor at + * this time but that additional FlowFiles should be processed by this + * filter. + */ + REJECT_AND_CONTINUE(true, false), + /** + * Indicates that a FlowFile should not be processed by the Processor at + * this time and that no additional FlowFiles should be processed + * either. 
+ */ + REJECT_AND_TERMINATE(false, false); + + private final boolean continueProcessing; + private final boolean accept; + + private FlowFileFilterResult(final boolean continueProcessing, final boolean accept) { + this.continueProcessing = continueProcessing; + this.accept = accept; + } + + public boolean isAccept() { + return accept; + } + + public boolean isContinue() { + return continueProcessing; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java new file mode 100644 index 0000000..c61a318 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor; + +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.ControllerServiceLookup; + +/** + * <p> + * Provides a bridge between a Processor and the NiFi Framework + * </p> + * + * <p> + * <b>Note: </b>Implementations of this interface are NOT necessarily + * thread-safe. + * </p> + */ +public interface ProcessContext { + + /** + * Retrieves the current value set for the given descriptor, if a value is + * set - else uses the descriptor to determine the appropriate default value + * + * @param descriptor the descriptor of the property whose value to look up + * @return the property value of the given descriptor + */ + PropertyValue getProperty(PropertyDescriptor descriptor); + + /** + * Retrieves the current value set for the property with the given name, if + * a value is set - else uses the property's descriptor to determine the appropriate default value + * + * @param propertyName the name of the property to look up the value for + * @return the property value for the property with the given name + */ + PropertyValue getProperty(String propertyName); + + /** + * Creates and returns a {@link PropertyValue} object that can be used for + * evaluating the value of the given String + * + * @param rawValue the raw input before any property evaluation has occurred + * @return a {@link PropertyValue} object that can be used for + * evaluating the value of the given String + */ + PropertyValue newPropertyValue(String rawValue); + + /** + * <p> + * Causes the Processor not to be scheduled for some pre-configured amount + * of time. The duration of time for which the processor will not be + * scheduled is configured in the same manner as the processor's scheduling + * period. + * </p> + * + * <p> + * <b>Note: </b>This is NOT a blocking call and does not suspend execution + * of the current thread.
+ * </p> + */ + void yield(); + + /** + * @return the maximum number of threads that may be executing this + * processor's code at any given time + */ + int getMaxConcurrentTasks(); + + /** + * @return the annotation data configured for this processor + */ + String getAnnotationData(); + + /** + * @return a Map of all PropertyDescriptors to their configured values. This + * Map may or may not be modifiable, but modifying its values will not + * change the values of the processor's properties + */ + Map<PropertyDescriptor, String> getProperties(); + + /** + * Encrypts the given value using the password provided in the NiFi + * Properties + * + * @param unencrypted the plaintext value to encrypt + * @return the encrypted value + */ + String encrypt(String unencrypted); + + /** + * Decrypts the given value using the password provided in the NiFi + * Properties + * + * @param encrypted the encrypted value to decrypt + * @return the plaintext value + */ + String decrypt(String encrypted); + + /** + * @return a {@code ControllerServiceLookup} that can be used to obtain a + * Controller Service + */ + ControllerServiceLookup getControllerServiceLookup(); + + /** + * @return the set of all relationships for which space is available to + * receive new objects + */ + Set<Relationship> getAvailableRelationships(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java new file mode 100644 index 0000000..ed46d68 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java @@ -0,0 +1,736 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Path; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.exception.FlowFileAccessException; +import org.apache.nifi.processor.exception.FlowFileHandlingException; +import org.apache.nifi.processor.exception.MissingFlowFileException; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.provenance.ProvenanceReporter; + +/** + * <p> + * A process session encompasses all the behaviors a processor can perform to + * obtain, clone, read, modify remove FlowFiles in an atomic unit. A process + * session is always tied to a single processor at any one time and ensures no + * FlowFile can ever be accessed by any more than one processor at a given time. + * The session also ensures that all FlowFiles are always accounted for. 
The + * creator of a ProcessSession is always required to manage the session.</p> + * + * <p> + * A session is not considered thread safe. The session supports a unit of work + * that is either committed or rolled back</p> + * + * <p> + * As noted on specific methods and for specific exceptions automated rollback + * will occur to ensure consistency of the repository. However, several + * situations can result in exceptions yet not cause automated rollback. In + * these cases the consistency of the repository will be retained but callers + * will be able to indicate whether it should result in rollback or continue on + * toward a commit.</p> + * + * <p> + * A process session instance may be used continuously. That is, after each + * commit or rollback, the session can be used again.</p> + * + */ +public interface ProcessSession { + + /** + * <p> + * Commits the current session ensuring all operations against FlowFiles + * within this session are atomically persisted. All FlowFiles operated on + * within this session must be accounted for by transfer or removal or the + * commit will fail.</p> + * + * <p> + * As soon as the commit completes the session is again ready to be used</p> + * + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session. + * @throws FlowFileHandlingException if not all FlowFiles acted upon within + * this session are accounted for by user code such that they have a + * transfer identified or where marked for removal. Automated rollback + * occurs. + * @throws ProcessException if some general fault occurs while persisting + * the session. Initiates automatic rollback. The root cause can be obtained + * via <code>Exception.getCause()</code> + */ + void commit(); + + /** + * Reverts any changes made during this session. All FlowFiles are restored + * back to their initial session state and back to their original queues. 
If + * this session is already committed or rolled back then no changes will + * occur. This method can be called any number of times. Calling this method + * is identical to calling {@link #rollback(boolean)} passing + * <code>false</code> as the parameter. + */ + void rollback(); + + /** + * Reverts any changes made during this session. All FlowFiles are restored + * back to their initial session state and back to their original queues, + * after optionally being penalized. If this session is already committed or + * rolled back then no changes will occur. This method can be called any + * number of times. + * + * @param penalize whether or not the FlowFiles that are being restored back + * to their queues should be penalized + */ + void rollback(boolean penalize); + + /** + * Adjusts counter data for the given counter name and takes care of + * registering the counter if not already present. The adjustment occurs + * only if and when the ProcessSession is committed. + * + * @param name the name of the counter + * @param delta the delta by which to modify the counter (+ or -) + * @param immediate if true, the counter will be updated immediately, + * without regard to whether the ProcessSession is commit or rolled back; + * otherwise, the counter will be incremented only if and when the + * ProcessSession is committed. + */ + void adjustCounter(String name, long delta, boolean immediate); + + /** + * @return FlowFile that is next highest priority FlowFile to process. + * Otherwise returns null. + */ + FlowFile get(); + + /** + * Returns up to <code>maxResults</code> FlowFiles from the work queue. If + * no FlowFiles are available, returns an empty list. Will not return null. + * If multiple incoming queues are present, the behavior is unspecified in + * terms of whether all queues or only a single queue will be polled in a + * single call. 
+ * + * @param maxResults the maximum number of FlowFiles to return + * @return up to <code>maxResults</code> FlowFiles from the work queue. If + * no FlowFiles are available, returns an empty list. Will not return null. + * @throws IllegalArgumentException if <code>maxResults</code> is less than + * 0 + */ + List<FlowFile> get(int maxResults); + + /** + * <p> + * Returns all FlowFiles from all of the incoming queues for which the given + * {@link FlowFileFilter} indicates should be accepted. Calls to this method + * provide exclusive access to the underlying queues. I.e., no other thread + * will be permitted to pull FlowFiles from this Processor's queues or add + * FlowFiles to this Processor's incoming queues until this method call has + * returned. + * </p> + * + * @param filter to limit which flow files are returned + * @return all FlowFiles from all of the incoming queues for which the given + * {@link FlowFileFilter} indicates should be accepted. + */ + List<FlowFile> get(FlowFileFilter filter); + + /** + * @return the QueueSize that represents the number of FlowFiles and their + * combined data size for all FlowFiles waiting to be processed by the + * Processor that owns this ProcessSession, regardless of which Connection + * the FlowFiles live on + */ + QueueSize getQueueSize(); + + /** + * Creates a new FlowFile in the repository with no content and without any + * linkage to a parent FlowFile. This method is appropriate only when data + * is received or created from an external system. Otherwise, this method + * should be avoided and should instead use {@link #create(FlowFile)} or + * {@see #create(Collection)}. + * + * When this method is used, a Provenance CREATE or RECEIVE Event should be + * generated. 
See the {@link #getProvenanceReporter()} method and + * {@link ProvenanceReporter} class for more information + * + * @return newly created FlowFile + */ + FlowFile create(); + + /** + * Creates a new FlowFile in the repository with no content but with a + * parent linkage to <code>parent</code>. The newly created FlowFile will + * inherit all of the parent's attributes except for the UUID. This method + * will automatically generate a Provenance FORK event or a Provenance JOIN + * event, depending on whether or not other FlowFiles are generated from the + * same parent before the ProcessSession is committed. + * + * @param parent to base the new flowfile on + * @return newly created flowfile + */ + FlowFile create(FlowFile parent); + + /** + * Creates a new FlowFile in the repository with no content but with a + * parent linkage to the FlowFiles specified by the parents Collection. The + * newly created FlowFile will inherit all of the attributes that are in + * common to all parents (except for the UUID, which will be in common if + * only a single parent exists). This method will automatically generate a + * Provenance JOIN event. + * + * @param parents which the new flowfile should inherit shared attributes from + * @return new flowfile + */ + FlowFile create(Collection<FlowFile> parents); + + /** + * Creates a new FlowFile that is a clone of the given FlowFile as of the + * time this is called, both in content and attributes. This method + * automatically emits a Provenance CLONE Event. + * + * @param example FlowFile to be the source of cloning - given FlowFile must + * be a part of the given session + * @return FlowFile that is a clone of the given example + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. 
Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be reference, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + * @throws NullPointerException if the argument null + */ + FlowFile clone(FlowFile example); + + /** + * Creates a new FlowFile whose parent is the given FlowFile. The content of + * the new FlowFile will be a subset of the byte sequence of the given + * FlowFile starting at the specified offset and with the length specified. + * The new FlowFile will contain all of the attributes of the original. This + * method automatically emits a Provenance FORK Event (or a Provenance CLONE + * Event, if the offset is 0 and the size is exactly equal to the size of + * the example FlowFile). + * + * @param parent to base the new flowfile attributes on + * @param offset of the parent flowfile to base the child flowfile content on + * @param size of the new flowfile from the offset + * @return a FlowFile with the specified size whose parent is first argument + * to this function + * + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session, or if the + * specified offset + size exceeds that of the size of the parent FlowFile. + * Automatic rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be reference, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. 
+ */ + FlowFile clone(FlowFile parent, long offset, long size); + + /** + * Sets a penalty for the given FlowFile which will make it unavailable to + * be operated on any further during the penalty period. + * + * @param flowFile to penalize + * @return FlowFile the new FlowFile reference to use + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if the argument null + */ + FlowFile penalize(FlowFile flowFile); + + /** + * Updates the given FlowFiles attributes with the given key/value pair. If + * the key is named {@code uuid}, this attribute will be ignored. + * + * @param flowFile to update + * @param key of attribute + * @param value of attribute + * @return FlowFile the updated FlowFile + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if an argument is null + */ + FlowFile putAttribute(FlowFile flowFile, String key, String value); + + /** + * Updates the given FlowFiles attributes with the given key/value pairs. If + * the map contains a key named {@code uuid}, this attribute will be + * ignored. + * + * @param flowFile to update + * @param attributes the attributes to add to the given FlowFile + * @return FlowFile the updated FlowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. 
+ * @throws NullPointerException if an argument is null + */ + FlowFile putAllAttributes(FlowFile flowFile, Map<String, String> attributes); + + /** + * Removes the given FlowFile attribute with the given key. If the key is + * named {@code uuid}, this method will return the same FlowFile without + * removing any attribute. + * + * @param flowFile to update + * @param key of attribute + * @return FlowFile the updated FlowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if the argument null + */ + FlowFile removeAttribute(FlowFile flowFile, String key); + + /** + * Removes the attributes with the given keys from the given FlowFile. If + * the set of keys contains the value {@code uuid}, this key will be ignored + * + * @param flowFile to update + * @param keys of attribute + * @return FlowFile the updated FlowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if the argument null + */ + FlowFile removeAllAttributes(FlowFile flowFile, Set<String> keys); + + /** + * Remove all attributes from the given FlowFile that have keys which match + * the given pattern. If the pattern matches the key {@code uuid}, this key + * will not be removed. 
+ * + * @param flowFile to update + * @param keyPattern may be null; if supplied is matched against each of the + * FlowFile attribute keys + * @return FlowFile containing only attributes which did not meet the key + * pattern + */ + FlowFile removeAllAttributes(FlowFile flowFile, Pattern keyPattern); + + /** + * Transfers the given FlowFile to the appropriate destination processor + * work queue(s) based on the given relationship. If the relationship leads + * to more than one destination the state of the FlowFile is replicated such + * that each destination receives an exact copy of the FlowFile though each + * will have its own unique identity. The destination processors will not be + * able to operate on the given FlowFile until this session is committed or + * until the ownership of the session is migrated to another processor. If + * ownership of the session is passed to a destination processor then that + * destination processor will have immediate visibility of the transferred + * FlowFiles within the session. + * + * @param flowFile to transfer + * @param relationship to transfer to + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if the argument null + * @throws IllegalArgumentException if given relationship is not a known or + * registered relationship + */ + void transfer(FlowFile flowFile, Relationship relationship); + + /** + * Transfers the given FlowFile back to the work queue from which it was + * pulled. The processor will not be able to operate on the given FlowFile + * until this session is committed. Any modifications that have been made to + * the FlowFile will be maintained. 
FlowFiles that are created by the + * processor cannot be transferred back to themselves via this method. + * + * @param flowFile to transfer + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws IllegalArgumentException if the FlowFile was created by this + * processor + * @throws NullPointerException if the argument null + */ + void transfer(FlowFile flowFile); + + /** + * Transfers the given FlowFiles back to the work queues from which the + * FlowFiles were pulled. The processor will not be able to operate on the + * given FlowFile until this session is committed. Any modifications that + * have been made to the FlowFile will be maintained. FlowFiles that are + * created by the processor cannot be transferred back to themselves via + * this method. + * + * @param flowFiles to transfer + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFiles are already + * transferred or removed or don't belong to this session. Automatic + * rollback will occur. + * @throws IllegalArgumentException if the FlowFile was created by this + * processor + * @throws NullPointerException if the argument null + */ + void transfer(Collection<FlowFile> flowFiles); + + /** + * Transfers the given FlowFile to the appropriate destination processor + * work queue(s) based on the given relationship. If the relationship leads + * to more than one destination the state of the FlowFile is replicated such + * that each destination receives an exact copy of the FlowFile though each + * will have its own unique identity. 
The destination processors will not be + * able to operate on the given FlowFile until this session is committed or + * until the ownership of the session is migrated to another processor. If + * ownership of the session is passed to a destination processor then that + * destination processor will have immediate visibility of the transferred + * FlowFiles within the session. + * + * @param flowFiles to transfer + * @param relationship to transfer to + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws NullPointerException if the argument null + * @throws IllegalArgumentException if given relationship is not a known or + * registered relationship + */ + void transfer(Collection<FlowFile> flowFiles, Relationship relationship); + + /** + * Ends the managed persistence for the given FlowFile. The persistent + * attributes for the FlowFile are deleted and so is the content assuming + * nothing else references it and this FlowFile will no longer be available + * for further operation. + * + * @param flowFile to remove + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + */ + void remove(FlowFile flowFile); + + /** + * Ends the managed persistence for the given FlowFiles. The persistent + * attributes for the FlowFile are deleted and so is the content assuming + * nothing else references it and this FlowFile will no longer be available + * for further operation. 
+ * + * @param flowFiles to remove + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if any of the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + */ + void remove(Collection<FlowFile> flowFiles); + + /** + * Executes the given callback against the contents corresponding to the + * given FlowFile. + * + * <i>Note</i>: The OutputStream provided to the given OutputStreamCallback + * will not be accessible once this method has completed its execution. + * + * @param source flowfile to retrieve content of + * @param reader that will be called to read the flowfile content + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be reference, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content; if an attempt is made to access the InputStream + * provided to the given InputStreamCallback after this method completed its + * execution + */ + void read(FlowFile source, InputStreamCallback reader) throws FlowFileAccessException; + + /** + * Combines the content of all given source FlowFiles into a single given + * destination FlowFile. 
+ * + * @param sources the flowfiles to merge + * @param destination the flowfile to use as the merged result + * @return updated destination FlowFile (new size, etc...) + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws IllegalArgumentException if the given destination is contained + * within the sources + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be reference, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content. The state of the destination will be as it was prior to + * this call. + */ + FlowFile merge(Collection<FlowFile> sources, FlowFile destination); + + /** + * Combines the content of all given source FlowFiles into a single given + * destination FlowFile. + * + * @param sources to merge together + * @param destination to merge to + * @param header bytes that will be added to the beginning of the merged + * output. May be null or empty. + * @param footer bytes that will be added to the end of the merged output. + * May be null or empty. + * @param demarcator bytes that will be placed in between each object merged + * together. May be null or empty. + * @return updated destination FlowFile (new size, etc...) 
+ * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws IllegalArgumentException if the given destination is contained + * within the sources + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content. The state of the destination will be as it was prior to + * this call. + */ + FlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator); + + /** + * Executes the given callback against the content corresponding to the + * given FlowFile. + * + * <i>Note</i>: The OutputStream provided to the given OutputStreamCallback + * will not be accessible once this method has completed its execution. + * + * @param source the FlowFile to write to + * @param writer used to write new content + * @return updated FlowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content, or if an attempt is made to access the OutputStream + * provided to the given OutputStreamCallback after this method completed + * its execution + */ + FlowFile write(FlowFile source, OutputStreamCallback writer) throws FlowFileAccessException; + + /** + * Executes the given callback against the content corresponding to the + * given FlowFile. + * + * <i>Note</i>: The InputStream and OutputStream provided to the given + * StreamCallback will not be accessible once this method has completed its + * execution. + * + * @param source the FlowFile to read from and write to + * @param writer used to read the old content and write new content + * @return updated FlowFile + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content, or if an attempt is made to access the InputStream or + * OutputStream provided to the given StreamCallback after this method + * completed its execution + */ + FlowFile write(FlowFile source, StreamCallback writer) throws FlowFileAccessException; + + /** + * Executes the given callback against the content corresponding to the + * given FlowFile, such that any data written to the OutputStream of the + * content will be appended to the end of the FlowFile's content.
+ * + * <i>Note</i>: The OutputStream provided to the given OutputStreamCallback + * will not be accessible once this method has completed its execution. + * + * @param source the FlowFile for which content should be appended + * @param writer used to write new bytes to the FlowFile content + * @return the updated FlowFile reference for the new content + * @throws FlowFileAccessException if an attempt is made to access the + * OutputStream provided to the given OutputStreamCallback after this + * method completed its execution + */ + FlowFile append(FlowFile source, OutputStreamCallback writer) throws FlowFileAccessException; + + /** + * Writes to the given FlowFile all content from the given content path. + * + * @param source the file from which content will be obtained + * @param keepSourceFile if true the content is simply copied; if false the + * original content might be used in a destructive way for efficiency such + * that the repository will have the data but the original data will be + * gone. If false the source object will be removed or gone once imported. + * It will not be restored if the session is rolled back so this must be + * used with caution. In some cases it can result in tremendous efficiency + * gains but is also dangerous. + * @param destination the FlowFile whose content will be updated + * @return the updated destination FlowFile (new size) + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + */ + FlowFile importFrom(Path source, boolean keepSourceFile, FlowFile destination); + + /** + * Writes to the given FlowFile all content from the given InputStream. + * + * @param source the stream from which content will be obtained + * @param destination the FlowFile whose content will be updated + * @return the updated destination FlowFile (new size) + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + */ + FlowFile importFrom(InputStream source, FlowFile destination); + + /** + * Writes the content of the given FlowFile to the given destination path. + * + * @param flowFile the FlowFile whose content will be exported + * @param destination the path to which the content will be written + * @param append if true will append to the current content at the given + * path; if false will replace any current content + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found.
The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + */ + void exportTo(FlowFile flowFile, Path destination, boolean append); + + /** + * Writes the content of the given FlowFile to the given destination stream. + * + * @param flowFile the FlowFile whose content will be exported + * @param destination the stream to which the content will be written + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s) + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content + */ + void exportTo(FlowFile flowFile, OutputStream destination); + + /** + * Returns a ProvenanceReporter that is tied to this ProcessSession.
+ * + * @return the provenance reporter + */ + ProvenanceReporter getProvenanceReporter(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSessionFactory.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSessionFactory.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSessionFactory.java new file mode 100644 index 0000000..06c0479 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSessionFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processor; + +/** + * <p> + * Factory for creating {@link ProcessSession}s via {@link #createSession()}. + * </p> + * + * <p> + * Implementations MUST BE THREAD-SAFE.</p> + */ +public interface ProcessSessionFactory { + /** @return a {@link ProcessSession} created by this factory */ + ProcessSession createSession(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/processor/Processor.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/Processor.java b/nifi-api/src/main/java/org/apache/nifi/processor/Processor.java new file mode 100644 index 0000000..53c7c70 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/processor/Processor.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processor; + +import java.util.Set; + +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.processor.exception.ProcessException; + +/** + * <p> + * Processor objects operate on FlowFile objects where the processors are linked + * together via relationships forming a directed graph structure.</p> + * + * <p> + * The validity of a processor once established is considered safe until a + * configuration property of that processor is changed. Even if the property + * change is itself valid it still brings the validity of the processor as a + * whole into question. Therefore, after any such change the processor as a + * whole must be re-verified. Changing any configuration property of a processor can also + * imply that its supported relationships have changed.</p> + * + * <p> + * Each Processor object is a single node in a flow graph. The same processor + * object on a graph may be called concurrently to update its configuration + * state and to perform 'onTrigger' actions. The framework also provides + * numerous hooks via annotations that implementations can use to control the + * lifecycle of a given processor instance.</p> + * + * <p> + * Processor objects are expected to be thread-safe and must have a public + * default no-args constructor to facilitate the Java ServiceLoader + * mechanism.</p> + * + */ +public interface Processor extends ConfigurableComponent { + + /** + * Provides the processor with access to objects that may be of use + * throughout the life of the Processor. + * + * @param context the initialization context for this processor + */ + void initialize(ProcessorInitializationContext context); + + /** + * @return Set of all relationships this processor expects to transfer a + * FlowFile to. An empty set indicates this processor does not have any + * destination relationships. Guaranteed non-null.
+ */ + Set<Relationship> getRelationships(); + + /** + * <p> + * The method called when this processor is triggered to operate by the + * controller. This method may be called concurrently from different + * threads. When this method is called depends on how this processor is + * configured within a controller to be triggered (timing or event + * based).</p> + * + * @param context provides access to convenience methods for obtaining + * property values, delaying the scheduling of the processor, accessing + * Controller Services, etc. + * @param sessionFactory provides access to a {@link ProcessSession}, which + * can be used for accessing FlowFiles, etc. + * + * @throws ProcessException if processing did not complete normally but the + * problem is an understood potential outcome of processing. + * The controller/caller will handle these exceptions gracefully, such as by + * logging them. If another type of exception is allowed to propagate, the + * controller may no longer trigger this processor to operate, as this would + * indicate a probable coding defect. + */ + void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java new file mode 100644 index 0000000..7b09e1b --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessorInitializationContext.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor; + +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.logging.ProcessorLog; + +/** + * <p> + * The <code>ProcessorInitializationContext</code> provides + * {@link org.apache.nifi.processor.Processor Processor}s access to objects that may be of + * use throughout the life of the Processor. + * </p> + */ +public interface ProcessorInitializationContext { + + /** + * @return the unique identifier for this processor + */ + String getIdentifier(); + + /** + * @return a {@link ProcessorLog} that is tied to this processor and can be + * used to log events + */ + ProcessorLog getLogger(); + + /** + * @return the {@link ControllerServiceLookup} which can be used to obtain + * Controller Services + */ + ControllerServiceLookup getControllerServiceLookup(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java new file mode 100644 index 0000000..c3c2ccc --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor; + +/** + * Snapshot of a queue's size: the number of objects present and the total size, in bytes, of their content; both are fixed at construction. + */ +public class QueueSize { + + private final int objectCount; + private final long totalSizeBytes; + /** @throws IllegalArgumentException if numberObjects or totalSizeBytes is negative */ + public QueueSize(final int numberObjects, final long totalSizeBytes) { + if (numberObjects < 0 || totalSizeBytes < 0) { + throw new IllegalArgumentException("Counts must be non-negative: numberObjects=" + numberObjects + ", totalSizeBytes=" + totalSizeBytes); + } + objectCount = numberObjects; + this.totalSizeBytes = totalSizeBytes; + } + + /** + * @return number of objects present on the queue + */ + public int getObjectCount() { + return objectCount; + } + + /** + * @return total size in bytes of the content for the data on the queue + */ + public long getByteCount() { + return totalSizeBytes; + } +}