markap14 commented on code in PR #11111:
URL: https://github.com/apache/nifi/pull/11111#discussion_r3074388182
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -1195,19 +1195,37 @@ private void synchronizeProcessors(final ProcessGroup
group, final VersionedProc
final ProcessGroup topLevelGroup)
throws ProcessorInstantiationException {
- for (final VersionedProcessor proposedProcessor :
proposed.getProcessors()) {
- final ProcessorNode processor =
processorsByVersionedId.get(proposedProcessor.getIdentifier());
- if (processor == null) {
- final ProcessorNode added = addProcessor(group,
proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
- LOG.info("Added {} to {}", added, group);
- } else if
(updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
- updateProcessor(processor, proposedProcessor, topLevelGroup);
- // Any existing component that is modified during
synchronization may have its properties reverted to a pre-migration state,
- // so we then add it to the set to allow migrateProperties to
be called again to get it back to the migrated state
- createdAndModifiedExtensions.add(new
CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
- LOG.info("Updated {}", processor);
- } else {
- processor.setPosition(new
Position(proposedProcessor.getPosition().getX(),
proposedProcessor.getPosition().getY()));
+ final Set<ProcessorNode> stoppedProcessors = new HashSet<>();
+
+ try {
+ for (final VersionedProcessor proposedProcessor :
proposed.getProcessors()) {
+ final ProcessorNode processor =
processorsByVersionedId.get(proposedProcessor.getIdentifier());
+ if (processor == null) {
+ final ProcessorNode added = addProcessor(group,
proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
+ LOG.info("Added {} to {}", added, group);
+ } else if
(updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
+ final long processorStopDeadline =
System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis();
+ try {
+ final boolean stopped = stopOrTerminate(processor,
processorStopDeadline, syncOptions);
+ if (stopped && proposedProcessor.getScheduledState()
== org.apache.nifi.flow.ScheduledState.RUNNING) {
+ stoppedProcessors.add(processor);
Review Comment:
I think the name of this collection is misleading - it doesn't hold every
stopped processor, only the processors we want to restart. Perhaps it's worth
renaming the collection to make that clearer?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java:
##########
@@ -1195,19 +1195,37 @@ private void synchronizeProcessors(final ProcessGroup
group, final VersionedProc
final ProcessGroup topLevelGroup)
throws ProcessorInstantiationException {
- for (final VersionedProcessor proposedProcessor :
proposed.getProcessors()) {
- final ProcessorNode processor =
processorsByVersionedId.get(proposedProcessor.getIdentifier());
- if (processor == null) {
- final ProcessorNode added = addProcessor(group,
proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
- LOG.info("Added {} to {}", added, group);
- } else if
(updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
- updateProcessor(processor, proposedProcessor, topLevelGroup);
- // Any existing component that is modified during
synchronization may have its properties reverted to a pre-migration state,
- // so we then add it to the set to allow migrateProperties to
be called again to get it back to the migrated state
- createdAndModifiedExtensions.add(new
CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
- LOG.info("Updated {}", processor);
- } else {
- processor.setPosition(new
Position(proposedProcessor.getPosition().getX(),
proposedProcessor.getPosition().getY()));
+ final Set<ProcessorNode> stoppedProcessors = new HashSet<>();
+
+ try {
+ for (final VersionedProcessor proposedProcessor :
proposed.getProcessors()) {
+ final ProcessorNode processor =
processorsByVersionedId.get(proposedProcessor.getIdentifier());
+ if (processor == null) {
+ final ProcessorNode added = addProcessor(group,
proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
+ LOG.info("Added {} to {}", added, group);
+ } else if
(updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
+ final long processorStopDeadline =
System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis();
+ try {
+ final boolean stopped = stopOrTerminate(processor,
processorStopDeadline, syncOptions);
+ if (stopped && proposedProcessor.getScheduledState()
== org.apache.nifi.flow.ScheduledState.RUNNING) {
+ stoppedProcessors.add(processor);
+ }
+ } catch (final TimeoutException |
FlowSynchronizationException e) {
+ throw new
ProcessorInstantiationException(processor.getIdentifier(), e);
Review Comment:
I don't think this is the right Exception - we are not attempting to
instantiate a Processor here, so I'd not throw ProcessorInstantiationException,
but probably FlowSynchronizationException instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]