http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 77c3dd7..abb4b78 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -275,7 +275,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final SnippetManager snippetManager; private final long gracefulShutdownSeconds; private final ExtensionManager extensionManager; - private final NiFiProperties properties; + private final NiFiProperties nifiProperties; private final SSLContext sslContext; private final Set<RemoteSiteListener> externalSiteListeners = new HashSet<>(); private final AtomicReference<CounterRepository> counterRepositoryRef; @@ -420,7 +420,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private FlowController( final FlowFileEventRepository flowFileEventRepo, - final NiFiProperties properties, + final NiFiProperties nifiProperties, final Authorizer authorizer, final AuditService auditService, final StringEncryptor encryptor, @@ -435,16 +435,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R maxEventDrivenThreads = new AtomicInteger(5); this.encryptor = encryptor; - this.properties = properties; + this.nifiProperties = nifiProperties; this.heartbeatMonitor = heartbeatMonitor; - sslContext = SslContextFactory.createSslContext(properties, false); + sslContext = SslContextFactory.createSslContext(nifiProperties, false); extensionManager = new ExtensionManager(); this.clusterCoordinator = clusterCoordinator; timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process")); eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process")); - final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, resourceClaimManager); + final FlowFileRepository flowFileRepo = createFlowFileRepository(nifiProperties, resourceClaimManager); flowFileRepository = flowFileRepo; flowFileEventRepository = flowFileEventRepo; counterRepositoryRef = new AtomicReference<>(new StandardCounterRepository()); @@ -453,25 +453,25 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.variableRegistry = variableRegistry == null ? VariableRegistry.EMPTY_REGISTRY : variableRegistry; try { - this.provenanceRepository = createProvenanceRepository(properties); + this.provenanceRepository = createProvenanceRepository(nifiProperties); this.provenanceRepository.initialize(createEventReporter(bulletinRepository), authorizer, this); } catch (final Exception e) { throw new RuntimeException("Unable to create Provenance Repository", e); } try { - this.contentRepository = createContentRepository(properties); + this.contentRepository = createContentRepository(nifiProperties); } catch (final Exception e) { throw new RuntimeException("Unable to create Content Repository", e); } try { - this.stateManagerProvider = StandardStateManagerProvider.create(properties, this.variableRegistry); + this.stateManagerProvider = StandardStateManagerProvider.create(nifiProperties, this.variableRegistry); } catch (final IOException e) { throw new RuntimeException(e); } - processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider, this.variableRegistry); + processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider, this.variableRegistry, this.nifiProperties); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository); @@ -479,7 +479,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor, this.variableRegistry)); final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); - final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); + final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry, this.nifiProperties); processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent); @@ -490,7 +490,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.authorizer = authorizer; this.auditService = auditService; - final String gracefulShutdownSecondsVal = properties.getProperty(GRACEFUL_SHUTDOWN_PERIOD); + final String gracefulShutdownSecondsVal = nifiProperties.getProperty(GRACEFUL_SHUTDOWN_PERIOD); long shutdownSecs; try { shutdownSecs = Long.parseLong(gracefulShutdownSecondsVal); @@ -502,27 +502,26 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } gracefulShutdownSeconds = shutdownSecs; - remoteInputSocketPort = properties.getRemoteInputPort(); - remoteInputHttpPort = properties.getRemoteInputHttpPort(); - isSiteToSiteSecure = properties.isSiteToSiteSecure(); + remoteInputSocketPort = nifiProperties.getRemoteInputPort(); + remoteInputHttpPort = nifiProperties.getRemoteInputHttpPort(); + isSiteToSiteSecure = nifiProperties.isSiteToSiteSecure(); if (isSiteToSiteSecure && sslContext == null && remoteInputSocketPort != null) { throw new IllegalStateException("NiFi Configured to allow Secure Site-to-Site communications but the Keystore/Truststore properties are not configured"); } this.configuredForClustering = configuredForClustering; - this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), TimeUnit.SECONDS); + this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getNodeHeartbeatInterval(), TimeUnit.SECONDS); this.snippetManager = new SnippetManager(); - final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler, - properties, encryptor, this, this.variableRegistry); + nifiProperties, encryptor, this, this.variableRegistry); rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); rootGroupRef.set(rootGroup); instanceId = ComponentIdGenerator.generateId().toString(); - controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, this.variableRegistry); + controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider, this.variableRegistry, this.nifiProperties); if (remoteInputSocketPort == null) { LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set"); @@ -534,13 +533,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class); final NodeInformant nodeInformant = configuredForClustering ? new ClusterCoordinatorNodeInformant(clusterCoordinator) : null; - externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null, nodeInformant)); + externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null, nifiProperties, nodeInformant)); } if (remoteInputHttpPort == null) { LOG.info("Not enabling HTTP(S) Site-to-Site functionality because the '" + NiFiProperties.SITE_TO_SITE_HTTP_ENABLED + "' property is not true"); } else { - externalSiteListeners.add(HttpRemoteSiteListener.getInstance()); + externalSiteListeners.add(HttpRemoteSiteListener.getInstance(nifiProperties)); } for (final RemoteSiteListener listener : externalSiteListeners) { @@ -548,7 +547,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // Determine frequency for obtaining component status snapshots - final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); + final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); long snapshotMillis; try { snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); @@ -557,9 +556,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // Initialize the Embedded ZooKeeper server, if applicable - if (properties.isStartEmbeddedZooKeeper() && configuredForClustering) { + if (nifiProperties.isStartEmbeddedZooKeeper() && configuredForClustering) { try { - zooKeeperStateServer = ZooKeeperStateServer.create(properties); + zooKeeperStateServer = ZooKeeperStateServer.create(nifiProperties); zooKeeperStateServer.start(); } catch (final IOException | ConfigException e) { throw new IllegalStateException("Unable to initailize Flow because NiFi was configured to start an Embedded Zookeeper server but failed to do so", e); @@ -580,8 +579,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false)); if (configuredForClustering) { - leaderElectionManager = new CuratorLeaderElectionManager(4, properties); - heartbeater = new ClusterProtocolHeartbeater(protocolSender, properties); + leaderElectionManager = new CuratorLeaderElectionManager(4, nifiProperties); + heartbeater = new ClusterProtocolHeartbeater(protocolSender, nifiProperties); // Check if there is already a cluster coordinator elected. If not, go ahead // and register for coordinator role. If there is already one elected, do not register until @@ -624,7 +623,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } try { - final FlowFileRepository created = NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileRepository.class); + final FlowFileRepository created = NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileRepository.class, properties); synchronized (created) { created.initialize(contentClaimManager); } @@ -641,7 +640,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } try { - return NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileSwapManager.class); + return NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileSwapManager.class, properties); } catch (final Exception e) { throw new RuntimeException(e); } @@ -820,7 +819,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } try { - final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class); + final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class, properties); synchronized (contentRepo) { contentRepo.initialize(resourceClaimManager); } @@ -838,21 +837,21 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } try { - return NarThreadContextClassLoader.createInstance(implementationClassName, ProvenanceRepository.class); + return NarThreadContextClassLoader.createInstance(implementationClassName, ProvenanceRepository.class, properties); } catch (final Exception e) { throw new RuntimeException(e); } } private ComponentStatusRepository createComponentStatusRepository() { - final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION); + final String implementationClassName = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION); if (implementationClassName == null) { throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: " + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION); } try { - return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class); + return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class, nifiProperties); } catch (final Exception e) { throw new RuntimeException(e); } @@ -883,7 +882,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // Create and initialize a FlowFileSwapManager for this connection - final FlowFileSwapManager swapManager = createSwapManager(properties); + final FlowFileSwapManager swapManager = createSwapManager(nifiProperties); final EventReporter eventReporter = createEventReporter(getBulletinRepository()); try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { @@ -984,7 +983,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws NullPointerException if the argument is null */ public ProcessGroup createProcessGroup(final String id) { - return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, properties, encryptor, this, variableRegistry); + return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, nifiProperties, encryptor, this, variableRegistry); } /** @@ -1040,11 +1039,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry); final ProcessorNode procNode; if (creationSuccessful) { - procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider); + procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties); } else { final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type); + procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties); } final LogRepository logRepository = LogRepositoryFactory.getRepository(id); @@ -1087,7 +1086,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class); processor = processorClass.newInstance(); final ComponentLog componentLogger = new SimpleProcessLogger(identifier, processor); - final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, componentLogger, this, this); + final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, componentLogger, this, this, nifiProperties); processor.initialize(ctx); LogRepositoryFactory.getRepository(identifier).setLogger(componentLogger); @@ -1191,7 +1190,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws IllegalArgumentException if <code>uri</code> is not a valid URI. */ public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uri) { - return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext); + return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext, nifiProperties); } public ProcessGroup getRootGroup() { @@ -2845,7 +2844,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (firstTimeAdded) { final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask()); final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(), - SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this); + SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this, nifiProperties); try { task.initialize(config); @@ -3305,7 +3304,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return configuredForClustering; } - private void registerForClusterCoordinator() { leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { @Override @@ -3320,7 +3318,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // call start() when we become the leader, and this will ensure that initialization is handled. The heartbeat monitor // then will check the zookeeper znode to check if it is the cluster coordinator before kicking any nodes out of the // cluster. - if (clusterCoordinator != null) { clusterCoordinator.removeRole(ClusterRoles.CLUSTER_COORDINATOR); } @@ -3357,7 +3354,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * either connected or trying to connect to the cluster. * * @param clustered true if clustered - * @param clusterInstanceId if clustered is true, indicates the InstanceID of the Cluster Manager + * @param clusterInstanceId if clustered is true, indicates the InstanceID + * of the Cluster Manager */ public void setClustered(final boolean clustered, final String clusterInstanceId) { writeLock.lock();
http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 091e59c..6ad9976 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -135,6 +135,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler { */ private NodeIdentifier nodeId; + private final NiFiProperties nifiProperties; + // guardedBy rwLock private boolean firstControllerInitialization = true; @@ -142,44 +144,45 @@ public class StandardFlowService implements FlowService, ProtocolHandler { private static final Logger logger = LoggerFactory.getLogger(StandardFlowService.class); public static StandardFlowService createStandaloneInstance( - final FlowController controller, - final NiFiProperties properties, - final StringEncryptor encryptor, - final RevisionManager revisionManager, - final Authorizer authorizer) throws IOException { + final FlowController controller, + final NiFiProperties nifiProperties, + final StringEncryptor encryptor, + final RevisionManager revisionManager, + final Authorizer authorizer) throws IOException { - return new StandardFlowService(controller, properties, null, encryptor, false, null, revisionManager, authorizer); + return new StandardFlowService(controller, nifiProperties, null, encryptor, false, null, revisionManager, authorizer); } public static StandardFlowService createClusteredInstance( - final FlowController controller, - final NiFiProperties properties, - final NodeProtocolSenderListener senderListener, - final ClusterCoordinator coordinator, - final StringEncryptor encryptor, - final RevisionManager revisionManager, - final Authorizer authorizer) throws IOException { - - return new StandardFlowService(controller, properties, senderListener, encryptor, true, coordinator, revisionManager, authorizer); + final FlowController controller, + final NiFiProperties nifiProperties, + final NodeProtocolSenderListener senderListener, + final ClusterCoordinator coordinator, + final StringEncryptor encryptor, + final RevisionManager revisionManager, + final Authorizer authorizer) throws IOException { + + return new StandardFlowService(controller, nifiProperties, senderListener, encryptor, true, coordinator, revisionManager, authorizer); } private StandardFlowService( - final FlowController controller, - final NiFiProperties properties, - final NodeProtocolSenderListener senderListener, - final StringEncryptor encryptor, - final boolean configuredForClustering, - final ClusterCoordinator clusterCoordinator, - final RevisionManager revisionManager, - final Authorizer authorizer) throws IOException { - + final FlowController controller, + final NiFiProperties nifiProperties, + final NodeProtocolSenderListener senderListener, + final StringEncryptor encryptor, + final boolean configuredForClustering, + final ClusterCoordinator clusterCoordinator, + final RevisionManager revisionManager, + final Authorizer authorizer) throws IOException { + + this.nifiProperties = nifiProperties; this.controller = controller; - flowXml = Paths.get(properties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE)); + flowXml = Paths.get(nifiProperties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE)); - gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS); - autoResumeState = properties.getAutoResumeState(); + gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS); + autoResumeState = nifiProperties.getAutoResumeState(); - dao = new StandardXMLFlowConfigurationDAO(flowXml, encryptor); + dao = new StandardXMLFlowConfigurationDAO(flowXml, encryptor, nifiProperties); this.clusterCoordinator = clusterCoordinator; if (clusterCoordinator != null) { clusterCoordinator.setFlowService(this); @@ -193,8 +196,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler { this.senderListener = senderListener; senderListener.addHandler(this); - final InetSocketAddress nodeApiAddress = properties.getNodeApiAddress(); - final InetSocketAddress nodeSocketAddress = properties.getClusterNodeProtocolAddress(); + final InetSocketAddress nodeApiAddress = nifiProperties.getNodeApiAddress(); + final InetSocketAddress nodeSocketAddress = nifiProperties.getClusterNodeProtocolAddress(); String nodeUuid = null; final StateManager stateManager = controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG); @@ -208,10 +211,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // use a random UUID as the proposed node identifier this.nodeId = new NodeIdentifier(nodeUuid, - nodeApiAddress.getHostName(), nodeApiAddress.getPort(), - nodeSocketAddress.getHostName(), nodeSocketAddress.getPort(), - properties.getRemoteInputHost(), properties.getRemoteInputPort(), - properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure()); + nodeApiAddress.getHostName(), nodeApiAddress.getPort(), + nodeSocketAddress.getHostName(), nodeSocketAddress.getPort(), + nifiProperties.getRemoteInputHost(), nifiProperties.getRemoteInputPort(), + nifiProperties.getRemoteInputHttpPort(), nifiProperties.isSiteToSiteSecure()); } else { this.configuredForClustering = false; @@ -244,7 +247,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { public void overwriteFlow(final InputStream is) throws IOException { writeLock.lock(); try (final OutputStream output = Files.newOutputStream(flowXml, StandardOpenOption.WRITE, StandardOpenOption.CREATE); - final OutputStream gzipOut = new GZIPOutputStream(output);) { + final OutputStream gzipOut = new GZIPOutputStream(output);) { FileUtils.copy(is, gzipOut); } finally { writeLock.unlock(); @@ -253,7 +256,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { @Override public void saveFlowChanges(final TimeUnit delayUnit, final long delay) { - final boolean archiveEnabled = NiFiProperties.getInstance().isFlowConfigurationArchiveEnabled(); + final boolean archiveEnabled = nifiProperties.isFlowConfigurationArchiveEnabled(); saveFlowChanges(delayUnit, delay, archiveEnabled); } @@ -584,12 +587,12 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // reconnect final ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(), - request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions()); + request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions()); loadFromConnectionResponse(connectionResponse); clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream() - .collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status))); + .collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status))); controller.resumeHeartbeats(); // we are now connected, so resume sending heartbeats. logger.info("Node reconnected."); @@ -637,7 +640,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // write lock must already be acquired private void loadFromBytes(final DataFlow proposedFlow, final boolean allowEmptyFlow) - throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { + throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { logger.trace("Loading flow from bytes"); // resolve the given flow (null means load flow from disk) @@ -695,16 +698,15 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } /** - * In NiFi 0.x, templates were stored in a templates directory as separate files. They are - * now stored in the flow itself. If there already are templates in that directory, though, - * we want to restore them. + * In NiFi 0.x, templates were stored in a templates directory as separate + * files. They are now stored in the flow itself. If there already are + * templates in that directory, though, we want to restore them. * * @return the templates found in the templates directory * @throws IOException if unable to read from the file system */ public List<Template> loadTemplates() throws IOException { - final NiFiProperties properties = NiFiProperties.getInstance(); - final Path templatePath = properties.getTemplateDirectory(); + final Path templatePath = nifiProperties.getTemplateDirectory(); final File[] files = templatePath.toFile().listFiles(pathname -> { final String lowerName = pathname.getName().toLowerCase(); @@ -718,7 +720,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { final List<Template> templates = new ArrayList<>(); for (final File file : files) { try (final FileInputStream fis = new FileInputStream(file); - final BufferedInputStream bis = new BufferedInputStream(fis)) { + final BufferedInputStream bis = new BufferedInputStream(fis)) { final TemplateDTO templateDto; try { @@ -820,7 +822,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG).setState(map, Scope.LOCAL); } catch (final IOException ioe) { logger.warn("Received successful response from Cluster Manager but failed to persist state about the Node's Unique Identifier and the Node's Index. " - + "This node may be assigned a different UUID when the node is restarted.", ioe); + + "This node may be assigned a different UUID when the node is restarted.", ioe); } return response; @@ -834,7 +836,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { writeLock.lock(); try { clusterCoordinator.resetNodeStatuses(response.getNodeConnectionStatuses().stream() - .collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status))); + .collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status))); // get the dataflow from the response final DataFlow dataFlow = response.getDataFlow(); @@ -869,7 +871,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local or cluster flow is malformed.", fse); } catch (final FlowSynchronizationException fse) { throw new FlowSynchronizationException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow controller partially updated. " - + "Administrator should disconnect node and review flow for corruption.", fse); + + "Administrator should disconnect node and review flow for corruption.", fse); } catch (final Exception ex) { throw new ConnectionException("Failed to connect node to cluster due to: " + ex, ex); } finally { @@ -887,7 +889,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } try (final InputStream in = Files.newInputStream(flowXml, StandardOpenOption.READ); - final InputStream gzipIn = new GZIPInputStream(in)) { + final InputStream gzipIn = new GZIPInputStream(in)) { FileUtils.copy(gzipIn, os); } } finally { @@ -895,7 +897,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } } - public void loadSnippets(final byte[] bytes) throws IOException { if (bytes.length == 0) { return; @@ -909,7 +910,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } } - private class SaveReportingTask implements Runnable { @Override @@ -962,6 +962,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } private class SaveHolder { + private final Calendar saveTime; private final boolean shouldArchive; http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index d1822ef..6364d6c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -121,10 +121,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { public static final URL FLOW_XSD_RESOURCE = StandardFlowSynchronizer.class.getResource("/FlowConfiguration.xsd"); private final StringEncryptor encryptor; private final boolean autoResumeState; + private final NiFiProperties nifiProperties; - public StandardFlowSynchronizer(final StringEncryptor encryptor) { + public StandardFlowSynchronizer(final StringEncryptor encryptor, final NiFiProperties nifiProperties) { this.encryptor = encryptor; - autoResumeState = NiFiProperties.getInstance().getAutoResumeState(); + autoResumeState = nifiProperties.getAutoResumeState(); + this.nifiProperties = nifiProperties; } public static boolean isEmpty(final DataFlow dataFlow) { @@ -309,7 +311,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } // get/create all the reporting task nodes and DTOs, but don't apply their scheduled state yet - final Map<ReportingTaskNode,ReportingTaskDTO> reportingTaskNodesToDTOs = new HashMap<>(); + final Map<ReportingTaskNode, ReportingTaskDTO> reportingTaskNodesToDTOs = new HashMap<>(); for (final Element taskElement : reportingTaskElements) { final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(taskElement, encryptor); final ReportingTaskNode reportingTask = getOrCreateReportingTask(controller, dto, initialized, existingFlowEmpty); @@ -344,7 +346,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { .collect(Collectors.toSet()); // clone the controller services and map the original id to the clone - final Map<String,ControllerServiceNode> controllerServiceMapping = new HashMap<>(); + final Map<String, ControllerServiceNode> controllerServiceMapping = new HashMap<>(); for (ControllerServiceNode controllerService : controllerServicesToClone) { final ControllerServiceNode clone = ControllerServiceLoader.cloneControllerService(controller, controllerService); controller.addRootControllerService(clone); @@ -370,7 +372,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { scaleRootGroup(rootGroup, encodingVersion); // now that controller services are loaded and enabled we can apply the scheduled state to each reporting task - for (Map.Entry<ReportingTaskNode,ReportingTaskDTO> entry : reportingTaskNodesToDTOs.entrySet()) { + for (Map.Entry<ReportingTaskNode, ReportingTaskDTO> entry : reportingTaskNodesToDTOs.entrySet()) { applyReportingTaskScheduleState(controller, entry.getValue(), entry.getKey(), initialized, existingFlowEmpty); } } @@ -403,15 +405,15 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } - private void updateReportingTaskControllerServices(final Set<ReportingTaskNode> reportingTasks, final Map<String,ControllerServiceNode> controllerServiceMapping) { + private void updateReportingTaskControllerServices(final Set<ReportingTaskNode> reportingTasks, final Map<String, ControllerServiceNode> controllerServiceMapping) { for (ReportingTaskNode reportingTask : reportingTasks) { if (reportingTask.getProperties() != null) { - final Set<Map.Entry<PropertyDescriptor,String>> propertyDescriptors = reportingTask.getProperties().entrySet().stream() + final Set<Map.Entry<PropertyDescriptor, String>> propertyDescriptors = reportingTask.getProperties().entrySet().stream() .filter(e -> e.getKey().getControllerServiceDefinition() != null) .filter(e -> controllerServiceMapping.containsKey(e.getValue())) .collect(Collectors.toSet()); - for (Map.Entry<PropertyDescriptor,String> propEntry : propertyDescriptors) { + for (Map.Entry<PropertyDescriptor, String> propEntry : propertyDescriptors) { final PropertyDescriptor propertyDescriptor = propEntry.getKey(); final ControllerServiceNode clone = controllerServiceMapping.get(propEntry.getValue()); reportingTask.setProperty(propertyDescriptor.getName(), clone.getIdentifier()); @@ -490,7 +492,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } private byte[] readFlowFromDisk() throws IOException { - final Path flowPath = NiFiProperties.getInstance().getFlowConfigurationFile().toPath(); + final Path flowPath = nifiProperties.getFlowConfigurationFile().toPath(); if (!Files.exists(flowPath) || Files.size(flowPath) == 0) { return new byte[0]; } @@ -544,7 +546,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask()); final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(), - SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller); + SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller, nifiProperties); try { reportingTask.getReportingTask().initialize(config); @@ -560,7 +562,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } private void applyReportingTaskScheduleState(final FlowController controller, final ReportingTaskDTO dto, final ReportingTaskNode reportingTask, - final boolean controllerInitialized, final boolean existingFlowEmpty) { + final boolean controllerInitialized, final boolean existingFlowEmpty) { if (!controllerInitialized || existingFlowEmpty) { applyNewReportingTaskScheduleState(controller, dto, reportingTask); } else { @@ -636,7 +638,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, - final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException { + final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException { // get the parent group ID final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier(); @@ -792,10 +794,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { if (inputPort.getScheduledState() != ScheduledState.RUNNING && inputPort.getScheduledState() != ScheduledState.STARTING) { rpg.startTransmitting(inputPort); } - } else { - if (inputPort.getScheduledState() != ScheduledState.STOPPED && inputPort.getScheduledState() != ScheduledState.STOPPING) { - rpg.stopTransmitting(inputPort); - } + } else if (inputPort.getScheduledState() != ScheduledState.STOPPED && inputPort.getScheduledState() != ScheduledState.STOPPING) { + rpg.stopTransmitting(inputPort); } } @@ -813,15 +813,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { if (outputPort.getScheduledState() != ScheduledState.RUNNING && outputPort.getScheduledState() != ScheduledState.STARTING) { rpg.startTransmitting(outputPort); } - } else { - if (outputPort.getScheduledState() != ScheduledState.STOPPED && outputPort.getScheduledState() != ScheduledState.STOPPING) { - rpg.stopTransmitting(outputPort); - } + } else if (outputPort.getScheduledState() != ScheduledState.STOPPED && outputPort.getScheduledState() != ScheduledState.STOPPING) { + rpg.stopTransmitting(outputPort); } } } - // add labels final List<Element> labelNodeList = getChildrenByTagName(processGroupElement, "label"); for (final Element labelElement : labelNodeList) { @@ -969,7 +966,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, - final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException { + final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException { // get the parent group ID final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier(); @@ -1285,12 +1282,15 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } /** - * If both authorizers are external authorizers, or if the both are internal authorizers with equal fingerprints, - * then an uniheritable result with no reason is returned to indicate nothing to do. + * If both authorizers are external authorizers, or if the both are internal + * authorizers with equal fingerprints, then an uniheritable result with no + * reason is returned to indicate nothing to do. * - * If both are internal authorizers and the current authorizer is empty, then an inheritable result is returned. + * If both are internal authorizers and the current authorizer is empty, + * then an inheritable result is returned. * - * All other cases return uninheritable with a reason which indicates to throw an exception. + * All other cases return uninheritable with a reason which indicates to + * throw an exception. * * @param existingFlow the existing DataFlow * @param proposedFlow the proposed DataFlow @@ -1336,13 +1336,15 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } /** - * Returns true if the given controller can inherit the proposed flow without orphaning flow files. + * Returns true if the given controller can inherit the proposed flow + * without orphaning flow files. * * @param existingFlow flow * @param controller the running controller * @param proposedFlow the flow to inherit * - * @return null if the controller can inherit the specified flow, an explanation of why it cannot be inherited otherwise + * @return null if the controller can inherit the specified flow, an + * explanation of why it cannot be inherited otherwise * * @throws FingerprintException if flow fingerprints could not be generated */ @@ -1453,7 +1455,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } /** - * Holder for the result of determining if a proposed Authorizer is inheritable. + * Holder for the result of determining if a proposed Authorizer is + * inheritable. */ private static final class AuthorizerInheritability { http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index ab37aee..5ff97ef 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -127,22 +127,23 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private final Requirement inputRequirement; private final ProcessScheduler processScheduler; private long runNanos = 0L; + private final NiFiProperties nifiProperties; private SchedulingStrategy schedulingStrategy; // guarded by read/write lock // ??????? NOT any more public StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, - final ControllerServiceProvider controllerServiceProvider) { + final ControllerServiceProvider controllerServiceProvider, final NiFiProperties nifiProperties) { this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider, - processor.getClass().getSimpleName(), processor.getClass().getCanonicalName()); + processor.getClass().getSimpleName(), processor.getClass().getCanonicalName(), nifiProperties); } public StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider, - final String componentType, final String componentCanonicalClass) { + final String componentType, final String componentCanonicalClass, final NiFiProperties nifiProperties) { super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass); @@ -166,6 +167,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable processScheduler = scheduler; isolated = new AtomicBoolean(false); penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD); + this.nifiProperties = nifiProperties; final Class<?> procClass = processor.getClass(); triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class); @@ -1374,7 +1376,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * </p> */ private <T> void invokeTaskAsCancelableFuture(final SchedulingAgentCallback callback, final Callable<T> task) { - final String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); + final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); final long onScheduleTimeout = timeoutString == null ? 60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); final Future<?> taskFuture = callback.invokeMonitoringTask(task); http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java index 0240318..275fd3e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java @@ -14,12 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.controller.cluster; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Properties; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -28,17 +26,20 @@ import org.apache.curator.retry.RetryNTimes; import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.util.NiFiProperties; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Uses ZooKeeper in order to determine which node is the elected Cluster Coordinator and to indicate - * that this node is part of the cluster. However, once the Cluster Coordinator is known, heartbeats are - * sent directly to the Cluster Coordinator. + * Uses ZooKeeper in order to determine which node is the elected Cluster + * Coordinator and to indicate that this node is part of the cluster. However, + * once the Cluster Coordinator is known, heartbeats are sent directly to the + * Cluster Coordinator. */ public class ClusterProtocolHeartbeater implements Heartbeater { + private static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeater.class); private final NodeProtocolSender protocolSender; @@ -48,15 +49,14 @@ public class ClusterProtocolHeartbeater implements Heartbeater { private final String coordinatorPath; private volatile String coordinatorAddress; - - public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, final Properties properties) { + public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, final NiFiProperties nifiProperties) { this.protocolSender = protocolSender; final RetryPolicy retryPolicy = new RetryNTimes(10, 500); - final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(properties); + final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(nifiProperties); curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), - zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); + zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); curatorClient.start(); nodesPathPrefix = zkConfig.resolvePath("cluster/nodes"); @@ -87,7 +87,6 @@ public class ClusterProtocolHeartbeater implements Heartbeater { } } - @Override public synchronized void send(final HeartbeatMessage heartbeatMessage) throws IOException { final String heartbeatAddress = getHeartbeatAddress(); @@ -107,7 +106,6 @@ public class ClusterProtocolHeartbeater implements Heartbeater { } } - @Override public void close() throws IOException { if (curatorClient != null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java index 52cd7ec..ffd4046 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java @@ -18,7 +18,6 @@ package org.apache.nifi.controller.cluster; import java.util.ArrayList; import java.util.List; -import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; @@ -70,8 +69,8 @@ public class ZooKeeperClientConfig { return rootPath + "/" + path; } - public static ZooKeeperClientConfig createConfig(final Properties properties) { - final String connectString = properties.getProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING); + public static ZooKeeperClientConfig createConfig(final NiFiProperties nifiProperties) { + final String connectString = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING); if (connectString == null || connectString.trim().isEmpty()) { throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is not set in nifi.properties"); } @@ -79,9 +78,9 @@ public class ZooKeeperClientConfig { if (cleanedConnectString.isEmpty()) { throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is set in nifi.properties but needs to be in pairs of host:port separated by commas"); } - final long sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT); - final long connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT); - final String rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE); + final long sessionTimeoutMs = getTimePeriod(nifiProperties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT); + final long connectionTimeoutMs = getTimePeriod(nifiProperties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT); + final String rootPath = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE); try { PathUtils.validatePath(rootPath); @@ -92,8 +91,8 @@ public class ZooKeeperClientConfig { return new ZooKeeperClientConfig(cleanedConnectString, (int) sessionTimeoutMs, (int) connectionTimeoutMs, rootPath); } - private static int getTimePeriod(final Properties properties, final String propertyName, final String defaultValue) { - final String timeout = properties.getProperty(propertyName, defaultValue); + private static int getTimePeriod(final NiFiProperties nifiProperties, final String propertyName, final String defaultValue) { + final String timeout = nifiProperties.getProperty(propertyName, defaultValue); try { return (int) FormatUtils.getTimeDuration(timeout, TimeUnit.MILLISECONDS); } catch (final Exception e) { http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java index 7bf7494..3ef2b8b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@ -14,12 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.controller.leader.election; import java.util.HashMap; import java.util.Map; -import java.util.Properties; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -37,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CuratorLeaderElectionManager implements LeaderElectionManager { + private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class); private final FlowEngine leaderElectionMonitorEngine; @@ -49,17 +48,11 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { private final Map<String, LeaderRole> leaderRoles = new HashMap<>(); private final Map<String, LeaderElectionStateChangeListener> registeredRoles = new HashMap<>(); - - public CuratorLeaderElectionManager(final int threadPoolSize) { - this(threadPoolSize, NiFiProperties.getInstance()); - } - - public CuratorLeaderElectionManager(final int threadPoolSize, final Properties properties) { + public CuratorLeaderElectionManager(final int threadPoolSize, final NiFiProperties properties) { leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true); zkConfig = ZooKeeperClientConfig.createConfig(properties); } - @Override public synchronized void start() { if (!stopped) { @@ -70,12 +63,12 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { final RetryPolicy retryPolicy = new RetryForever(5000); curatorClient = CuratorFrameworkFactory.builder() - .connectString(zkConfig.getConnectString()) - .sessionTimeoutMs(zkConfig.getSessionTimeoutMillis()) - .connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis()) - .retryPolicy(retryPolicy) - .defaultData(new byte[0]) - .build(); + .connectString(zkConfig.getConnectString()) + .sessionTimeoutMs(zkConfig.getSessionTimeoutMillis()) + .connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis()) + .retryPolicy(retryPolicy) + .defaultData(new byte[0]) + .build(); curatorClient.start(); @@ -89,13 +82,11 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { logger.info("{} started", this); } - @Override public synchronized void register(final String roleName) { register(roleName, null); } - @Override public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener) { logger.debug("{} Registering new Leader Selector for role {}", this, roleName); @@ -168,13 +159,11 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { return stopped; } - @Override public String toString() { return "CuratorLeaderElectionManager[stopped=" + isStopped() + "]"; } - @Override public synchronized boolean isLeader(final String roleName) { final LeaderRole role = leaderRoles.get(roleName); @@ -185,8 +174,8 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { return role.isLeader(); } - private static class LeaderRole { + private final LeaderSelector leaderSelector; private final ElectionListener electionListener; @@ -204,8 +193,8 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { } } - private class ElectionListener extends LeaderSelectorListenerAdapter implements LeaderSelectorListener { + private final String roleName; private final LeaderElectionStateChangeListener listener; http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java index caf38a8..ae63543 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller.reporting; +import java.io.File; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -26,6 +27,7 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; public class StandardReportingInitializationContext implements ReportingInitializationContext, ControllerServiceLookup { @@ -35,15 +37,19 @@ public class StandardReportingInitializationContext implements ReportingInitiali private final SchedulingStrategy schedulingStrategy; private final ControllerServiceProvider serviceProvider; private final ComponentLog logger; + private final NiFiProperties nifiProperties; - public StandardReportingInitializationContext(final String id, final String name, final SchedulingStrategy schedulingStrategy, - final String schedulingPeriod, final ComponentLog logger, final ControllerServiceProvider serviceProvider) { + public StandardReportingInitializationContext( + final String id, final String name, final SchedulingStrategy schedulingStrategy, + final String schedulingPeriod, final ComponentLog logger, + final ControllerServiceProvider serviceProvider, final NiFiProperties nifiProperties) { this.id = id; this.name = name; this.schedulingPeriod = schedulingPeriod; this.serviceProvider = serviceProvider; this.schedulingStrategy = schedulingStrategy; this.logger = logger; + this.nifiProperties = nifiProperties; } @Override @@ -113,4 +119,19 @@ public class StandardReportingInitializationContext implements ReportingInitiali public ComponentLog getLogger() { return logger; } + + @Override + public String getKerberosServicePrincipal() { + return nifiProperties.getKerberosServicePrincipal(); + } + + @Override + public File getKerberosServiceKeytab() { + return new File(nifiProperties.getKerberosKeytabLocation()); + } + + @Override + public File getKerberosConfigurationFile() { + return nifiProperties.getKerberosConfigurationFile(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7d7401ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 8cf2401..4e70e7b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -126,10 +126,26 @@ public class FileSystemRepository implements ContentRepository { // guarded by synchronizing on this private final AtomicLong oldestArchiveDate = new AtomicLong(0L); - public FileSystemRepository() throws IOException { - final NiFiProperties properties = NiFiProperties.getInstance(); + private final NiFiProperties nifiProperties; + + /** + * Default no args constructor for service loading only + */ + public FileSystemRepository() { + containers = null; + containerNames = null; + index = null; + archiveData = false; + maxArchiveMillis = 0; + alwaysSync = false; + containerCleanupExecutor = null; + nifiProperties = null; + } + + public FileSystemRepository(final NiFiProperties nifiProperties) throws IOException { + this.nifiProperties = nifiProperties; // determine the file repository paths and ensure they exist - final Map<String, Path> fileRespositoryPaths = properties.getContentRepositoryPaths(); + final Map<String, Path> fileRespositoryPaths = nifiProperties.getContentRepositoryPaths(); for (final Path path : fileRespositoryPaths.values()) { Files.createDirectories(path); } @@ -139,21 +155,21 @@ public class FileSystemRepository implements ContentRepository { index = new AtomicLong(0L); for (final String containerName : containerNames) { - reclaimable.put(containerName, new LinkedBlockingQueue<ResourceClaim>(10000)); - archivedFiles.put(containerName, new LinkedBlockingQueue<ArchiveInfo>(100000)); + reclaimable.put(containerName, new LinkedBlockingQueue<>(10000)); + archivedFiles.put(containerName, new LinkedBlockingQueue<>(100000)); } - final String enableArchiving = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_ENABLED); - final String maxArchiveRetentionPeriod = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD); - final String maxArchiveSize = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE); - final String archiveBackPressureSize = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE); + final String enableArchiving = nifiProperties.getProperty(NiFiProperties.CONTENT_ARCHIVE_ENABLED); + final String maxArchiveRetentionPeriod = nifiProperties.getProperty(NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD); + final String maxArchiveSize = nifiProperties.getProperty(NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE); + final String archiveBackPressureSize = nifiProperties.getProperty(NiFiProperties.CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE); if ("true".equalsIgnoreCase(enableArchiving)) { archiveData = true; if (maxArchiveSize == null) { throw new RuntimeException("No value specified for property '" - + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving."); + + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving."); } if (!MAX_ARCHIVE_SIZE_PATTERN.matcher(maxArchiveSize.trim()).matches()) { @@ -187,7 +203,7 @@ public class FileSystemRepository implements ContentRepository { final long maxArchiveBytes = (long) (capacity * (1D - (maxArchiveRatio - 0.02))); minUsableContainerBytesForArchive.put(container.getKey(), Long.valueOf(maxArchiveBytes)); LOG.info("Maximum Threshold for Container {} set to {} bytes; if volume exceeds this size, archived data will be deleted until it no longer exceeds this size", - containerName, maxArchiveBytes); + containerName, maxArchiveBytes); final long backPressureBytes = (long) (Files.getFileStore(container.getValue()).getTotalSpace() * archiveBackPressureRatio); final ContainerState containerState = new ContainerState(containerName, true, backPressureBytes, capacity); @@ -205,7 +221,7 @@ public class FileSystemRepository implements ContentRepository { maxArchiveMillis = StringUtils.isEmpty(maxArchiveRetentionPeriod) ? Long.MAX_VALUE : FormatUtils.getTimeDuration(maxArchiveRetentionPeriod, TimeUnit.MILLISECONDS); } - this.alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.content.repository.always.sync")); + this.alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty("nifi.content.repository.always.sync")); LOG.info("Initializing FileSystemRepository with 'Always Sync' set to {}", alwaysSync); initializeRepository(); @@ -216,16 +232,14 @@ public class FileSystemRepository implements ContentRepository { public void initialize(final ResourceClaimManager claimManager) { this.resourceClaimManager = claimManager; - final NiFiProperties properties = NiFiProperties.getInstance(); - - final Map<String, Path> fileRespositoryPaths = properties.getContentRepositoryPaths(); + final Map<String, Path> fileRespositoryPaths = nifiProperties.getContentRepositoryPaths(); executor.scheduleWithFixedDelay(new BinDestructableClaims(), 1, 1, TimeUnit.SECONDS); for (int i = 0; i < fileRespositoryPaths.size(); i++) { executor.scheduleWithFixedDelay(new ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS); } - final long cleanupMillis = this.determineCleanupInterval(properties); + final long cleanupMillis = this.determineCleanupInterval(nifiProperties); for (final Map.Entry<String, Path> containerEntry : containers.entrySet()) { final String containerName = containerEntry.getKey(); @@ -562,7 +576,6 @@ public class FileSystemRepository implements ContentRepository { return resourceClaimManager.incrementClaimantCount(resourceClaim, newClaim); } - @Override public int getClaimantCount(final ContentClaim claim) { if (claim == null) { @@ -619,14 +632,13 @@ public class FileSystemRepository implements ContentRepository { final File file = path.toFile(); if (!file.delete() && file.exists()) { - LOG.warn("Unable to delete {} at path {}", new Object[] {claim, path}); + LOG.warn("Unable to delete {} at path {}", new Object[]{claim, path}); return false; } return true; } - @Override public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException { if (original == null) { @@ -635,7 +647,7 @@ public class FileSystemRepository implements ContentRepository { final ContentClaim newClaim = create(lossTolerant); try (final InputStream in = read(original); - final OutputStream out = write(newClaim)) { + final OutputStream out = write(newClaim)) { StreamUtils.copy(in, out); } catch (final IOException ioe) { decrementClaimantCount(newClaim); @@ -700,7 +712,7 @@ public class FileSystemRepository implements ContentRepository { } try (final InputStream in = read(claim); - final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) { + final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) { final long copied = StreamUtils.copy(in, fos); if (alwaysSync) { fos.getFD().sync(); @@ -729,7 +741,7 @@ public class FileSystemRepository implements ContentRepository { } try (final InputStream in = read(claim); - final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) { + final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) { if (offset > 0) { StreamUtils.skip(in, offset); } @@ -801,7 +813,7 @@ public class FileSystemRepository implements ContentRepository { if (claim.getOffset() > 0L) { try { StreamUtils.skip(fis, claim.getOffset()); - } catch(IOException ioe) { + } catch (IOException ioe) { IOUtils.closeQuietly(fis); throw ioe; } @@ -821,7 +833,6 @@ public class FileSystemRepository implements ContentRepository { return write(claim, false); } - private OutputStream write(final ContentClaim claim, final boolean append) throws IOException { if (claim == null) { throw new NullPointerException("ContentClaim cannot be null"); @@ -973,7 +984,6 @@ public class FileSystemRepository implements ContentRepository { return out; } - @Override public void purge() { // delete all content from repositories @@ -1035,7 +1045,7 @@ public class FileSystemRepository implements ContentRepository { break; } else { LOG.warn("Failed to clean up {} because old claims aren't being cleaned up fast enough. " - + "This Content Claim will remain in the Content Repository until NiFi is restarted, at which point it will be cleaned up", claim); + + "This Content Claim will remain in the Content Repository until NiFi is restarted, at which point it will be cleaned up", claim); } } } catch (final InterruptedException ie) { @@ -1210,10 +1220,10 @@ public class FileSystemRepository implements ContentRepository { if (archiveExpirationLog.isDebugEnabled()) { if (toFree < 0) { archiveExpirationLog.debug("Currently {} bytes free for Container {}; requirement is {} byte free, so no need to free space until an additional {} bytes are used", - usableSpace, containerName, minRequiredSpace, Math.abs(toFree)); + usableSpace, containerName, minRequiredSpace, Math.abs(toFree)); } else { archiveExpirationLog.debug("Currently {} bytes free for Container {}; requirement is {} byte free, so need to free {} bytes", - usableSpace, containerName, minRequiredSpace, toFree); + usableSpace, containerName, minRequiredSpace, toFree); } } @@ -1256,10 +1266,10 @@ public class FileSystemRepository implements ContentRepository { final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); if (deleteCount > 0) { LOG.info("Deleted {} files from archive for Container {}; oldest Archive Date is now {}; container cleanup took {} millis", - deleteCount, containerName, new Date(oldestArchiveDate), millis); + deleteCount, containerName, new Date(oldestArchiveDate), millis); } else { LOG.debug("Deleted {} files from archive for Container {}; oldest Archive Date is now {}; container cleanup took {} millis", - deleteCount, containerName, new Date(oldestArchiveDate), millis); + deleteCount, containerName, new Date(oldestArchiveDate), millis); } return oldestArchiveDate; @@ -1297,7 +1307,7 @@ public class FileSystemRepository implements ContentRepository { Files.deleteIfExists(file); containerState.decrementArchiveCount(); LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because it was older than the configured max archival duration", - file.toFile().getName(), containerName); + file.toFile().getName(), containerName); } catch (final IOException ioe) { LOG.warn("Failed to remove archived ContentClaim with ID {} from Container {} due to {}", file.toFile().getName(), containerName, ioe.toString()); if (LOG.isDebugEnabled()) { @@ -1380,7 +1390,7 @@ public class FileSystemRepository implements ContentRepository { final long cleanupMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - deleteOldestMillis - sortRemainingMillis - deleteExpiredMillis; LOG.debug("Oldest Archive Date for Container {} is {}; delete expired = {} ms, sort remaining = {} ms, delete oldest = {} ms, cleanup = {} ms", - containerName, new Date(oldestContainerArchive), deleteExpiredMillis, sortRemainingMillis, deleteOldestMillis, cleanupMillis); + containerName, new Date(oldestContainerArchive), deleteExpiredMillis, sortRemainingMillis, deleteOldestMillis, cleanupMillis); return oldestContainerArchive; } @@ -1420,10 +1430,8 @@ public class FileSystemRepository implements ContentRepository { LOG.warn("", e); } } - } else { - if (remove(claim)) { - successCount++; - } + } else if (remove(claim)) { + successCount++; } } @@ -1571,7 +1579,9 @@ public class FileSystemRepository implements ContentRepository { } /** - * @return {@code true} if wait is required to create claims against this Container, based on whether or not the container has reached its back pressure threshold + * @return {@code true} if wait is required to create claims against + * this Container, based on whether or not the container has reached its + * back pressure threshold */ public boolean isWaitRequired() { if (!archiveEnabled) { @@ -1642,8 +1652,8 @@ public class FileSystemRepository implements ContentRepository { } } - private static class ClaimLengthPair { + private final ResourceClaim claim; private final Long length;