NIFI-108: - Removing sort from UI. - Addressing issues with listing and flowfile retrieval when clustered. - Making the context menu item available when source and destination are still running. - Adding a refresh button to the queue listing table. - Fixing the flowfile summary sorting in the cluster manager. - Adding a message when the source or destination of a connection is actively running. - Updating the documentation regarding queue interaction. - Updating the error message when a flowfile is no longer in the active queue. - Updated queue listing to allow listing to be done while source and destination are running but not sort or have ability to search - Added heartbeat when we finish clearing queue - Addressing comments from review.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0d7edcb3 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0d7edcb3 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0d7edcb3 Branch: refs/heads/NIFI-259 Commit: 0d7edcb3ace95e8729c4bc27457d702fdd879f09 Parents: 7d73ae7 Author: Matt Gilman <[email protected]> Authored: Thu Jan 7 15:59:02 2016 -0500 Committer: Matt Gilman <[email protected]> Committed: Thu Jan 21 08:13:32 2016 -0500 ---------------------------------------------------------------------- .../nifi/controller/queue/FlowFileQueue.java | 46 +--- .../controller/queue/ListFlowFileStatus.java | 20 -- .../src/main/asciidoc/images/iconDetails.png | Bin 0 -> 549 bytes nifi-docs/src/main/asciidoc/user-guide.adoc | 14 +- .../nifi/web/api/dto/ListingRequestDTO.java | 71 ++----- .../nifi/web/api/entity/FlowFileEntity.java | 2 +- .../cluster/manager/impl/WebClusterManager.java | 53 +++-- .../controller/queue/ListFlowFileRequest.java | 35 +-- .../nifi/connectable/StandardConnection.java | 9 +- .../apache/nifi/controller/FlowController.java | 1 + .../nifi/controller/FlowFileSummaries.java | 95 --------- .../nifi/controller/StandardFlowFileQueue.java | 211 +++++-------------- .../controller/TestStandardFlowFileQueue.java | 159 ++------------ .../repository/TestStandardProcessSession.java | 2 +- .../org/apache/nifi/web/NiFiServiceFacade.java | 22 +- .../nifi/web/StandardNiFiServiceFacade.java | 129 +++++++----- .../apache/nifi/web/api/ConnectionResource.java | 61 ++---- .../org/apache/nifi/web/api/dto/DtoFactory.java | 4 - .../org/apache/nifi/web/dao/ConnectionDAO.java | 9 +- .../web/dao/impl/StandardConnectionDAO.java | 36 ++-- .../WEB-INF/partials/canvas/queue-listing.jsp | 8 +- .../src/main/webapp/css/queue-listing.css | 53 +++-- .../src/main/webapp/js/nf/canvas/nf-actions.js | 2 +- .../main/webapp/js/nf/canvas/nf-context-menu.js | 2 +- .../webapp/js/nf/canvas/nf-queue-listing.js | 75 
++++--- 25 files changed, 377 insertions(+), 742 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index 0d0f03f..f2066a8 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -211,60 +211,28 @@ public interface FlowFileQueue { DropFlowFileStatus cancelDropFlowFileRequest(String requestIdentifier); /** + * <p> * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a * ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist * within the queue. Additionally, the ListFlowFileStatus provides a request identifier that * can then be passed to the {@link #getListFlowFileStatus(String)}. The listing of FlowFiles * will be returned ordered by the position of the FlowFile in the queue. + * </p> * - * @param requestIdentifier the identifier of the List FlowFile Request - * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus - * - * @return the status for the request - * - * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs - * is currently running. - */ - ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults); - - /** - * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a - * ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist - * within the queue. 
Additionally, the ListFlowFileStatus provides a request identifier that - * can then be passed to the {@link #getListFlowFileStatus(String)} + * <p> + * Note that if maxResults is larger than the size of the "active queue" (i.e., the un-swapped queue), + * FlowFiles that are swapped out will not be returned. + * </p> * * @param requestIdentifier the identifier of the List FlowFile Request * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus - * @param sortColumn specifies which column to sort on - * @param direction specifies which direction to sort the FlowFiles * * @return the status for the request * * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs * is currently running. */ - ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, SortColumn sortColumn, SortDirection direction); - - /** - * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a - * ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist - * within the queue. Additionally, the ListFlowFileStatus provides a request identifier that - * can then be passed to the {@link #getListFlowFileStatus(String)} - * - * @param requestIdentifier the identifier of the List FlowFile Request - * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus - * @param query an Expression Language expression that will be evaluated against all FlowFiles. Only FlowFiles that satisfy the expression will - * be included in the results. 
The expression must be a valid expression and return a Boolean type - * @param sortColumn specifies which column to sort on - * @param direction specifies which direction to sort the FlowFiles - * - * @return the status for the request - * - * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs - * is currently running. - * @throws IllegalArgumentException if query is not a valid Expression Language expression or does not return a boolean type - */ - ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, String query, SortColumn sortColumn, SortDirection direction); + ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults); /** * Returns the current status of a List FlowFile Request that was initiated via the {@link #listFlowFiles(String)} http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java index cae500d..e3cc337 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java @@ -44,16 +44,6 @@ public interface ListFlowFileStatus { long getLastUpdated(); /** - * @return the column on which the listing is sorted - */ - SortColumn getSortColumn(); - - /** - * @return the direction in which the FlowFiles are sorted - */ - SortDirection getSortDirection(); - - /** * @return the current state of the operation */ ListFlowFileState getState(); @@ -77,14 +67,4 @@ public interface ListFlowFileStatus { * @return the percentage (an integer between 0 and 100, inclusive) of how close the request is to being completed */ int 
getCompletionPercentage(); - - /** - * @return the total number of steps that are required in order to finish the listing - */ - int getTotalStepCount(); - - /** - * @return the total number of steps that have already been completed. The value returned will be >= 0 and <= the result of calling {@link #getTotalStepCount()}. - */ - int getCompletedStepCount(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-docs/src/main/asciidoc/images/iconDetails.png ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/images/iconDetails.png b/nifi-docs/src/main/asciidoc/images/iconDetails.png new file mode 100644 index 0000000..fe6b61f Binary files /dev/null and b/nifi-docs/src/main/asciidoc/images/iconDetails.png differ http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-docs/src/main/asciidoc/user-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc b/nifi-docs/src/main/asciidoc/user-guide.adoc index a9cd083..6d97703 100644 --- a/nifi-docs/src/main/asciidoc/user-guide.adoc +++ b/nifi-docs/src/main/asciidoc/user-guide.adoc @@ -1260,19 +1260,23 @@ image:iconNotSecure.png["Not Secure"] -[[Queue_Listing]] -=== Listing FlowFiles in a Queue +[[Queue_Interaction]] +=== Queue Interaction The FlowFiles enqueued in a Connection can be viewed when necessary. The Queue listing is opened via a menu item in -a Connection's context menu. This option is only available when the source and destination of the Connection have -been stopped and all active threads have completed. The listing will return the top 100 FlowFiles according to -the currently sorted column. +a Connection's context menu. The listing will return the top 100 FlowFiles in the active queue according to the +configured priority. The listing can be performed even if the source and destination are actively running. 
Additionally, details for a Flowfile in the listing can be viewed by clicking on the Details icon ( image:iconDetails.png["Details"] ) in the left most column. From here, the FlowFile details and attributes are available as well buttons for downloading or viewing the content. Viewing the content is only available if the nifi.content.viewer.url has been configured. +If the source or destination of the Connection are actively running, there is a chance that the desired FlowFile will +no longer be in the active queue. +The FlowFiles enqueued in a Connection can also be deleted when necessary. The removal of the FlowFiles is initiated +via a menu item in the Connection's context menu. This action can also be performed if the source and destination +are actively running. [[Summary_Page]] http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java index e29f41f..444aaf5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java @@ -22,6 +22,7 @@ import java.util.List; import javax.xml.bind.annotation.XmlType; import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.web.api.dto.util.TimeAdapter; import org.apache.nifi.web.api.dto.util.TimestampAdapter; import com.wordnik.swagger.annotations.ApiModelProperty; @@ -38,11 +39,10 @@ public 
class ListingRequestDTO { private Integer percentCompleted; private Boolean finished; private String failureReason; - private String sortColumn; - private String sortDirection; private Integer maxResults; - private Integer totalStepCount; - private Integer completedStepCount; + + private Boolean isSourceRunning; + private Boolean isDestinationRunning; private String state; private QueueSizeDTO queueSize; @@ -95,7 +95,7 @@ public class ListingRequestDTO { /** * @return the time this request was last updated */ - @XmlJavaTypeAdapter(TimestampAdapter.class) + @XmlJavaTypeAdapter(TimeAdapter.class) @ApiModelProperty( value = "The last time this listing request was updated." ) @@ -178,30 +178,6 @@ public class ListingRequestDTO { } /** - * @return the column on which the listing is sorted - */ - @ApiModelProperty(value = "The column on which the FlowFiles are sorted.") - public String getSortColumn() { - return sortColumn; - } - - public void setSortColumn(String sortColumn) { - this.sortColumn = sortColumn; - } - - /** - * @return the direction in which the FlowFiles are sorted - */ - @ApiModelProperty(value = "The direction in which the FlowFiles are sorted. 
Either ASCENDING or DESCENDING.") - public String getSortDirection() { - return sortDirection; - } - - public void setSortDirection(String sortDirection) { - this.sortDirection = sortDirection; - } - - /** * @return the maximum number of FlowFileSummary objects to return */ @ApiModelProperty(value = "The maximum number of FlowFileSummary objects to return") @@ -213,40 +189,39 @@ public class ListingRequestDTO { this.maxResults = maxResults; } - /** - * @return the total number of steps required to complete the listing + * @return the size for the queue */ - @ApiModelProperty(value = "The total number of steps required to complete the listing") - public Integer getTotalStepCount() { - return totalStepCount; + @ApiModelProperty(value = "The size of the queue") + public QueueSizeDTO getQueueSize() { + return queueSize; } - public void setTotalStepCount(Integer totalStepCount) { - this.totalStepCount = totalStepCount; + public void setQueueSize(QueueSizeDTO queueSize) { + this.queueSize = queueSize; } /** - * @return the number of steps that have already been completed. This value will be >= 0 and <= the total step count + * @return whether the source is running */ - @ApiModelProperty(value = "The number of steps that have already been completed. 
This value will be between 0 and the total step count (inclusive)") - public Integer getCompletedStepCount() { - return completedStepCount; + @ApiModelProperty(value = "Whether the source of the connection is running") + public Boolean getSourceRunning() { + return isSourceRunning; } - public void setCompletedStepCount(Integer completedStepCount) { - this.completedStepCount = completedStepCount; + public void setSourceRunning(Boolean sourceRunning) { + isSourceRunning = sourceRunning; } /** - * @return the size for the queue + * @return whether the destination is running */ - @ApiModelProperty(value = "The size of the queue") - public QueueSizeDTO getQueueSize() { - return queueSize; + @ApiModelProperty(value = "Whether the destination of the connection is running") + public Boolean getDestinationRunning() { + return isDestinationRunning; } - public void setQueueSize(QueueSizeDTO queueSize) { - this.queueSize = queueSize; + public void setDestinationRunning(Boolean destinationRunning) { + isDestinationRunning = destinationRunning; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowFileEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowFileEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowFileEntity.java index 639cc85..960ad0d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowFileEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/FlowFileEntity.java @@ -23,7 +23,7 @@ import javax.xml.bind.annotation.XmlRootElement; /** * A serialized 
representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a FlowFileDTO. */ -@XmlRootElement(name = "listingRequestEntity") +@XmlRootElement(name = "flowFileEntity") public class FlowFileEntity extends Entity { private FlowFileDTO flowFile; http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 55e58ac..77005b6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -127,7 +127,6 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.controller.FlowFileSummaries; import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; @@ -137,8 +136,6 @@ import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ComponentLifeCycleException; import org.apache.nifi.controller.queue.DropFlowFileState; import 
org.apache.nifi.controller.queue.ListFlowFileState; -import org.apache.nifi.controller.queue.SortColumn; -import org.apache.nifi.controller.queue.SortDirection; import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.reporting.ReportingTaskProvider; @@ -2447,7 +2444,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } private static boolean isListFlowFilesEndpoint(final URI uri, final String method) { - if ("GET".equalsIgnoreCase(method) && LISTING_REQUEST_URI.matcher(uri.getPath()).matches()) { + if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && LISTING_REQUEST_URI.matcher(uri.getPath()).matches()) { return true; } else if ("POST".equalsIgnoreCase(method) && LISTING_REQUESTS_URI.matcher(uri.getPath()).matches()) { return true; @@ -2516,7 +2513,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method) || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method) || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) - || isDropRequestEndpoint(uri, method); + || isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method); } private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) { @@ -2860,8 +2857,28 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C * @param listingRequestMap the mapping of all responses being merged */ private void mergeListingRequests(final ListingRequestDTO listingRequest, final Map<NodeIdentifier, ListingRequestDTO> listingRequestMap) { - final Comparator<FlowFileSummaryDTO> comparator = FlowFileSummaries.createDTOComparator( - 
SortColumn.valueOf(listingRequest.getSortColumn()), SortDirection.valueOf(listingRequest.getSortDirection())); + final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() { + @Override + public int compare(final FlowFileSummaryDTO dto1, final FlowFileSummaryDTO dto2) { + int positionCompare = dto1.getPosition().compareTo(dto2.getPosition()); + if (positionCompare != 0) { + return positionCompare; + } + + final String address1 = dto1.getClusterNodeAddress(); + final String address2 = dto2.getClusterNodeAddress(); + if (address1 == null && address2 == null) { + return 0; + } + if (address1 == null) { + return 1; + } + if (address2 == null) { + return -1; + } + return address1.compareTo(address2); + } + }; final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator); @@ -2877,8 +2894,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final ListingRequestDTO nodeRequest = entry.getValue(); - numStepsCompleted += nodeRequest.getCompletedStepCount(); - numStepsTotal += nodeRequest.getTotalStepCount(); + numStepsTotal++; + if (Boolean.TRUE.equals(nodeRequest.getFinished())) { + numStepsCompleted++; + } final QueueSizeDTO nodeQueueSize = nodeRequest.getQueueSize(); objectCount += nodeQueueSize.getObjectCount(); @@ -2898,15 +2917,17 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C state = nodeState; } - for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) { - summaryDTO.setClusterNodeId(nodeIdentifier.getId()); - summaryDTO.setClusterNodeAddress(nodeAddress); + if (nodeRequest.getFlowFileSummaries() != null) { + for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) { + summaryDTO.setClusterNodeId(nodeIdentifier.getId()); + summaryDTO.setClusterNodeAddress(nodeAddress); - flowFileSummaries.add(summaryDTO); + flowFileSummaries.add(summaryDTO); - // Keep the set from growing beyond our max - if 
(flowFileSummaries.size() > listingRequest.getMaxResults()) { - flowFileSummaries.pollLast(); + // Keep the set from growing beyond our max + if (flowFileSummaries.size() > listingRequest.getMaxResults()) { + flowFileSummaries.pollLast(); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java index 313ad0c..dea23fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java @@ -25,24 +25,17 @@ public class ListFlowFileRequest implements ListFlowFileStatus { private final String requestId; private final int maxResults; private final QueueSize queueSize; - private final SortColumn sortColumn; - private final SortDirection sortDirection; private final long submissionTime = System.currentTimeMillis(); private final List<FlowFileSummary> flowFileSummaries = new ArrayList<>(); private ListFlowFileState state = ListFlowFileState.WAITING_FOR_LOCK; private String failureReason; - private int numSteps; - private int completedStepCount; private long lastUpdated = System.currentTimeMillis(); - public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final int maxResults, final QueueSize queueSize, final int numSteps) { + public 
ListFlowFileRequest(final String requestId, final int maxResults, final QueueSize queueSize) { this.requestId = requestId; - this.sortColumn = sortColumn; - this.sortDirection = sortDirection; this.maxResults = maxResults; this.queueSize = queueSize; - this.numSteps = numSteps; } @Override @@ -61,16 +54,6 @@ public class ListFlowFileRequest implements ListFlowFileStatus { } @Override - public SortColumn getSortColumn() { - return sortColumn; - } - - @Override - public SortDirection getSortDirection() { - return sortDirection; - } - - @Override public synchronized ListFlowFileState getState() { return state; } @@ -118,25 +101,11 @@ public class ListFlowFileRequest implements ListFlowFileStatus { @Override public synchronized int getCompletionPercentage() { - return (int) (100F * completedStepCount / numSteps); - } - - public synchronized void setCompletedStepCount(final int completedStepCount) { - this.completedStepCount = completedStepCount; + return state == ListFlowFileState.COMPLETE ? 
100 : 0; } @Override public int getMaxResults() { return maxResults; } - - @Override - public int getTotalStepCount() { - return numSteps; - } - - @Override - public int getCompletedStepCount() { - return completedStepCount; - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index d43a3db..1ef18c0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.StandardFlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue; @@ -70,7 +71,7 @@ public final class StandardConnection implements Connection { relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships)); scheduler = builder.scheduler; flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager, - scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold()); + scheduler, 
builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold(), builder.heartbeater); hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); } @@ -269,6 +270,7 @@ public final class StandardConnection implements Connection { private FlowFileRepository flowFileRepository; private ProvenanceEventRepository provenanceRepository; private ResourceClaimManager resourceClaimManager; + private Heartbeater heartbeater; public Builder(final ProcessScheduler scheduler) { this.scheduler = scheduler; @@ -304,6 +306,11 @@ public final class StandardConnection implements Connection { return this; } + public Builder heartbeater(final Heartbeater heartbeater) { + this.heartbeater = heartbeater; + return this; + } + public Builder bendPoints(final List<Position> bendPoints) { this.bendPoints.clear(); this.bendPoints.addAll(bendPoints); http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index dd3b687..0cab9ad 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -770,6 +770,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R .resourceClaimManager(resourceClaimManager) .flowFileRepository(flowFileRepository) .provenanceRepository(provenanceEventRepository) + .heartbeater(this) .build(); } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java deleted file mode 100644 index 7687d8a..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.controller; - -import java.util.Collections; -import java.util.Comparator; - -import org.apache.nifi.controller.queue.FlowFileSummary; -import org.apache.nifi.controller.queue.SortColumn; -import org.apache.nifi.controller.queue.SortDirection; -import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; - -public class FlowFileSummaries { - - public static Comparator<FlowFileSummary> createComparator(final SortColumn column, final SortDirection direction) { - final Comparator<FlowFileSummary> comparator = new Comparator<FlowFileSummary>() { - @Override - public int compare(final FlowFileSummary o1, final FlowFileSummary o2) { - switch (column) { - case FILENAME: - return o1.getFilename().compareTo(o2.getFilename()); - case FLOWFILE_AGE: - return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate()); - case FLOWFILE_SIZE: - return Long.compare(o1.getSize(), o2.getSize()); - case FLOWFILE_UUID: - return o1.getUuid().compareTo(o2.getUuid()); - case PENALIZATION: - return Boolean.compare(o1.isPenalized(), o2.isPenalized()); - case QUEUE_POSITION: - return Long.compare(o1.getPosition(), o2.getPosition()); - case QUEUED_DURATION: - return Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime()); - } - - return 0; - } - }; - - - if (direction == SortDirection.DESCENDING) { - return Collections.reverseOrder(comparator); - } else { - return comparator; - } - } - - public static Comparator<FlowFileSummaryDTO> createDTOComparator(final SortColumn column, final SortDirection direction) { - final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() { - @Override - public int compare(final FlowFileSummaryDTO o1, final FlowFileSummaryDTO o2) { - switch (column) { - case FILENAME: - return o1.getFilename().compareTo(o2.getFilename()); - case FLOWFILE_AGE: - return o1.getLineageDuration().compareTo(o2.getLineageDuration()); - case FLOWFILE_SIZE: - return Long.compare(o1.getSize(), o2.getSize()); - case 
FLOWFILE_UUID: - return o1.getUuid().compareTo(o2.getUuid()); - case PENALIZATION: - return Boolean.compare(o1.getPenalized(), o2.getPenalized()); - case QUEUE_POSITION: - return Long.compare(o1.getPosition(), o2.getPosition()); - case QUEUED_DURATION: - return o1.getQueuedDuration().compareTo(o2.getQueuedDuration()); - } - - return 0; - } - }; - - if (direction == SortDirection.DESCENDING) { - return Collections.reverseOrder(comparator); - } else { - return comparator; - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 64e49c3..04763ee 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -16,9 +16,25 @@ */ package org.apache.nifi.controller; -import org.apache.nifi.attribute.expression.language.PreparedQuery; -import org.apache.nifi.attribute.expression.language.Query; -import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import 
java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.queue.DropFlowFileState; import org.apache.nifi.controller.queue.DropFlowFileStatus; @@ -28,8 +44,6 @@ import org.apache.nifi.controller.queue.ListFlowFileRequest; import org.apache.nifi.controller.queue.ListFlowFileState; import org.apache.nifi.controller.queue.ListFlowFileStatus; import org.apache.nifi.controller.queue.QueueSize; -import org.apache.nifi.controller.queue.SortColumn; -import org.apache.nifi.controller.queue.SortDirection; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; @@ -39,7 +53,6 @@ import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.events.EventReporter; -import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -57,27 +70,6 @@ import org.apache.nifi.util.concurrency.TimedLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableSet; -import 
java.util.PriorityQueue; -import java.util.Queue; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; - /** * A FlowFileQueue is used to queue FlowFile objects that are awaiting further * processing. Must be thread safe. @@ -115,6 +107,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final FlowFileRepository flowFileRepository; private final ProvenanceEventRepository provRepository; private final ResourceClaimManager resourceClaimManager; + private final Heartbeater heartbeater; private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>(); @@ -123,7 +116,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final ProcessScheduler scheduler; public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, - final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) { + final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold, + final Heartbeater heartbeater) { activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>())); priorities = new ArrayList<>(); swapQueue = new ArrayList<>(); @@ -137,6 +131,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { this.swapThreshold = swapThreshold; this.scheduler = scheduler; this.connection = connection; + 
this.heartbeater = heartbeater; readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100); writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100); @@ -852,33 +847,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) { - return listFlowFiles(requestIdentifier, maxResults, SortColumn.QUEUE_POSITION, SortDirection.ASCENDING); - } - - @Override - public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults, final SortColumn sortColumn, final SortDirection direction) { - return listFlowFiles(requestIdentifier, maxResults, null, sortColumn, direction); - } - - @Override - public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults, final String query, final SortColumn sortColumn, final SortDirection direction) { - final PreparedQuery preparedQuery; - if (query == null) { - preparedQuery = null; - } else { - try { - final ResultType resultType = Query.compile(query).getResultType(); - if (resultType != ResultType.BOOLEAN) { - throw new IllegalArgumentException("Invalid expression Language provided to search the listing of FlowFiles. " - + "The expression must return a 'Boolean' type but returns a " + resultType.name() + " type"); - } - preparedQuery = Query.prepare(query); - } catch (final AttributeExpressionLanguageParsingException e) { - throw new IllegalArgumentException("Invalid Expression Language provided to search the listing of FlowFiles: " + query, e); - } - } - - // purge any old requests from the map just to keep it clean. But if there are very requests, which is usually the case, then don't bother + // purge any old requests from the map just to keep it clean. 
But if there are very few requests, which is usually the case, then don't bother if (listRequestMap.size() > 10) { final List<String> toDrop = new ArrayList<>(); for (final Map.Entry<String, ListFlowFileRequest> entry : listRequestMap.entrySet()) { @@ -896,101 +865,49 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } // numSteps = 1 for each swap location + 1 for active queue + 1 for swap queue. - final int numSteps = 2 + size.get().swapFiles; - final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, maxResults, size(), numSteps); + final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, maxResults, size()); final Thread t = new Thread(new Runnable() { @Override public void run() { int position = 0; int resultCount = 0; - final Comparator<FlowFileSummary> comparator = FlowFileSummaries.createComparator(sortColumn, direction); - final NavigableSet<FlowFileSummary> summaries = new TreeSet<>(comparator); - int completedStepCount = 0; - - // we need a write lock while using the Active Queue because we can't iterate over it - we have to poll from it - // continually. This is because the iterator for PriorityQueue does not iterate over the elements in any particular - // order. Since we need the 'position' of the element in the queue, we need to iterate over them in the proper order. - writeLock.lock(); + final List<FlowFileSummary> summaries = new ArrayList<>(); + + // Create an ArrayList that contains all of the contents of the active queue. + // We do this so that we don't have to hold the lock any longer than absolutely necessary. + // We cannot simply pull the first 'maxResults' records from the queue, however, because the + // Iterator provided by PriorityQueue does not return records in order. So we would have to either + // use a writeLock and 'pop' the first 'maxResults' records off the queue or use a read lock and + // do a shallow copy of the queue. 
The shallow copy is generally quicker because it doesn't have to do + // the sorting to put the records back. So even though this has an expensive of Java Heap to create the + // extra collection, we are making this trade-off to avoid locking the queue any longer than required. + final List<FlowFileRecord> allFlowFiles; + final Prioritizer prioritizer; + readLock.lock(); try { logger.debug("{} Acquired lock to perform listing of FlowFiles", StandardFlowFileQueue.this); - listRequest.setState(ListFlowFileState.CALCULATING_LIST); - final List<FlowFileRecord> flowFileRecords = new ArrayList<>(activeQueue.size()); - - FlowFileRecord flowFile; - try { - while ((flowFile = activeQueue.poll()) != null) { - flowFileRecords.add(flowFile); - position++; - - if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) { - summaries.add(summarize(flowFile, position)); - if (summaries.size() > maxResults) { - summaries.pollLast(); - } - } - } - } finally { - activeQueue.addAll(flowFileRecords); - } + allFlowFiles = new ArrayList<>(activeQueue); + prioritizer = new Prioritizer(StandardFlowFileQueue.this.priorities); } finally { - writeLock.unlock("List FlowFiles"); + readLock.unlock("List FlowFiles"); } - logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", StandardFlowFileQueue.this, resultCount); - - listRequest.setCompletedStepCount(++completedStepCount); - - position = activeQueue.size(); - try { - // We are now iterating over swap files, and we don't need the write lock for this, just the read lock, since - // we are not modifying anything. 
- readLock.lock(); - try { - for (final String location : swapLocations) { - logger.debug("{} Performing listing of FlowFiles for Swap Location {}", StandardFlowFileQueue.this, location); - final List<FlowFileRecord> flowFiles = swapManager.peek(location, StandardFlowFileQueue.this); - for (final FlowFileRecord flowFile : flowFiles) { - position++; - - if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) { - summaries.add(summarize(flowFile, position)); - if (summaries.size() > maxResults) { - summaries.pollLast(); - } - } - } - - listRequest.setCompletedStepCount(++completedStepCount); - } - - logger.debug("{} Performing listing of FlowFiles from Swap Queue", StandardFlowFileQueue.this); - for (final FlowFileRecord flowFile : swapQueue) { - position++; + listRequest.setState(ListFlowFileState.CALCULATING_LIST); - if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) { - summaries.add(summarize(flowFile, position)); - if (summaries.size() > maxResults) { - summaries.pollLast(); - } - } - } + // sort the FlowFileRecords so that we have the list in the same order as on the queue. + Collections.sort(allFlowFiles, prioritizer); - listRequest.setCompletedStepCount(++completedStepCount); - } finally { - readLock.unlock("List FlowFiles"); + for (final FlowFileRecord flowFile : allFlowFiles) { + summaries.add(summarize(flowFile, ++position)); + if (summaries.size() >= maxResults) { + break; } - } catch (final IOException ioe) { - logger.error("Failed to read swapped FlowFiles in order to perform listing of queue " + StandardFlowFileQueue.this, ioe); - listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details."); } - // We have now completed the listing successfully. Set the number of completed steps to the total number of steps. We may have - // skipped some steps because we have reached the maximum number of results, so we consider those steps completed. 
- logger.debug("{} Completed listing of FlowFiles", StandardFlowFileQueue.this); - listRequest.setCompletedStepCount(listRequest.getTotalStepCount()); + logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", StandardFlowFileQueue.this, resultCount); + listRequest.setFlowFileSummaries(summaries); listRequest.setState(ListFlowFileState.COMPLETE); - listRequest.setFlowFileSummaries(new ArrayList<FlowFileSummary>(summaries)); } }, "List FlowFiles for Connection " + getIdentifier()); t.setDaemon(true); @@ -1082,24 +999,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { return flowFile; } } - - for (final FlowFileRecord flowFile : swapQueue) { - if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) { - return flowFile; - } - } - - // TODO: consider using a Long flowFileId instead of a UUID, and then having the swap manager - // write out the min and max FlowFile ID's. This would allow us to then have a method: boolean isFlowFilePossiblyContained(long id) - // which can return a boolean value that can be used to determine whether or not to even call peek - for (final String swapLocation : swapLocations) { - final List<FlowFileRecord> flowFiles = swapManager.peek(swapLocation, this); - for (final FlowFileRecord flowFile : flowFiles) { - if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) { - return flowFile; - } - } - } } finally { readLock.unlock("getFlowFile"); } @@ -1110,13 +1009,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public void verifyCanList() throws IllegalStateException { - if (connection.getSource().isRunning()) { - throw new IllegalStateException("Cannot list the FlowFiles of queue because the connection's source is still running"); - } - - if (connection.getDestination().isRunning()) { - throw new IllegalStateException("Cannot list the FlowFiles of queue because the connection's destination is still running"); - } } 
@Override @@ -1248,6 +1140,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue { logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}", dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), StandardFlowFileQueue.this.getIdentifier(), requestor); dropRequest.setState(DropFlowFileState.COMPLETE); + if (heartbeater != null) { + heartbeater.heartbeat(); + } } catch (final Exception e) { logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), e.toString()); logger.error("", e); http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index fdb47b6..32d8566 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -17,17 +17,31 @@ package org.apache.nifi.controller; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.queue.DropFlowFileState; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.FlowFileQueue; -import org.apache.nifi.controller.queue.FlowFileSummary; import org.apache.nifi.controller.queue.ListFlowFileState; import org.apache.nifi.controller.queue.ListFlowFileStatus; import org.apache.nifi.controller.queue.QueueSize; -import org.apache.nifi.controller.queue.SortDirection; -import org.apache.nifi.controller.queue.SortColumn; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; @@ -49,23 +63,6 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - public class TestStandardFlowFileQueue { private TestSwapManager swapManager = null; private StandardFlowFileQueue queue = null; @@ -105,7 +102,7 @@ public class TestStandardFlowFileQueue { } }).when(provRepo).registerEvents(Mockito.any(Iterable.class)); - queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000); + queue = new 
StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, null); TestFlowFile.idGenerator.set(0L); } @@ -417,72 +414,8 @@ public class TestStandardFlowFileQueue { assertEquals(9999, status.getFlowFileSummaries().size()); assertEquals(100, status.getCompletionPercentage()); assertNull(status.getFailureReason()); - assertEquals(2, status.getTotalStepCount()); - assertEquals(2, status.getCompletedStepCount()); } - @Test(timeout = 5000) - public void testListFlowFilesActiveQueueAndSwapQueue() throws InterruptedException { - for (int i = 0; i < 11000; i++) { - queue.put(new TestFlowFile()); - } - - final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 11000); - assertNotNull(status); - assertEquals(11000, status.getQueueSize().getObjectCount()); - - while (status.getState() != ListFlowFileState.COMPLETE) { - Thread.sleep(100); - } - - assertEquals(11000, status.getFlowFileSummaries().size()); - assertEquals(100, status.getCompletionPercentage()); - assertNull(status.getFailureReason()); - assertEquals(2, status.getTotalStepCount()); - assertEquals(2, status.getCompletedStepCount()); - } - - @Test(timeout = 5000) - public void testListFlowFilesActiveQueueAndSwapFile() throws InterruptedException { - for (int i = 0; i < 20000; i++) { - queue.put(new TestFlowFile()); - } - - final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 20000); - assertNotNull(status); - assertEquals(20000, status.getQueueSize().getObjectCount()); - - while (status.getState() != ListFlowFileState.COMPLETE) { - Thread.sleep(100); - } - - assertEquals(20000, status.getFlowFileSummaries().size()); - assertEquals(100, status.getCompletionPercentage()); - assertNull(status.getFailureReason()); - assertEquals(3, status.getTotalStepCount()); - assertEquals(3, status.getCompletedStepCount()); - } - - @Test(timeout = 5000) - public void testListFlowFilesActiveQueueAndSwapFilesAndSwapQueue() 
throws InterruptedException { - for (int i = 0; i < 30050; i++) { - queue.put(new TestFlowFile()); - } - - final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 30050); - assertNotNull(status); - assertEquals(30050, status.getQueueSize().getObjectCount()); - - while (status.getState() != ListFlowFileState.COMPLETE) { - Thread.sleep(100); - } - - assertEquals(30050, status.getFlowFileSummaries().size()); - assertEquals(100, status.getCompletionPercentage()); - assertNull(status.getFailureReason()); - assertEquals(4, status.getTotalStepCount()); - assertEquals(4, status.getCompletedStepCount()); - } @Test(timeout = 5000) public void testListFlowFilesResultsLimited() throws InterruptedException { @@ -501,62 +434,6 @@ public class TestStandardFlowFileQueue { assertEquals(100, status.getFlowFileSummaries().size()); assertEquals(100, status.getCompletionPercentage()); assertNull(status.getFailureReason()); - assertEquals(4, status.getTotalStepCount()); - assertEquals(4, status.getCompletedStepCount()); - } - - @Test - public void testListFlowFilesSortedAscending() throws InterruptedException { - for (int i = 0; i < 30050; i++) { - queue.put(new TestFlowFile(i)); - } - - final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100, SortColumn.FLOWFILE_SIZE, SortDirection.ASCENDING); - assertNotNull(status); - assertEquals(30050, status.getQueueSize().getObjectCount()); - - while (status.getState() != ListFlowFileState.COMPLETE) { - Thread.sleep(100); - } - - assertEquals(100, status.getFlowFileSummaries().size()); - assertEquals(100, status.getCompletionPercentage()); - - assertNull(status.getFailureReason()); - assertEquals(4, status.getTotalStepCount()); - assertEquals(4, status.getCompletedStepCount()); - - int counter = 0; - for (final FlowFileSummary summary : status.getFlowFileSummaries()) { - assertEquals(counter++, summary.getSize()); - } - } - - @Test - public void testListFlowFilesSortedDescending() throws 
InterruptedException { - for (int i = 0; i < 30050; i++) { - queue.put(new TestFlowFile(i)); - } - - final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100, SortColumn.FLOWFILE_SIZE, SortDirection.DESCENDING); - assertNotNull(status); - assertEquals(30050, status.getQueueSize().getObjectCount()); - - while (status.getState() != ListFlowFileState.COMPLETE) { - Thread.sleep(100); - } - - assertEquals(100, status.getFlowFileSummaries().size()); - assertEquals(100, status.getCompletionPercentage()); - - assertNull(status.getFailureReason()); - assertEquals(4, status.getTotalStepCount()); - assertEquals(4, status.getCompletedStepCount()); - - int counter = 0; - for (final FlowFileSummary summary : status.getFlowFileSummaries()) { - assertEquals((30050 - 1 - counter++), summary.getSize()); - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 644018f..f8db35e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -139,7 +139,7 @@ public class TestStandardProcessSession { final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); final FlowFileSwapManager swapManager = 
Mockito.mock(FlowFileSwapManager.class); - flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000); + flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000, null); when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); Mockito.doAnswer(new Answer<Object>() { http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 4bc1222..150dd67 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -16,25 +16,22 @@ */ package org.apache.nifi.web; -import java.util.Collection; -import java.util.Date; -import java.util.Set; import org.apache.nifi.controller.ScheduledState; - -import org.apache.nifi.controller.queue.SortColumn; -import org.apache.nifi.controller.queue.SortDirection; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.api.dto.BulletinBoardDTO; import org.apache.nifi.web.api.dto.BulletinQueryDTO; import org.apache.nifi.web.api.dto.ClusterDTO; +import org.apache.nifi.web.api.dto.ComponentHistoryDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import 
org.apache.nifi.web.api.dto.ControllerConfigurationDTO; import org.apache.nifi.web.api.dto.ControllerDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; import org.apache.nifi.web.api.dto.CounterDTO; import org.apache.nifi.web.api.dto.CountersDTO; import org.apache.nifi.web.api.dto.DocumentedTypeDTO; +import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.FlowFileDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FunnelDTO; @@ -45,9 +42,6 @@ import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.ComponentHistoryDTO; -import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; -import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; @@ -61,8 +55,8 @@ import org.apache.nifi.web.api.dto.UserGroupDTO; import org.apache.nifi.web.api.dto.action.ActionDTO; import org.apache.nifi.web.api.dto.action.HistoryDTO; import org.apache.nifi.web.api.dto.action.HistoryQueryDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; +import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; @@ -78,6 +72,10 @@ import org.apache.nifi.web.api.dto.status.NodeStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; 
+import java.util.Collection; +import java.util.Date; +import java.util.Set; + /** * Defines the NiFiServiceFacade interface. */ @@ -585,11 +583,9 @@ public interface NiFiServiceFacade { * @param groupId group * @param connectionId The ID of the connection * @param listingRequestId The ID of the listing request - * @param column sort column - * @param direction sort direction * @return The ListingRequest */ - ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId, SortColumn column, SortDirection direction); + ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId); /** * Gets a new flow file listing request. http://git-wip-us.apache.org/repos/asf/nifi/blob/0d7edcb3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 2f92588..142ff08 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,25 +16,8 @@ */ package org.apache.nifi.web; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; 
-import java.util.Map; -import java.util.Set; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import javax.ws.rs.WebApplicationException; - +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; import org.apache.nifi.action.FlowChangeAction; @@ -53,18 +36,23 @@ import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.Counter; import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Template; import org.apache.nifi.controller.label.Label; -import org.apache.nifi.controller.queue.SortColumn; -import org.apache.nifi.controller.queue.SortDirection; import org.apache.nifi.controller.repository.claim.ContentDirection; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceReference; +import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; @@ -82,9 +70,7 @@ import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; import 
org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.web.api.dto.FlowFileDTO; -import org.apache.nifi.web.api.dto.ListingRequestDTO; -import org.apache.nifi.web.security.user.NiFiUserUtils; +import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.user.AccountStatus; import org.apache.nifi.user.NiFiUser; import org.apache.nifi.user.NiFiUserGroup; @@ -94,16 +80,22 @@ import org.apache.nifi.web.api.dto.BulletinBoardDTO; import org.apache.nifi.web.api.dto.BulletinDTO; import org.apache.nifi.web.api.dto.BulletinQueryDTO; import org.apache.nifi.web.api.dto.ClusterDTO; +import org.apache.nifi.web.api.dto.ComponentHistoryDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.ControllerConfigurationDTO; import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; import org.apache.nifi.web.api.dto.CounterDTO; import org.apache.nifi.web.api.dto.CountersDTO; import org.apache.nifi.web.api.dto.DocumentedTypeDTO; +import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.DtoFactory; +import org.apache.nifi.web.api.dto.FlowFileDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FunnelDTO; import org.apache.nifi.web.api.dto.LabelDTO; +import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.PortDTO; @@ -111,10 +103,11 @@ import org.apache.nifi.web.api.dto.PreviousValueDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.ComponentHistoryDTO; +import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; import 
org.apache.nifi.web.api.dto.PropertyHistoryDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.SnippetDTO; import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; @@ -131,6 +124,7 @@ import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO; +import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; @@ -138,6 +132,7 @@ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.dto.status.NodeConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.NodePortStatusDTO; +import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.NodeProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.NodeStatusDTO; @@ -145,40 +140,41 @@ import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.dao.ConnectionDAO; +import org.apache.nifi.web.dao.ControllerServiceDAO; import org.apache.nifi.web.dao.FunnelDAO; import org.apache.nifi.web.dao.LabelDAO; import org.apache.nifi.web.dao.PortDAO; import org.apache.nifi.web.dao.ProcessGroupDAO; import org.apache.nifi.web.dao.ProcessorDAO; 
import org.apache.nifi.web.dao.RemoteProcessGroupDAO; +import org.apache.nifi.web.dao.ReportingTaskDAO; import org.apache.nifi.web.dao.SnippetDAO; import org.apache.nifi.web.dao.TemplateDAO; -import org.apache.nifi.web.util.SnippetUtils; - -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.Validator; -import org.apache.nifi.controller.ReportingTaskNode; -import org.apache.nifi.controller.ScheduledState; -import org.apache.nifi.controller.service.ControllerServiceNode; -import org.apache.nifi.controller.service.ControllerServiceReference; -import org.apache.nifi.controller.service.ControllerServiceState; -import org.apache.nifi.reporting.ComponentType; -import org.apache.nifi.web.api.dto.ControllerServiceDTO; -import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; -import org.apache.nifi.web.api.dto.DropRequestDTO; -import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; -import org.apache.nifi.web.api.dto.ReportingTaskDTO; -import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusDTO; -import org.apache.nifi.web.dao.ControllerServiceDAO; -import org.apache.nifi.web.dao.ReportingTaskDAO; import org.apache.nifi.web.security.user.NewAccountRequest; +import org.apache.nifi.web.security.user.NiFiUserUtils; +import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.access.AccessDeniedException; +import javax.ws.rs.WebApplicationException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import 
java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + /** * Implementation of NiFiServiceFacade that performs revision checking. */ @@ -828,7 +824,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ListingRequestDTO deleteFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) { - return dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(groupId, connectionId, listingRequestId)); + final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(groupId, connectionId, listingRequestId)); + + // include whether the source and destination are running + final Connection connection = connectionDAO.getConnection(groupId, connectionId); + if (connection.getSource() != null) { + listRequest.setSourceRunning(connection.getSource().isRunning()); + } + if (connection.getDestination() != null) { + listRequest.setDestinationRunning(connection.getDestination().isRunning()); + } + + return listRequest; } @Override @@ -1088,8 +1095,19 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId, SortColumn column, SortDirection direction) { - return dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(groupId, connectionId, listingRequestId, column, direction)); + public ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) { + final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(groupId, connectionId, listingRequestId)); + + // include whether the source and destination are running + final Connection connection = 
connectionDAO.getConnection(groupId, connectionId); + if (connection.getSource() != null) { + listRequest.setSourceRunning(connection.getSource().isRunning()); + } + if (connection.getDestination() != null) { + listRequest.setDestinationRunning(connection.getDestination().isRunning()); + } + + return listRequest; } @Override @@ -2153,7 +2171,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ListingRequestDTO getFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) { - return dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(groupId, connectionId, listingRequestId)); + final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(groupId, connectionId, listingRequestId)); + + // include whether the source and destination are running + final Connection connection = connectionDAO.getConnection(groupId, connectionId); + if (connection.getSource() != null) { + listRequest.setSourceRunning(connection.getSource().isRunning()); + } + if (connection.getDestination() != null) { + listRequest.setDestinationRunning(connection.getDestination().isRunning()); + } + + return listRequest; } @Override
