http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java new file mode 100644 index 0000000..ff8dc50 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository; + +/** + * + */ +public enum RepositoryRecordType { + + UPDATE, CREATE, DELETE, CONTENTMISSING, SWAP_IN, SWAP_OUT; +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java new file mode 100644 index 0000000..53cc44f --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository.claim; + +/** + * <p> + * A ContentClaim is a reference to a given flow file's content. Multiple flow + * files may reference the same content by both having the same content + * claim.</p> + * + * <p> + * Must be thread safe</p> + * + */ +public interface ContentClaim extends Comparable<ContentClaim> { + + /** + * @return the unique identifier for this claim + */ + String getId(); + + /** + * @return the container identifier in which this claim is held + */ + String getContainer(); + + /** + * @return the section within a given container the claim is held + */ + String getSection(); + + /** + * @return Indicates whether or not the Claim is loss-tolerant. If so, we will + * attempt to keep the content but will not sacrifice a great deal of + * performance to do so + */ + boolean isLossTolerant(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java new file mode 100644 index 0000000..bffcec3 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository.claim; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +/** + * Responsible for managing all ContentClaims that are used in the application + */ +public interface ContentClaimManager { + + /** + * Creates a new Content Claim with the given id, container, section, and + * loss tolerance. + * + * @param id of claim + * @param container of claim + * @param section of claim + * @param lossTolerant of claim + * @return new claim + */ + ContentClaim newContentClaim(String container, String section, String id, boolean lossTolerant); + + /** + * @param claim to obtain reference count for + * @return the number of FlowFiles that hold a claim to a particular piece + * of FlowFile content + */ + int getClaimantCount(ContentClaim claim); + + /** + * Decreases by 1 the count of how many FlowFiles hold a claim to a + * particular piece of FlowFile content and returns the new count + * + * @param claim to decrement claimants on + * @return new claimaint count + */ + int decrementClaimantCount(ContentClaim claim); + + /** + * Increases by 1 the count of how many FlowFiles hold a claim to a + * particular piece of FlowFile content and returns the new count + * + * @param claim to increment claims on + * @return new claimant count + */ + int incrementClaimantCount(ContentClaim claim); + + /** + * Increases by 1 the count of how many FlowFiles hold a claim to a + * particular piece of FlowFile content and returns the new count. + * + * If it is known that the Content Claim whose count is being incremented is + * a newly created ContentClaim, this method should be called with a value + * of {@code true} as the second argument, as it may allow the manager to + * optimize its tasks, knowing that the Content Claim cannot be referenced + * by any other component + * + * @param claim to increment + * @param newClaim provides a hint that no other process can have access to this + * claim right now + * @return new claim count + */ + int incrementClaimantCount(ContentClaim claim, boolean newClaim); + + /** + * Indicates that the given ContentClaim can now be destroyed by the + * appropriate Content Repository. This should be done only after it is + * guaranteed that the FlowFile Repository has been synchronized with its + * underlying storage component. This way, we avoid the following sequence + * of events: + * <ul> + * <li>FlowFile Repository is updated to indicate that FlowFile F no longer + * depends on ContentClaim C</li> + * <li>ContentClaim C is no longer needed and is destroyed</li> + * <li>The Operating System crashes or there is a power failure</li> + * <li>Upon restart, the FlowFile Repository was not synchronized with its + * underlying storage mechanism and as such indicates that FlowFile F needs + * ContentClaim C.</li> + * <li>Since ContentClaim C has already been destroyed, it is inaccessible, + * and FlowFile F's Content is not found, so the FlowFile is removed, + * resulting in data loss.</li> + * </ul> + * + * <p> + * Using this method of marking the ContentClaim as destructable only when + * the FlowFile repository has been synced with the underlying storage + * mechanism, we can ensure that on restart, we will not point to this + * unneeded claim. As such, it is now safe to destroy the contents. + * </p> + * + * @param claim to mark as now destructable + */ + void markDestructable(ContentClaim claim); + + /** + * Drains up to {@code maxElements} Content Claims from the internal queue + * of destructable content claims to the given {@code destination} so that + * they can be destroyed. + * + * @param destination to drain to + * @param maxElements max items to drain + */ + void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements); + + /** + * Drains up to {@code maxElements} Content Claims from the internal queue + * of destructable content claims to the given {@code destination} so that + * they can be destroyed. If no ContentClaim is ready to be destroyed at + * this time, will wait up to the specified amount of time before returning. + * If, after the specified amount of time, there is still no ContentClaim + * ready to be destroyed, the method will return without having added + * anything to the given {@code destination}. + * + * @param destination to drain to + * @param maxElements max items to drain + * @param timeout maximum time to wait + * @param unit unit of time to wait + */ + void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit); + + /** + * Clears the manager's memory of any and all ContentClaims that it knows + * about + */ + void purge(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java new file mode 100644 index 0000000..cebb2b4 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status; + +/** + */ +public class ConnectionStatus implements Cloneable { + + private String id; + private String groupId; + private String name; + private String sourceId; + private String sourceName; + private String destinationId; + private String destinationName; + private int inputCount; + private long inputBytes; + private int queuedCount; + private long queuedBytes; + private int outputCount; + private long outputBytes; + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(final String groupId) { + this.groupId = groupId; + } + + public int getQueuedCount() { + return queuedCount; + } + + public void setQueuedCount(final int queuedCount) { + this.queuedCount = queuedCount; + } + + public long getQueuedBytes() { + return queuedBytes; + } + + public void setQueuedBytes(final long queuedBytes) { + this.queuedBytes = queuedBytes; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getSourceId() { + return sourceId; + } + + public void setSourceId(String sourceId) { + this.sourceId = sourceId; + } + + public String getSourceName() { + return sourceName; + } + + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + + public String getDestinationId() { + return destinationId; + } + + public void setDestinationId(String destinationId) { + this.destinationId = destinationId; + } + + public String getDestinationName() { + return destinationName; + } + + public void setDestinationName(String destinationName) { + this.destinationName = destinationName; + } + + public long getInputBytes() { + return inputBytes; + } + + public void setInputBytes(long inputBytes) { + this.inputBytes = inputBytes; + } + + public int getInputCount() { + return inputCount; + } + + public void setInputCount(int inputCount) { + this.inputCount = inputCount; + } + + public long getOutputBytes() { + return outputBytes; + } + + public void setOutputBytes(long outputBytes) { + this.outputBytes = outputBytes; + } + + public int getOutputCount() { + return outputCount; + } + + public void setOutputCount(int outputCount) { + this.outputCount = outputCount; + } + + @Override + public ConnectionStatus clone() { + final ConnectionStatus clonedObj = new ConnectionStatus(); + clonedObj.groupId = groupId; + clonedObj.id = id; + clonedObj.inputBytes = inputBytes; + clonedObj.inputCount = inputCount; + clonedObj.name = name; + clonedObj.outputBytes = outputBytes; + clonedObj.outputCount = outputCount; + clonedObj.queuedBytes = queuedBytes; + clonedObj.queuedCount = queuedCount; + clonedObj.sourceId = sourceId; + clonedObj.sourceName = sourceName; + clonedObj.destinationId = destinationId; + clonedObj.destinationName = destinationName; + return clonedObj; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("ConnectionStatus [id="); + builder.append(id); + builder.append(", groupId="); + builder.append(groupId); + builder.append(", name="); + builder.append(name); + builder.append(", sourceId="); + builder.append(sourceId); + builder.append(", sourceName="); + builder.append(sourceName); + builder.append(", destinationId="); + builder.append(destinationId); + builder.append(", destinationName="); + builder.append(destinationName); + builder.append(", inputCount="); + builder.append(inputCount); + builder.append(", inputBytes="); + builder.append(inputBytes); + builder.append(", queuedCount="); + builder.append(queuedCount); + builder.append(", queuedBytes="); + builder.append(queuedBytes); + builder.append(", outputCount="); + builder.append(outputCount); + builder.append(", outputBytes="); + builder.append(outputBytes); + builder.append("]"); + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/PortStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/PortStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/PortStatus.java new file mode 100644 index 0000000..0d248cd --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/PortStatus.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status; + +/** + * The status of a port. + */ +public class PortStatus implements Cloneable { + + private String id; + private String groupId; + private String name; + private Integer activeThreadCount; + private int inputCount; + private long inputBytes; + private int outputCount; + private long outputBytes; + private int flowFilesReceived; + private long bytesReceived; + private int flowFilesSent; + private long bytesSent; + private Boolean transmitting; + private RunStatus runStatus; + + public Boolean isTransmitting() { + return transmitting; + } + + public void setTransmitting(Boolean transmitting) { + this.transmitting = transmitting; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getActiveThreadCount() { + return activeThreadCount; + } + + public void setActiveThreadCount(Integer activeThreadCount) { + this.activeThreadCount = activeThreadCount; + } + + public int getInputCount() { + return inputCount; + } + + public void setInputCount(int inputCount) { + this.inputCount = inputCount; + } + + public long getInputBytes() { + return inputBytes; + } + + public void setInputBytes(long inputBytes) { + this.inputBytes = inputBytes; + } + + public int getOutputCount() { + return outputCount; + } + + public void setOutputCount(final int outputCount) { + this.outputCount = outputCount; + } + + public long getOutputBytes() { + return outputBytes; + } + + public void setOutputBytes(final long outputBytes) { + this.outputBytes = outputBytes; + } + + public RunStatus getRunStatus() { + return runStatus; + } + + public void setRunStatus(RunStatus runStatus) { + this.runStatus = runStatus; + } + + public int getFlowFilesReceived() { + return flowFilesReceived; + } + + public void setFlowFilesReceived(int flowFilesReceived) { + this.flowFilesReceived = flowFilesReceived; + } + + public long getBytesReceived() { + return bytesReceived; + } + + public void setBytesReceived(long bytesReceived) { + this.bytesReceived = bytesReceived; + } + + public int getFlowFilesSent() { + return flowFilesSent; + } + + public void setFlowFilesSent(int flowFilesSent) { + this.flowFilesSent = flowFilesSent; + } + + public long getBytesSent() { + return bytesSent; + } + + public void setBytesSent(long bytesSent) { + this.bytesSent = bytesSent; + } + + public Boolean getTransmitting() { + return transmitting; + } + + @Override + public PortStatus clone() { + final PortStatus clonedObj = new PortStatus(); + clonedObj.id = id; + clonedObj.groupId = groupId; + clonedObj.name = name; + clonedObj.activeThreadCount = activeThreadCount; + clonedObj.inputBytes = inputBytes; + clonedObj.inputCount = inputCount; + clonedObj.outputBytes = outputBytes; + clonedObj.outputCount = outputCount; + clonedObj.flowFilesReceived = flowFilesReceived; + clonedObj.bytesReceived = bytesReceived; + clonedObj.flowFilesSent = flowFilesSent; + clonedObj.bytesSent = bytesSent; + clonedObj.transmitting = transmitting; + clonedObj.runStatus = runStatus; + return clonedObj; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("PortStatus [id="); + builder.append(id); + builder.append(", groupId="); + builder.append(groupId); + builder.append(", name="); + builder.append(name); + builder.append(", activeThreadCount="); + builder.append(activeThreadCount); + builder.append(", transmitting="); + builder.append(transmitting); + builder.append(", inputCount="); + builder.append(inputCount); + builder.append(", inputBytes="); + builder.append(inputBytes); + builder.append(", outputCount="); + builder.append(outputCount); + builder.append(", outputBytes="); + builder.append(outputBytes); + builder.append(", runStatus="); + builder.append(runStatus); + builder.append("]"); + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java new file mode 100644 index 0000000..eb0339f --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java @@ -0,0 +1,584 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + */ +public class ProcessGroupStatus implements Cloneable { + + private String id; + private String name; + private Integer inputCount; + private Long inputContentSize; + private Integer outputCount; + private Long outputContentSize; + private long creationTimestamp; + private Integer activeThreadCount; + private Integer queuedCount; + private Long queuedContentSize; + private Long bytesRead; + private Long bytesWritten; + private int flowFilesReceived; + private long bytesReceived; + private int flowFilesSent; + private long bytesSent; + private int flowFilesTransferred; + private long bytesTransferred; + + private Collection<ConnectionStatus> connectionStatus = new ArrayList<>(); + private Collection<ProcessorStatus> processorStatus = new ArrayList<>(); + private Collection<ProcessGroupStatus> processGroupStatus = new ArrayList<>(); + private Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus = new ArrayList<>(); + private Collection<PortStatus> inputPortStatus = new ArrayList<>(); + private Collection<PortStatus> outputPortStatus = new ArrayList<>(); + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getInputCount() { + return inputCount; + } + + public void setInputCount(final Integer inputCount) { + this.inputCount = inputCount; + } + + public Long getInputContentSize() { + return inputContentSize; + } + + public void setInputContentSize(final Long inputContentSize) { + this.inputContentSize = inputContentSize; + } + + public Integer getOutputCount() { + return outputCount; + } + + public void setOutputCount(final Integer outputCount) { + this.outputCount = outputCount; + } + + public Long getOutputContentSize() { + return outputContentSize; + } + + public void setOutputContentSize(final Long outputContentSize) { + this.outputContentSize = outputContentSize; + } + + public Long getBytesRead() { + return bytesRead; + } + + public void setBytesRead(final Long bytesRead) { + this.bytesRead = bytesRead; + } + + public Long getBytesWritten() { + return bytesWritten; + } + + public void setBytesWritten(final Long bytesWritten) { + this.bytesWritten = bytesWritten; + } + + public Integer getQueuedCount() { + return this.queuedCount; + } + + public void setQueuedCount(final Integer queuedCount) { + this.queuedCount = queuedCount; + } + + public Long getQueuedContentSize() { + return queuedContentSize; + } + + public void setQueuedContentSize(final Long queuedContentSize) { + this.queuedContentSize = queuedContentSize; + } + + public long getCreationTimestamp() { + return creationTimestamp; + } + + public void setCreationTimestamp(final long creationTimestamp) { + this.creationTimestamp = creationTimestamp; + } + + public Integer getActiveThreadCount() { + return activeThreadCount; + } + + public void setActiveThreadCount(final Integer activeThreadCount) { + this.activeThreadCount = activeThreadCount; + } + + public Collection<ConnectionStatus> getConnectionStatus() { + return connectionStatus; + } + + public void setConnectionStatus(final Collection<ConnectionStatus> connectionStatus) { + this.connectionStatus = connectionStatus; + } + + public Collection<ProcessorStatus> getProcessorStatus() { + return processorStatus; + } + + public void setProcessorStatus(final Collection<ProcessorStatus> processorStatus) { + this.processorStatus = processorStatus; + } + + public Collection<ProcessGroupStatus> getProcessGroupStatus() { + return processGroupStatus; + } + + public void setProcessGroupStatus(final Collection<ProcessGroupStatus> processGroupStatus) { + this.processGroupStatus = processGroupStatus; + } + + public Collection<PortStatus> getInputPortStatus() { + return inputPortStatus; + } + + public void setInputPortStatus(Collection<PortStatus> inputPortStatus) { + this.inputPortStatus = inputPortStatus; + } + + public Collection<PortStatus> getOutputPortStatus() { + return outputPortStatus; + } + + public void setOutputPortStatus(Collection<PortStatus> outputPortStatus) { + this.outputPortStatus = outputPortStatus; + } + + public Collection<RemoteProcessGroupStatus> getRemoteProcessGroupStatus() { + return remoteProcessGroupStatus; + } + + public void setRemoteProcessGroupStatus(final Collection<RemoteProcessGroupStatus> remoteProcessGroupStatus) { + this.remoteProcessGroupStatus = remoteProcessGroupStatus; + } + + public int getFlowFilesReceived() { + return flowFilesReceived; + } + + public void setFlowFilesReceived(final int flowFilesReceived) { + this.flowFilesReceived = flowFilesReceived; + } + + public long getBytesReceived() { + return bytesReceived; + } + + public void setBytesReceived(final long bytesReceived) { + this.bytesReceived = bytesReceived; + } + + public int getFlowFilesSent() { + return flowFilesSent; + } + + public void setFlowFilesSent(final int flowFilesSent) { + this.flowFilesSent = flowFilesSent; + } + + public long getBytesSent() { + return bytesSent; + } + + public void setBytesSent(final long bytesSent) { + this.bytesSent = bytesSent; + } + + public int getFlowFilesTransferred() { + return flowFilesTransferred; + } + + public void setFlowFilesTransferred(int flowFilesTransferred) { + this.flowFilesTransferred = flowFilesTransferred; + } + + public long getBytesTransferred() { + return bytesTransferred; + } + + public void setBytesTransferred(long bytesTransferred) { + this.bytesTransferred = bytesTransferred; + } + + @Override + public ProcessGroupStatus clone() { + + final ProcessGroupStatus clonedObj = new ProcessGroupStatus(); + + clonedObj.creationTimestamp = creationTimestamp; + clonedObj.id = id; + clonedObj.name = name; + clonedObj.outputContentSize = outputContentSize; + clonedObj.outputCount = outputCount; + clonedObj.inputContentSize = inputContentSize; + clonedObj.inputCount = inputCount; + clonedObj.activeThreadCount = activeThreadCount; + clonedObj.queuedContentSize = queuedContentSize; + clonedObj.queuedCount = queuedCount; + clonedObj.bytesRead = bytesRead; + clonedObj.bytesWritten = bytesWritten; + clonedObj.flowFilesReceived = flowFilesReceived; + clonedObj.bytesReceived = bytesReceived; + clonedObj.flowFilesSent = flowFilesSent; + clonedObj.bytesSent = bytesSent; + clonedObj.flowFilesTransferred = flowFilesTransferred; + clonedObj.bytesTransferred = bytesTransferred; + + if (connectionStatus != null) { + final Collection<ConnectionStatus> statusList = new ArrayList<>(); + clonedObj.setConnectionStatus(statusList); + for (final ConnectionStatus status : connectionStatus) { + statusList.add(status.clone()); + } + } + + if (processorStatus != null) { + final Collection<ProcessorStatus> statusList = new ArrayList<>(); + clonedObj.setProcessorStatus(statusList); + for (final ProcessorStatus status : processorStatus) { + statusList.add(status.clone()); + } + } + + if (inputPortStatus != null) { + final Collection<PortStatus> statusList = new ArrayList<>(); + clonedObj.setInputPortStatus(statusList); + for (final PortStatus status : inputPortStatus) { + statusList.add(status.clone()); + } + } + + if (outputPortStatus != null) { + final Collection<PortStatus> statusList = new ArrayList<>(); + clonedObj.setOutputPortStatus(statusList); + for (final PortStatus status : outputPortStatus) { + statusList.add(status.clone()); + } + } + + if (processGroupStatus != null) { + final Collection<ProcessGroupStatus> statusList = new ArrayList<>(); + clonedObj.setProcessGroupStatus(statusList); + for (final ProcessGroupStatus status : processGroupStatus) { + statusList.add(status.clone()); + } + } + + if (remoteProcessGroupStatus != null) { + final Collection<RemoteProcessGroupStatus> statusList = new ArrayList<>(); + clonedObj.setRemoteProcessGroupStatus(statusList); + for (final RemoteProcessGroupStatus status : remoteProcessGroupStatus) { + statusList.add(status.clone()); + } + } + + return clonedObj; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("ProcessGroupStatus [id="); + builder.append(id); + builder.append(", inputCount="); + builder.append(inputCount); + builder.append(", inputBytes="); + builder.append(inputContentSize); + builder.append(", outputCount="); + builder.append(outputCount); + builder.append(", outputBytes="); + builder.append(outputContentSize); + builder.append(", creationTimestamp="); + builder.append(creationTimestamp); + builder.append(", activeThreadCount="); + builder.append(activeThreadCount); + builder.append(", flowFilesTransferred="); + builder.append(flowFilesTransferred); + builder.append(", bytesTransferred="); + builder.append(bytesTransferred); + builder.append(", flowFilesReceived="); + builder.append(flowFilesReceived); + builder.append(", bytesReceived="); + builder.append(bytesReceived); + builder.append(", flowFilesSent="); + builder.append(flowFilesSent); + builder.append(", bytesSent="); + builder.append(bytesSent); + builder.append(",\n\tconnectionStatus="); + + for (final ConnectionStatus status : connectionStatus) { + builder.append("\n\t\t"); + builder.append(status); + } + + builder.append(",\n\tprocessorStatus="); + + for (final ProcessorStatus status : processorStatus) { + builder.append("\n\t\t"); + builder.append(status); + } + + builder.append(",\n\tprocessGroupStatus="); + + for (final ProcessGroupStatus status : processGroupStatus) { + builder.append("\n\t\t"); + builder.append(status); + } + + builder.append(",\n\tremoteProcessGroupStatus="); + for (final RemoteProcessGroupStatus status : remoteProcessGroupStatus) { + builder.append("\n\t\t"); + builder.append(status); + } + + builder.append(",\n\tinputPortStatus="); + for (final PortStatus status : inputPortStatus) { + builder.append("\n\t\t"); + builder.append(status); + } + + builder.append(",\n\toutputPortStatus="); + for (final PortStatus status : outputPortStatus) { + builder.append("\n\t\t"); + builder.append(status); + } + + builder.append("]"); + return builder.toString(); + } + + public static void merge(final ProcessGroupStatus target, final ProcessGroupStatus toMerge) { + if (target == null || toMerge == null) { + return; + } + + target.setInputCount(target.getInputCount() + toMerge.getInputCount()); + target.setInputContentSize(target.getInputContentSize() + toMerge.getInputContentSize()); + target.setOutputCount(target.getOutputCount() + toMerge.getOutputCount()); + target.setOutputContentSize(target.getOutputContentSize() + toMerge.getOutputContentSize()); + target.setQueuedCount(target.getQueuedCount() + toMerge.getQueuedCount()); + target.setQueuedContentSize(target.getQueuedContentSize() + toMerge.getQueuedContentSize()); + target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead()); + target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten()); + target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); + target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred()); + target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred()); + target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived()); + target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived()); + target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent()); + target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent()); + + // connection status + // sort by id + final Map<String, ConnectionStatus> mergedConnectionMap = new HashMap<>(); + for (final ConnectionStatus status : target.getConnectionStatus()) { + mergedConnectionMap.put(status.getId(), status); + } + + for (final ConnectionStatus statusToMerge : toMerge.getConnectionStatus()) { + ConnectionStatus merged = mergedConnectionMap.get(statusToMerge.getId()); + if (merged == null) { + mergedConnectionMap.put(statusToMerge.getId(), statusToMerge.clone()); + continue; + } + + merged.setQueuedCount(merged.getQueuedCount() + statusToMerge.getQueuedCount()); + merged.setQueuedBytes(merged.getQueuedBytes() + statusToMerge.getQueuedBytes()); + merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount()); + merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes()); + merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount()); + merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes()); + } + target.setConnectionStatus(mergedConnectionMap.values()); + + // processor status + final Map<String, ProcessorStatus> mergedProcessorMap = new HashMap<>(); + for (final ProcessorStatus status : target.getProcessorStatus()) { + mergedProcessorMap.put(status.getId(), status); + } + + for (final ProcessorStatus statusToMerge : toMerge.getProcessorStatus()) { + ProcessorStatus merged = mergedProcessorMap.get(statusToMerge.getId()); + if (merged == null) { + mergedProcessorMap.put(statusToMerge.getId(), statusToMerge.clone()); + continue; + } + + merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount()); + merged.setBytesRead(merged.getBytesRead() + statusToMerge.getBytesRead()); + merged.setBytesWritten(merged.getBytesWritten() + statusToMerge.getBytesWritten()); + merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes()); + merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount()); + merged.setInvocations(merged.getInvocations() + statusToMerge.getInvocations()); + merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes()); + merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount()); + merged.setProcessingNanos(merged.getProcessingNanos() + statusToMerge.getProcessingNanos()); + + // if the status to merge is invalid allow it to take precedence. whether the + // processor run status is disabled/stopped/running is part of the flow configuration + // and should not differ amongst nodes. however, whether a processor is invalid + // can be driven by environmental conditions. this check allows any of those to + // take precedence over the configured run status. + if (RunStatus.Invalid.equals(statusToMerge.getRunStatus())) { + merged.setRunStatus(RunStatus.Invalid); + } + } + target.setProcessorStatus(mergedProcessorMap.values()); + + // input ports + final Map<String, PortStatus> mergedInputPortMap = new HashMap<>(); + for (final PortStatus status : target.getInputPortStatus()) { + mergedInputPortMap.put(status.getId(), status); + } + + for (final PortStatus statusToMerge : toMerge.getInputPortStatus()) { + PortStatus merged = mergedInputPortMap.get(statusToMerge.getId()); + if (merged == null) { + mergedInputPortMap.put(statusToMerge.getId(), statusToMerge.clone()); + continue; + } + + merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes()); + merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount()); + merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes()); + merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount()); + merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount()); + if (statusToMerge.isTransmitting() != null && statusToMerge.isTransmitting()) { + merged.setTransmitting(true); + } + + // should be unnecessary here since ports run status should not be affected by + // environmental conditions but doing so in case that changes + if (RunStatus.Invalid.equals(statusToMerge.getRunStatus())) { + merged.setRunStatus(RunStatus.Invalid); + } + } + target.setInputPortStatus(mergedInputPortMap.values()); + + // output ports + final Map<String, PortStatus> mergedOutputPortMap = new HashMap<>(); + for (final PortStatus status : target.getOutputPortStatus()) { + mergedOutputPortMap.put(status.getId(), status); + } + + for (final PortStatus statusToMerge : toMerge.getOutputPortStatus()) { + PortStatus merged = mergedOutputPortMap.get(statusToMerge.getId()); + if (merged == null) { + mergedOutputPortMap.put(statusToMerge.getId(), statusToMerge.clone()); + continue; + } + + merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes()); + merged.setInputCount(merged.getInputCount() + statusToMerge.getInputCount()); + merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes()); + merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount()); + merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount()); + if (statusToMerge.isTransmitting() != null && statusToMerge.isTransmitting()) { + merged.setTransmitting(true); + } + + // should be unnecessary here since ports run status not should be affected by + // environmental conditions but doing so in case that changes + if (RunStatus.Invalid.equals(statusToMerge.getRunStatus())) { + merged.setRunStatus(RunStatus.Invalid); + } + } + target.setOutputPortStatus(mergedOutputPortMap.values()); + + // child groups + final Map<String, ProcessGroupStatus> mergedGroupMap = new HashMap<>(); + for (final ProcessGroupStatus status : target.getProcessGroupStatus()) { + mergedGroupMap.put(status.getId(), status); + } + + for (final ProcessGroupStatus statusToMerge : toMerge.getProcessGroupStatus()) { + ProcessGroupStatus merged = mergedGroupMap.get(statusToMerge.getId()); + if (merged == null) { + mergedGroupMap.put(statusToMerge.getId(), statusToMerge.clone()); + continue; + } + + merge(merged, statusToMerge); + } + target.setOutputPortStatus(mergedOutputPortMap.values()); + + // remote groups + final Map<String, RemoteProcessGroupStatus> mergedRemoteGroupMap = new HashMap<>(); + for (final RemoteProcessGroupStatus status : target.getRemoteProcessGroupStatus()) { + mergedRemoteGroupMap.put(status.getId(), status); + } + + for (final RemoteProcessGroupStatus statusToMerge : toMerge.getRemoteProcessGroupStatus()) { + RemoteProcessGroupStatus merged = mergedRemoteGroupMap.get(statusToMerge.getId()); + if (merged == null) { + mergedRemoteGroupMap.put(statusToMerge.getId(), statusToMerge.clone()); + continue; + } + + // NOTE - active/inactive port counts are not merged since that state is considered part of the flow (like runStatus) + merged.setReceivedContentSize(merged.getReceivedContentSize() + statusToMerge.getReceivedContentSize()); + merged.setReceivedCount(merged.getReceivedCount() + statusToMerge.getReceivedCount()); + merged.setSentContentSize(merged.getSentContentSize() + statusToMerge.getSentContentSize()); + merged.setSentCount(merged.getSentCount() + statusToMerge.getSentCount()); + merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount()); + + List<String> mergedAuthenticationIssues = merged.getAuthorizationIssues(); + if (mergedAuthenticationIssues == null) { + mergedAuthenticationIssues = new ArrayList<>(); + } + + final List<String> nodeAuthorizationIssues = statusToMerge.getAuthorizationIssues(); + if (nodeAuthorizationIssues != null && !nodeAuthorizationIssues.isEmpty()) { + mergedAuthenticationIssues.addAll(nodeAuthorizationIssues); + } + + merged.setAuthorizationIssues(mergedAuthenticationIssues); + } + + target.setRemoteProcessGroupStatus(mergedRemoteGroupMap.values()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java new file mode 100644 index 0000000..54be7ba --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status; + +import java.util.concurrent.TimeUnit; + +/** + */ +public class ProcessorStatus implements Cloneable { + + private String id; + private String groupId; + private String name; + private String type; + private RunStatus runStatus; + private int inputCount; + private long inputBytes; + private int outputCount; + private long outputBytes; + private long bytesRead; + private long bytesWritten; + private int invocations; + private long processingNanos; + private int flowFilesRemoved; + private long averageLineageDuration; + private int activeThreadCount; + private int flowFilesReceived; + private long bytesReceived; + private int flowFilesSent; + private long bytesSent; + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(final String groupId) { + this.groupId = groupId; + } + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(final String type) { + this.type = type; + } + + public int getInputCount() { + return inputCount; + } + + public RunStatus getRunStatus() { + return runStatus; + } + + public void setRunStatus(RunStatus runStatus) { + this.runStatus = runStatus; + } + + public void setInputCount(final int inputCount) { + this.inputCount = inputCount; + } + + public long getInputBytes() { + return inputBytes; + } + + public void setInputBytes(final long inputBytes) { + this.inputBytes = inputBytes; + } + + public int getOutputCount() { + return outputCount; + } + + public void setOutputCount(final int outputCount) { + this.outputCount = outputCount; + } + + public long getOutputBytes() { + return outputBytes; + } + + public void setOutputBytes(final long outputBytes) { + this.outputBytes = outputBytes; + } + + public long getBytesRead() { + return bytesRead; + } + + public void setBytesRead(final long bytesRead) { + this.bytesRead = bytesRead; + } + + public long getBytesWritten() { + return bytesWritten; + } + + public void setBytesWritten(final long bytesWritten) { + this.bytesWritten = bytesWritten; + } + + public int getInvocations() { + return invocations; + } + + public void setInvocations(final int invocations) { + this.invocations = invocations; + } + + public long getProcessingNanos() { + return processingNanos; + } + + public void setProcessingNanos(final long processingNanos) { + this.processingNanos = processingNanos; + } + + public long getAverageLineageDuration(final TimeUnit timeUnit) { + return TimeUnit.MILLISECONDS.convert(averageLineageDuration, timeUnit); + } + + public void setAverageLineageDuration(final long duration, final TimeUnit timeUnit) { + this.averageLineageDuration = timeUnit.toMillis(duration); + } + + public long getAverageLineageDuration() { + return averageLineageDuration; + } + + public void setAverageLineageDuration(final long millis) { + this.averageLineageDuration = millis; + } + + public int getFlowFilesRemoved() { + return flowFilesRemoved; + } + + public void setFlowFilesRemoved(int flowFilesRemoved) { + this.flowFilesRemoved = flowFilesRemoved; + } + + public int getActiveThreadCount() { + return activeThreadCount; + } + + public void setActiveThreadCount(final int activeThreadCount) { + this.activeThreadCount = activeThreadCount; + } + + public int getFlowFilesReceived() { + return flowFilesReceived; + } + + public void setFlowFilesReceived(int flowFilesReceived) { + this.flowFilesReceived = flowFilesReceived; + } + + public long getBytesReceived() { + return bytesReceived; + } + + public void setBytesReceived(long bytesReceived) { + this.bytesReceived = bytesReceived; + } + + public int getFlowFilesSent() { + return flowFilesSent; + } + + public void setFlowFilesSent(int flowFilesSent) { + this.flowFilesSent = flowFilesSent; + } + + public long getBytesSent() { + return bytesSent; + } + + public void setBytesSent(long bytesSent) { + this.bytesSent = bytesSent; + } + + @Override + public ProcessorStatus clone() { + final ProcessorStatus clonedObj = new ProcessorStatus(); + clonedObj.activeThreadCount = activeThreadCount; + clonedObj.bytesRead = bytesRead; + clonedObj.bytesWritten = bytesWritten; + clonedObj.flowFilesReceived = flowFilesReceived; + clonedObj.bytesReceived = bytesReceived; + clonedObj.flowFilesSent = flowFilesSent; + clonedObj.bytesSent = bytesSent; + clonedObj.groupId = groupId; + clonedObj.id = id; + clonedObj.inputBytes = inputBytes; + clonedObj.inputCount = inputCount; + clonedObj.invocations = invocations; + clonedObj.name = name; + clonedObj.outputBytes = outputBytes; + clonedObj.outputCount = outputCount; + clonedObj.processingNanos = processingNanos; + clonedObj.averageLineageDuration = averageLineageDuration; + clonedObj.flowFilesRemoved = flowFilesRemoved; + clonedObj.runStatus = runStatus; + clonedObj.type = type; + return clonedObj; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("ProcessorStatus [id="); + builder.append(id); + builder.append(", groupId="); + builder.append(groupId); + builder.append(", name="); + builder.append(name); + builder.append(", type="); + builder.append(type); + builder.append(", runStatus="); + builder.append(runStatus); + builder.append(", inputCount="); + builder.append(inputCount); + builder.append(", inputBytes="); + builder.append(inputBytes); + builder.append(", outputCount="); + builder.append(outputCount); + builder.append(", outputBytes="); + builder.append(outputBytes); + builder.append(", bytesRead="); + builder.append(bytesRead); + builder.append(", bytesWritten="); + builder.append(bytesWritten); + builder.append(", invocations="); + builder.append(invocations); + builder.append(", processingNanos="); + builder.append(processingNanos); + builder.append(", activeThreadCount="); + builder.append(activeThreadCount); + builder.append("]"); + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/RemoteProcessGroupStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/RemoteProcessGroupStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/RemoteProcessGroupStatus.java new file mode 100644 index 0000000..110972e --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/RemoteProcessGroupStatus.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * + */ +public class RemoteProcessGroupStatus implements Cloneable { + + private String id; + private String groupId; + private TransmissionStatus transmissionStatus; + private String uri; + private String name; + private Integer activeThreadCount; + private int sentCount; + private long sentContentSize; + private int receivedCount; + private long receivedContentSize; + private Integer activeRemotePortCount; + private Integer inactiveRemotePortCount; + + private long averageLineageDuration; + private List<String> authorizationIssues = new ArrayList<>(); + + public String getTargetUri() { + return uri; + } + + public void setTargetUri(String uri) { + this.uri = uri; + } + + public TransmissionStatus getTransmissionStatus() { + return transmissionStatus; + } + + public void setTransmissionStatus(TransmissionStatus transmissionStatus) { + this.transmissionStatus = transmissionStatus; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public Integer getActiveThreadCount() { + return activeThreadCount; + } + + public void setActiveThreadCount(Integer activeThreadCount) { + this.activeThreadCount = activeThreadCount; + } + + public Integer getSentCount() { + return sentCount; + } + + public void setSentCount(Integer sentCount) { + this.sentCount = sentCount; + } + + public Long getSentContentSize() { + return sentContentSize; + } + + public void setSentContentSize(Long sentContentSize) { + this.sentContentSize = sentContentSize; + } + + public Integer getReceivedCount() { + return receivedCount; + } + + public void setReceivedCount(Integer receivedCount) { + this.receivedCount = receivedCount; + } + + public Long getReceivedContentSize() { + return receivedContentSize; + } + + public void setReceivedContentSize(Long receivedContentSize) { + this.receivedContentSize = receivedContentSize; + } + + public Integer getActiveRemotePortCount() { + return activeRemotePortCount; + } + + public void setActiveRemotePortCount(Integer activeRemotePortCount) { + this.activeRemotePortCount = activeRemotePortCount; + } + + public Integer getInactiveRemotePortCount() { + return inactiveRemotePortCount; + } + + public void setInactiveRemotePortCount(Integer inactiveRemotePortCount) { + this.inactiveRemotePortCount = inactiveRemotePortCount; + } + + public List<String> getAuthorizationIssues() { + return new ArrayList<>(authorizationIssues); + } + + public void setAuthorizationIssues(List<String> authorizationIssues) { + this.authorizationIssues = new ArrayList<>(Objects.requireNonNull(authorizationIssues)); + } + + public long getAverageLineageDuration() { + return averageLineageDuration; + } + + public void setAverageLineageDuration(final long millis) { + this.averageLineageDuration = millis; + } + + public long getAverageLineageDuration(final TimeUnit timeUnit) { + return TimeUnit.MILLISECONDS.convert(averageLineageDuration, timeUnit); + } + + public void setAverageLineageDuration(final long duration, final TimeUnit timeUnit) { + this.averageLineageDuration = timeUnit.toMillis(duration); + } + + @Override + public RemoteProcessGroupStatus clone() { + final RemoteProcessGroupStatus clonedObj = new RemoteProcessGroupStatus(); + clonedObj.id = id; + clonedObj.groupId = groupId; + clonedObj.name = name; + clonedObj.uri = uri; + clonedObj.activeThreadCount = activeThreadCount; + clonedObj.transmissionStatus = transmissionStatus; + clonedObj.sentCount = sentCount; + clonedObj.sentContentSize = sentContentSize; + clonedObj.receivedCount = receivedCount; + clonedObj.receivedContentSize = receivedContentSize; + clonedObj.activeRemotePortCount = activeRemotePortCount; + clonedObj.inactiveRemotePortCount = inactiveRemotePortCount; + clonedObj.averageLineageDuration = averageLineageDuration; + clonedObj.authorizationIssues = getAuthorizationIssues(); + return clonedObj; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("RemoteProcessGroupStatus [id="); + builder.append(id); + builder.append(", groupId="); + builder.append(groupId); + builder.append(", name="); + builder.append(name); + builder.append(", uri="); + builder.append(uri); + builder.append(", activeThreadCount="); + builder.append(activeThreadCount); + builder.append(", transmissionStatus="); + builder.append(transmissionStatus); + builder.append(", sentCount="); + builder.append(sentCount); + builder.append(", sentContentSize="); + builder.append(sentContentSize); + builder.append(", receivedCount="); + builder.append(receivedCount); + builder.append(", receivedContentSize="); + builder.append(receivedContentSize); + builder.append(", activeRemotePortCount="); + builder.append(activeRemotePortCount); + builder.append(", inactiveRemotePortCount="); + builder.append(inactiveRemotePortCount); + builder.append(", authenticationIssues="); + builder.append(authorizationIssues); + builder.append("]"); + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/RunStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/RunStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/RunStatus.java new file mode 100644 index 0000000..1b7c43d --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/RunStatus.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status; + +/** + * + */ +public enum RunStatus { + + Running, + Stopped, + Invalid, + Disabled; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/TransmissionStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/TransmissionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/TransmissionStatus.java new file mode 100644 index 0000000..6d7eb12 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/TransmissionStatus.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status; + +public enum TransmissionStatus { + + Transmitting, + NotTransmitting; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java new file mode 100644 index 0000000..4628a28 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history; + +import java.util.Date; +import java.util.List; + +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; + +/** + * A repository for storing and retrieving components' historical status + * information + */ +public interface ComponentStatusRepository { + + /** + * Captures the status information provided in the given report + * + * @param rootGroupStatus status of root group + */ + void capture(ProcessGroupStatus rootGroupStatus); + + /** + * Captures the status information provided in the given report, providing a + * timestamp that indicates the time at which the status report was + * generated. This can be used to replay historical values. + * + * @param rootGroupStatus status + * @param timestamp timestamp of capture + */ + void capture(ProcessGroupStatus rootGroupStatus, Date timestamp); + + /** + * @return the Date at which the latest capture was performed + */ + Date getLastCaptureDate(); + + /** + * @param connectionId the ID of the Connection for which the Status is + * desired + * @param start the earliest date for which status information should be + * returned; if <code>null</code>, the start date should be assumed to be + * the beginning of time + * @param end the latest date for which status information should be + * returned; if <code>null</code>, the end date should be assumed to be the + * current time + * @param preferredDataPoints the preferred number of data points to return. + * If the date range is large, the total number of data points could be far + * too many to process. Therefore, this parameter allows the requestor to + * indicate how many samples to return. + * @return a {@link StatusHistory} that provides the status information + * about the Connection with the given ID during the given time period + */ + StatusHistory getConnectionStatusHistory(String connectionId, Date start, Date end, int preferredDataPoints); + + /** + * @param processGroupId of group to get status of + * @param start the earliest date for which status information should be + * returned; if <code>null</code>, the start date should be assumed to be + * the beginning of time + * @param end the latest date for which status information should be + * returned; if <code>null</code>, the end date should be assumed to be the + * current time + * @param preferredDataPoints the preferred number of data points to return. + * If the date range is large, the total number of data points could be far + * too many to process. Therefore, this parameter allows the requestor to + * indicate how many samples to return. + * @return a {@link StatusHistory} that provides the status information + * about the Process Group with the given ID during the given time period + */ + StatusHistory getProcessGroupStatusHistory(String processGroupId, Date start, Date end, int preferredDataPoints); + + /** + * @param processorId to get status of + * @param start the earliest date for which status information should be + * returned; if <code>null</code>, the start date should be assumed to be + * the beginning of time + * @param end the latest date for which status information should be + * returned; if <code>null</code>, the end date should be assumed to be the + * current time + * @param preferredDataPoints the preferred number of data points to return. + * If the date range is large, the total number of data points could be far + * too many to process. Therefore, this parameter allows the requestor to + * indicate how many samples to return. + * @return a {@link StatusHistory} that provides the status information + * about the Processor with the given ID during the given time period + */ + StatusHistory getProcessorStatusHistory(String processorId, Date start, Date end, int preferredDataPoints); + + /** + * @param remoteGroupId to get history of + * @param start the earliest date for which status information should be + * returned; if <code>null</code>, the start date should be assumed to be + * the beginning of time + * @param end the latest date for which status information should be + * returned; if <code>null</code>, the end date should be assumed to be the + * current time + * @param preferredDataPoints the preferred number of data points to return. + * If the date range is large, the total number of data points could be far + * too many to process. Therefore, this parameter allows the requestor to + * indicate how many samples to return. + * @return a {@link StatusHistory} that provides the status information + * about the Remote Process Group with the given ID during the given time + * period + */ + StatusHistory getRemoteProcessGroupStatusHistory(String remoteGroupId, Date start, Date end, int preferredDataPoints); + + /** + * @return a List of all {@link MetricDescriptor}s that are applicable to + * Process Groups + */ + List<MetricDescriptor<ProcessGroupStatus>> getProcessGroupMetricDescriptors(); + + /** + * @return a List of all {@link MetricDescriptor}s that are applicable to + * Processors + */ + List<MetricDescriptor<ProcessorStatus>> getProcessorMetricDescriptors(); + + /** + * @return a List of all {@link MetricDescriptor}s that are applicable to + * Remote Process Groups + */ + List<MetricDescriptor<RemoteProcessGroupStatus>> getRemoteProcessGroupMetricDescriptors(); + + /** + * @return a List of all {@link MetricDescriptor}s that are applicable to + * Connections + */ + List<MetricDescriptor<ConnectionStatus>> getConnectionMetricDescriptors(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java new file mode 100644 index 0000000..8fdce05 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history; + +/** + * Describes a particular metric that is derived from a Status History + * + * @param <T> type of metric + */ +public interface MetricDescriptor<T> { + + public enum Formatter { + + COUNT, + DURATION, + DATA_SIZE + }; + + /** + * Specifies how the values should be formatted + * + * @return formatter for values + */ + Formatter getFormatter(); + + /** + * @return a human-readable description of the field + */ + String getDescription(); + + /** + * @return a human-readable label for the field + */ + String getLabel(); + + /** + * @return the name of a field + */ + String getField(); + + /** + * @return a {@link ValueMapper} that can be used to extract a value for the + * status history + */ + ValueMapper<T> getValueFunction(); + + /** + * @return a {@link ValueReducer} that can reduce multiple StatusSnapshots + * into a single Long value + */ + ValueReducer<StatusSnapshot, Long> getValueReducer(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java new file mode 100644 index 0000000..f1bb946 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history; + +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * Represents a collection of historical status values for a component + */ +public interface StatusHistory { + + /** + * @return a Date indicating when this report was generated + */ + Date getDateGenerated(); + + /** + * @return a Map of component field names and their values. The order in + * which these values are displayed is dependent on the natural ordering of + * the Map returned + */ + Map<String, String> getComponentDetails(); + + /** + * @return List of snapshots for a given component + */ + List<StatusSnapshot> getStatusSnapshots(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java new file mode 100644 index 0000000..551ceb2 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history; + +import java.util.Date; +import java.util.Map; + +/** + * A StatusSnapshot represents a Component's status report at some point in time + */ +public interface StatusSnapshot { + + /** + * @return the point in time for which the status values were obtained + */ + Date getTimestamp(); + + /** + * @return a Map of MetricDescriptor to value + */ + Map<MetricDescriptor<?>, Long> getStatusMetrics(); + + /** + * @return a {@link ValueReducer} that is capable of merging multiple + * StatusSnapshot objects into a single one + */ + ValueReducer<StatusSnapshot, StatusSnapshot> getValueReducer(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueMapper.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueMapper.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueMapper.java new file mode 100644 index 0000000..8000b3a --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueMapper.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history; + +public interface ValueMapper<S> { + + Long getValue(S status); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueReducer.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueReducer.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueReducer.java new file mode 100644 index 0000000..0427da7 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/history/ValueReducer.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history; + +import java.util.List; + +public interface ValueReducer<T, R> { + + R reduce(List<T> values); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/events/EventReporter.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/events/EventReporter.java b/nifi-api/src/main/java/org/apache/nifi/events/EventReporter.java new file mode 100644 index 0000000..d645d60 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/events/EventReporter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.events; + +import java.io.Serializable; + +import org.apache.nifi.reporting.Severity; + +/** + * Implementations MUST be thread-safe + */ +public interface EventReporter extends Serializable { + + void reportEvent(Severity severity, String category, String message); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/expression/AttributeExpression.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/expression/AttributeExpression.java b/nifi-api/src/main/java/org/apache/nifi/expression/AttributeExpression.java new file mode 100644 index 0000000..ed409ea --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/expression/AttributeExpression.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.expression; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.exception.ProcessException; + +public interface AttributeExpression { + + /** + * @return Evaluates the expression without providing any FlowFile Attributes. This + * will evaluate the expression based only on System Properties and JVM + * Environment properties + * @throws ProcessException if unable to evaluate + */ + String evaluate() throws ProcessException; + + /** + * Evaluates the expression without providing any FlowFile Attributes. This + * will evaluate the expression based only on System Properties and JVM + * Environment properties but allows the values to be decorated + * + * @param decorator for attribute value + * @return evaluated value + * @throws ProcessException if failure in evaluation + */ + String evaluate(AttributeValueDecorator decorator) throws ProcessException; + + /** + * Evaluates the expression, providing access to the attributes, file size, + * id, etc. of the given FlowFile, as well as System Properties and JVM + * Environment properties + * + * @param flowFile to evaluate + * @return evaluated value + * @throws ProcessException if failure evaluating + */ + String evaluate(FlowFile flowFile) throws ProcessException; + + /** + * Evaluates the expression, providing access to the attributes, file size, + * id, etc. of the given FlowFile, as well as System Properties and JVM + * Environment properties and allows the values to be decorated + * + * @param flowFile to evaluate + * @param decorator for evaluation + * @return evaluated value + * @throws ProcessException if failed to evaluate + */ + String evaluate(FlowFile flowFile, AttributeValueDecorator decorator) throws ProcessException; + + /** + * @return the type that is returned by the Expression + */ + ResultType getResultType(); + + public static enum ResultType { + + STRING, BOOLEAN, NUMBER, DATE; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/expression/AttributeValueDecorator.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/expression/AttributeValueDecorator.java b/nifi-api/src/main/java/org/apache/nifi/expression/AttributeValueDecorator.java new file mode 100644 index 0000000..4cea248 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/expression/AttributeValueDecorator.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.expression; + +public interface AttributeValueDecorator { + + /** + * Decorates the value of a FlowFile Attribute or System/JVM property in + * some way + * + * @param attributeValue to decorate + * @return decorated value + */ + String decorate(String attributeValue); +}