rfellows commented on code in PR #8246: URL: https://github.com/apache/nifi/pull/8246#discussion_r1453611366
########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/queue/queue.effects.ts: ########## @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Injectable } from '@angular/core'; +import { Actions, createEffect, ofType } from '@ngrx/effects'; +import * as QueueActions from './queue.actions'; +import { Store } from '@ngrx/store'; +import { + asyncScheduler, + catchError, + filter, + from, + interval, + map, + of, + switchMap, + take, + takeUntil, + tap, + withLatestFrom +} from 'rxjs'; +import { selectDropConnectionId, selectDropProcessGroupId, selectDropRequestEntity } from './queue.selectors'; +import { QueueService } from '../../service/queue.service'; +import { DropRequest } from './index'; +import { CancelDialog } from '../../../../ui/common/cancel-dialog/cancel-dialog.component'; +import { MatDialog } from '@angular/material/dialog'; +import { NiFiCommon } from '../../../../service/nifi-common.service'; +import { isDefinedAndNotNull } from '../../../../state/shared'; +import { YesNoDialog } from '../../../../ui/common/yes-no-dialog/yes-no-dialog.component'; +import { OkDialog } from '../../../../ui/common/ok-dialog/ok-dialog.component'; +import { loadConnection, loadProcessGroup } from '../flow/flow.actions'; +import { resetQueueState } from './queue.actions'; + +@Injectable() +export class QueueEffects { + constructor( + private actions$: Actions, + private store: Store<CanvasState>, + private queueService: QueueService, + private dialog: MatDialog, + private nifiCommon: NiFiCommon + ) {} + + promptEmptyQueueRequest$ = createEffect( + () => + this.actions$.pipe( + ofType(QueueActions.promptEmptyQueueRequest), + map((action) => action.request), + tap((request) => { + const dialogReference = this.dialog.open(YesNoDialog, { + data: { + title: 'Empty Queue', + message: + 'Are you sure you want to empty this queue? All FlowFiles waiting at the time of the request will be removed.' + }, + panelClass: 'small-dialog' + }); + + dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => { + this.store.dispatch( + QueueActions.submitEmptyQueueRequest({ + request + }) + ); + }); + }) + ), + { dispatch: false } + ); + + submitEmptyQueueRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.submitEmptyQueueRequest), + map((action) => action.request), + switchMap((request) => { + const dialogReference = this.dialog.open(CancelDialog, { + data: { + title: 'Empty Queue', + message: 'Waiting for queue to empty...' + }, + disableClose: true, + panelClass: 'small-dialog' + }); + + dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => { + this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest()); + }); + + return from(this.queueService.submitEmptyQueueRequest(request)).pipe( + map((response) => + QueueActions.submitEmptyQueueRequestSuccess({ + response: { + dropEntity: response + } + }) + ), + catchError((error) => + of( + QueueActions.queueApiError({ + error: error.error + }) + ) + ) + ); + }) + ) + ); + + promptEmptyQueuesRequest$ = createEffect( + () => + this.actions$.pipe( + ofType(QueueActions.promptEmptyQueuesRequest), + map((action) => action.request), + tap((request) => { + const dialogReference = this.dialog.open(YesNoDialog, { + data: { + title: 'Empty All Queues', + message: + 'Are you sure you want to empty all queues in this Process Group? All FlowFiles from all connections waiting at the time of the request will be removed.' + }, + panelClass: 'small-dialog' + }); + + dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => { + this.store.dispatch( + QueueActions.submitEmptyQueuesRequest({ + request + }) + ); + }); + }) + ), + { dispatch: false } + ); + + submitEmptyQueuesRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.submitEmptyQueuesRequest), + map((action) => action.request), + switchMap((request) => { + const dialogReference = this.dialog.open(CancelDialog, { + data: { + title: 'Empty All Queues', + message: 'Waiting for all queues to empty...' + }, + disableClose: true, + panelClass: 'small-dialog' + }); + + dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => { + this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest()); + }); + + return from(this.queueService.submitEmptyQueuesRequest(request)).pipe( + map((response) => + QueueActions.submitEmptyQueueRequestSuccess({ + response: { + dropEntity: response + } + }) + ), + catchError((error) => + of( + QueueActions.queueApiError({ + error: error.error + }) + ) + ) + ); + }) + ) + ); + + submitEmptyQueueRequestSuccess$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.submitEmptyQueueRequestSuccess), + map((action) => action.response), + switchMap((response) => { + const dropRequest: DropRequest = response.dropEntity.dropRequest; + if (dropRequest.finished) { + return of(QueueActions.deleteEmptyQueueRequest()); + } else { + return of(QueueActions.startPollingEmptyQueueRequest()); + } + }) + ) + ); + + startPollingEmptyQueueRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.startPollingEmptyQueueRequest), + switchMap(() => + interval(2000, asyncScheduler).pipe( + takeUntil(this.actions$.pipe(ofType(QueueActions.stopPollingEmptyQueueRequest))) + ) + ), + switchMap(() => of(QueueActions.pollEmptyQueueRequest())) + ) + ); + + pollEmptyQueueRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.pollEmptyQueueRequest), + withLatestFrom(this.store.select(selectDropRequestEntity).pipe(isDefinedAndNotNull())), + switchMap(([action, dropEntity]) => { + return from(this.queueService.pollEmptyQueueRequest(dropEntity.dropRequest)).pipe( + map((response) => + QueueActions.pollEmptyQueueRequestSuccess({ + response: { + dropEntity: response + } + }) + ), + catchError((error) => + of( + QueueActions.queueApiError({ + error: error.error + }) + ) + ) + ); + }) + ) + ); + + pollEmptyQueueRequestSuccess$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.pollEmptyQueueRequestSuccess), + map((action) => action.response), + filter((response) => response.dropEntity.dropRequest.finished), + switchMap((response) => of(QueueActions.stopPollingEmptyQueueRequest())) + ) + ); + + stopPollingEmptyQueueRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.stopPollingEmptyQueueRequest), + switchMap((response) => of(QueueActions.deleteEmptyQueueRequest())) + ) + ); + + deleteEmptyQueueRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.deleteEmptyQueueRequest), + withLatestFrom(this.store.select(selectDropRequestEntity).pipe(isDefinedAndNotNull())), + switchMap(([action, dropEntity]) => { + this.dialog.closeAll(); + + return from(this.queueService.deleteEmptyQueueRequest(dropEntity.dropRequest)).pipe( + map((response) => + QueueActions.showEmptyQueueResults({ + request: { + dropEntity: response + } + }) + ), + catchError((error) => + of( + QueueActions.showEmptyQueueResults({ + request: { + dropEntity + } + }) + ) + ) + ); + }) + ) + ); Review Comment: The `isDefinedAndNotNull` function seems to be doing too much. Meaning it doesn't work how we expect it to in this case. It is effectively giving us the latest DropRequestEntity that was not null/undefined and not just filtering out the ones that are null/undefined. What makes it even more odd is that the entity doesn't even have to be in the store, there is a reference to the last one that wasn't null somehow. Scenario: * Empty a single queue, get success. * Empty all queues where the request fails (remove view access to one of the processors connected to a queue) * Click cancel on the waiting dialog since it won't go away on its own. * See the dialog present itself with the results from the previous successful empty queue action. ![Kapture 2024-01-16 at 10 38 01](https://github.com/apache/nifi/assets/713866/661847bb-89a0-4ea2-8e38-946c262a01e4) ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/queue/queue.effects.ts: ########## @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Injectable } from '@angular/core'; +import { Actions, createEffect, ofType } from '@ngrx/effects'; +import * as QueueActions from './queue.actions'; +import { Store } from '@ngrx/store'; +import { + asyncScheduler, + catchError, + filter, + from, + interval, + map, + of, + switchMap, + take, + takeUntil, + tap, + withLatestFrom +} from 'rxjs'; +import { selectDropConnectionId, selectDropProcessGroupId, selectDropRequestEntity } from './queue.selectors'; +import { QueueService } from '../../service/queue.service'; +import { DropRequest } from './index'; +import { CancelDialog } from '../../../../ui/common/cancel-dialog/cancel-dialog.component'; +import { MatDialog } from '@angular/material/dialog'; +import { NiFiCommon } from '../../../../service/nifi-common.service'; +import { isDefinedAndNotNull } from '../../../../state/shared'; +import { YesNoDialog } from '../../../../ui/common/yes-no-dialog/yes-no-dialog.component'; +import { OkDialog } from '../../../../ui/common/ok-dialog/ok-dialog.component'; +import { loadConnection, loadProcessGroup } from '../flow/flow.actions'; +import { resetQueueState } from './queue.actions'; + +@Injectable() +export class QueueEffects { + constructor( + private actions$: Actions, + private store: Store<CanvasState>, + private queueService: QueueService, + private dialog: MatDialog, + private nifiCommon: NiFiCommon + ) {} + + promptEmptyQueueRequest$ = createEffect( + () => + this.actions$.pipe( + ofType(QueueActions.promptEmptyQueueRequest), + map((action) => action.request), + tap((request) => { + const dialogReference = this.dialog.open(YesNoDialog, { + data: { + title: 'Empty Queue', + message: + 'Are you sure you want to empty this queue? All FlowFiles waiting at the time of the request will be removed.' + }, + panelClass: 'small-dialog' + }); + + dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => { + this.store.dispatch( + QueueActions.submitEmptyQueueRequest({ + request + }) + ); + }); + }) + ), + { dispatch: false } + ); + + submitEmptyQueueRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.submitEmptyQueueRequest), + map((action) => action.request), + switchMap((request) => { + const dialogReference = this.dialog.open(CancelDialog, { + data: { + title: 'Empty Queue', + message: 'Waiting for queue to empty...' + }, + disableClose: true, + panelClass: 'small-dialog' + }); + + dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => { + this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest()); + }); + + return from(this.queueService.submitEmptyQueueRequest(request)).pipe( + map((response) => + QueueActions.submitEmptyQueueRequestSuccess({ + response: { + dropEntity: response + } + }) + ), + catchError((error) => + of( + QueueActions.queueApiError({ + error: error.error + }) + ) + ) + ); + }) + ) + ); + + promptEmptyQueuesRequest$ = createEffect( + () => + this.actions$.pipe( + ofType(QueueActions.promptEmptyQueuesRequest), + map((action) => action.request), + tap((request) => { + const dialogReference = this.dialog.open(YesNoDialog, { + data: { + title: 'Empty All Queues', + message: + 'Are you sure you want to empty all queues in this Process Group? All FlowFiles from all connections waiting at the time of the request will be removed.' + }, + panelClass: 'small-dialog' + }); + + dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => { + this.store.dispatch( + QueueActions.submitEmptyQueuesRequest({ + request + }) + ); + }); + }) + ), + { dispatch: false } + ); + + submitEmptyQueuesRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.submitEmptyQueuesRequest), + map((action) => action.request), + switchMap((request) => { + const dialogReference = this.dialog.open(CancelDialog, { + data: { + title: 'Empty All Queues', + message: 'Waiting for all queues to empty...' + }, + disableClose: true, + panelClass: 'small-dialog' + }); + + dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => { + this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest()); + }); + + return from(this.queueService.submitEmptyQueuesRequest(request)).pipe( + map((response) => + QueueActions.submitEmptyQueueRequestSuccess({ + response: { + dropEntity: response + } + }) + ), + catchError((error) => + of( + QueueActions.queueApiError({ + error: error.error + }) + ) + ) Review Comment: We should consider closing the dialog in case of an error. While the general error handling approach is TBD, in this scenario the request fails and the user is left with a waiting dialog and no indication that it actually failed. I discovered this when trying to empty all queues and one of my processors is not viewable to my user. The backend responds to the request with a `403: Unable to view Processor with ID 834e1653-018c-1000-37db-5b30738632b9. Contact the system administrator.` I imagine the same issue would exist on the action to empty a single queue but i couldn't find a breaking scenario to verify. ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/queue/queue.effects.ts: ########## @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Injectable } from '@angular/core'; +import { Actions, createEffect, ofType } from '@ngrx/effects'; +import * as QueueActions from './queue.actions'; +import { Store } from '@ngrx/store'; +import { + asyncScheduler, + catchError, + filter, + from, + interval, + map, + of, + switchMap, + take, + takeUntil, + tap, + withLatestFrom +} from 'rxjs'; +import { selectDropConnectionId, selectDropProcessGroupId, selectDropRequestEntity } from './queue.selectors'; +import { QueueService } from '../../service/queue.service'; +import { DropRequest } from './index'; +import { CancelDialog } from '../../../../ui/common/cancel-dialog/cancel-dialog.component'; +import { MatDialog } from '@angular/material/dialog'; +import { NiFiCommon } from '../../../../service/nifi-common.service'; +import { isDefinedAndNotNull } from '../../../../state/shared'; +import { YesNoDialog } from '../../../../ui/common/yes-no-dialog/yes-no-dialog.component'; +import { OkDialog } from '../../../../ui/common/ok-dialog/ok-dialog.component'; +import { loadConnection, loadProcessGroup } from '../flow/flow.actions'; +import { resetQueueState } from './queue.actions'; + +@Injectable() +export class QueueEffects { + constructor( + private actions$: Actions, + private store: Store<CanvasState>, + private queueService: QueueService, + private dialog: MatDialog, + private nifiCommon: NiFiCommon + ) {} + + promptEmptyQueueRequest$ = createEffect( + () => + this.actions$.pipe( + ofType(QueueActions.promptEmptyQueueRequest), + map((action) => action.request), + tap((request) => { + const dialogReference = this.dialog.open(YesNoDialog, { + data: { + title: 'Empty Queue', + message: + 'Are you sure you want to empty this queue? All FlowFiles waiting at the time of the request will be removed.' + }, + panelClass: 'small-dialog' + }); + + dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => { + this.store.dispatch( + QueueActions.submitEmptyQueueRequest({ + request + }) + ); + }); + }) + ), + { dispatch: false } + ); + + submitEmptyQueueRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.submitEmptyQueueRequest), + map((action) => action.request), + switchMap((request) => { + const dialogReference = this.dialog.open(CancelDialog, { + data: { + title: 'Empty Queue', + message: 'Waiting for queue to empty...' + }, + disableClose: true, + panelClass: 'small-dialog' + }); + + dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => { + this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest()); + }); + + return from(this.queueService.submitEmptyQueueRequest(request)).pipe( + map((response) => + QueueActions.submitEmptyQueueRequestSuccess({ + response: { + dropEntity: response + } + }) + ), + catchError((error) => + of( + QueueActions.queueApiError({ + error: error.error + }) + ) + ) + ); + }) + ) + ); + + promptEmptyQueuesRequest$ = createEffect( + () => + this.actions$.pipe( + ofType(QueueActions.promptEmptyQueuesRequest), + map((action) => action.request), + tap((request) => { + const dialogReference = this.dialog.open(YesNoDialog, { + data: { + title: 'Empty All Queues', + message: + 'Are you sure you want to empty all queues in this Process Group? All FlowFiles from all connections waiting at the time of the request will be removed.' + }, + panelClass: 'small-dialog' + }); + + dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => { + this.store.dispatch( + QueueActions.submitEmptyQueuesRequest({ + request + }) + ); + }); + }) + ), + { dispatch: false } + ); + + submitEmptyQueuesRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.submitEmptyQueuesRequest), + map((action) => action.request), + switchMap((request) => { + const dialogReference = this.dialog.open(CancelDialog, { + data: { + title: 'Empty All Queues', + message: 'Waiting for all queues to empty...' + }, + disableClose: true, + panelClass: 'small-dialog' + }); + + dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => { + this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest()); + }); + + return from(this.queueService.submitEmptyQueuesRequest(request)).pipe( + map((response) => + QueueActions.submitEmptyQueueRequestSuccess({ + response: { + dropEntity: response + } + }) + ), + catchError((error) => + of( + QueueActions.queueApiError({ + error: error.error + }) + ) + ) + ); + }) + ) + ); + + submitEmptyQueueRequestSuccess$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.submitEmptyQueueRequestSuccess), + map((action) => action.response), + switchMap((response) => { + const dropRequest: DropRequest = response.dropEntity.dropRequest; + if (dropRequest.finished) { + return of(QueueActions.deleteEmptyQueueRequest()); + } else { + return of(QueueActions.startPollingEmptyQueueRequest()); + } + }) + ) + ); + + startPollingEmptyQueueRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.startPollingEmptyQueueRequest), + switchMap(() => + interval(2000, asyncScheduler).pipe( + takeUntil(this.actions$.pipe(ofType(QueueActions.stopPollingEmptyQueueRequest))) + ) + ), + switchMap(() => of(QueueActions.pollEmptyQueueRequest())) + ) + ); + + pollEmptyQueueRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.pollEmptyQueueRequest), + withLatestFrom(this.store.select(selectDropRequestEntity).pipe(isDefinedAndNotNull())), + switchMap(([action, dropEntity]) => { + return from(this.queueService.pollEmptyQueueRequest(dropEntity.dropRequest)).pipe( + map((response) => + QueueActions.pollEmptyQueueRequestSuccess({ + response: { + dropEntity: response + } + }) + ), + catchError((error) => + of( + QueueActions.queueApiError({ + error: error.error + }) + ) + ) + ); + }) + ) + ); + + pollEmptyQueueRequestSuccess$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.pollEmptyQueueRequestSuccess), + map((action) => action.response), + filter((response) => response.dropEntity.dropRequest.finished), + switchMap((response) => of(QueueActions.stopPollingEmptyQueueRequest())) + ) + ); + + stopPollingEmptyQueueRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.stopPollingEmptyQueueRequest), + switchMap((response) => of(QueueActions.deleteEmptyQueueRequest())) + ) + ); + + deleteEmptyQueueRequest$ = createEffect(() => + this.actions$.pipe( + ofType(QueueActions.deleteEmptyQueueRequest), + withLatestFrom(this.store.select(selectDropRequestEntity).pipe(isDefinedAndNotNull())), + switchMap(([action, dropEntity]) => { + this.dialog.closeAll(); + + return from(this.queueService.deleteEmptyQueueRequest(dropEntity.dropRequest)).pipe( + map((response) => + QueueActions.showEmptyQueueResults({ + request: { + dropEntity: response + } + }) + ), + catchError((error) => + of( + QueueActions.showEmptyQueueResults({ + request: { + dropEntity + } + }) + ) + ) + ); + }) + ) + ); Review Comment: _Update_ I think I found a solution. Based on this ngrx doc: https://ngrx.io/guide/effects#incorporating-state, we need to use `concatLatestFrom` rather than `withLatestFrom` to prevent the selector from firing until the correct action is dispatched ``` concatLatestFrom((action) => this.store.select(selectDropRequestEntity).pipe(isDefinedAndNotNull())), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org