rfellows commented on code in PR #8246:
URL: https://github.com/apache/nifi/pull/8246#discussion_r1453611366


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/queue/queue.effects.ts:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable } from '@angular/core';
+import { Actions, createEffect, ofType } from '@ngrx/effects';
+import * as QueueActions from './queue.actions';
+import { Store } from '@ngrx/store';
+import {
+    asyncScheduler,
+    catchError,
+    filter,
+    from,
+    interval,
+    map,
+    of,
+    switchMap,
+    take,
+    takeUntil,
+    tap,
+    withLatestFrom
+} from 'rxjs';
+import { selectDropConnectionId, selectDropProcessGroupId, 
selectDropRequestEntity } from './queue.selectors';
+import { QueueService } from '../../service/queue.service';
+import { DropRequest } from './index';
+import { CancelDialog } from 
'../../../../ui/common/cancel-dialog/cancel-dialog.component';
+import { MatDialog } from '@angular/material/dialog';
+import { NiFiCommon } from '../../../../service/nifi-common.service';
+import { isDefinedAndNotNull } from '../../../../state/shared';
+import { YesNoDialog } from 
'../../../../ui/common/yes-no-dialog/yes-no-dialog.component';
+import { OkDialog } from '../../../../ui/common/ok-dialog/ok-dialog.component';
+import { loadConnection, loadProcessGroup } from '../flow/flow.actions';
+import { resetQueueState } from './queue.actions';
+
+@Injectable()
+export class QueueEffects {
+    constructor(
+        private actions$: Actions,
+        private store: Store<CanvasState>,
+        private queueService: QueueService,
+        private dialog: MatDialog,
+        private nifiCommon: NiFiCommon
+    ) {}
+
+    promptEmptyQueueRequest$ = createEffect(
+        () =>
+            this.actions$.pipe(
+                ofType(QueueActions.promptEmptyQueueRequest),
+                map((action) => action.request),
+                tap((request) => {
+                    const dialogReference = this.dialog.open(YesNoDialog, {
+                        data: {
+                            title: 'Empty Queue',
+                            message:
+                                'Are you sure you want to empty this queue? 
All FlowFiles waiting at the time of the request will be removed.'
+                        },
+                        panelClass: 'small-dialog'
+                    });
+
+                    
dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => {
+                        this.store.dispatch(
+                            QueueActions.submitEmptyQueueRequest({
+                                request
+                            })
+                        );
+                    });
+                })
+            ),
+        { dispatch: false }
+    );
+
+    submitEmptyQueueRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.submitEmptyQueueRequest),
+            map((action) => action.request),
+            switchMap((request) => {
+                const dialogReference = this.dialog.open(CancelDialog, {
+                    data: {
+                        title: 'Empty Queue',
+                        message: 'Waiting for queue to empty...'
+                    },
+                    disableClose: true,
+                    panelClass: 'small-dialog'
+                });
+
+                
dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => {
+                    
this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest());
+                });
+
+                return 
from(this.queueService.submitEmptyQueueRequest(request)).pipe(
+                    map((response) =>
+                        QueueActions.submitEmptyQueueRequestSuccess({
+                            response: {
+                                dropEntity: response
+                            }
+                        })
+                    ),
+                    catchError((error) =>
+                        of(
+                            QueueActions.queueApiError({
+                                error: error.error
+                            })
+                        )
+                    )
+                );
+            })
+        )
+    );
+
+    promptEmptyQueuesRequest$ = createEffect(
+        () =>
+            this.actions$.pipe(
+                ofType(QueueActions.promptEmptyQueuesRequest),
+                map((action) => action.request),
+                tap((request) => {
+                    const dialogReference = this.dialog.open(YesNoDialog, {
+                        data: {
+                            title: 'Empty All Queues',
+                            message:
+                                'Are you sure you want to empty all queues in 
this Process Group? All FlowFiles from all connections waiting at the time of 
the request will be removed.'
+                        },
+                        panelClass: 'small-dialog'
+                    });
+
+                    
dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => {
+                        this.store.dispatch(
+                            QueueActions.submitEmptyQueuesRequest({
+                                request
+                            })
+                        );
+                    });
+                })
+            ),
+        { dispatch: false }
+    );
+
+    submitEmptyQueuesRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.submitEmptyQueuesRequest),
+            map((action) => action.request),
+            switchMap((request) => {
+                const dialogReference = this.dialog.open(CancelDialog, {
+                    data: {
+                        title: 'Empty All Queues',
+                        message: 'Waiting for all queues to empty...'
+                    },
+                    disableClose: true,
+                    panelClass: 'small-dialog'
+                });
+
+                
dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => {
+                    
this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest());
+                });
+
+                return 
from(this.queueService.submitEmptyQueuesRequest(request)).pipe(
+                    map((response) =>
+                        QueueActions.submitEmptyQueueRequestSuccess({
+                            response: {
+                                dropEntity: response
+                            }
+                        })
+                    ),
+                    catchError((error) =>
+                        of(
+                            QueueActions.queueApiError({
+                                error: error.error
+                            })
+                        )
+                    )
+                );
+            })
+        )
+    );
+
+    submitEmptyQueueRequestSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.submitEmptyQueueRequestSuccess),
+            map((action) => action.response),
+            switchMap((response) => {
+                const dropRequest: DropRequest = 
response.dropEntity.dropRequest;
+                if (dropRequest.finished) {
+                    return of(QueueActions.deleteEmptyQueueRequest());
+                } else {
+                    return of(QueueActions.startPollingEmptyQueueRequest());
+                }
+            })
+        )
+    );
+
+    startPollingEmptyQueueRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.startPollingEmptyQueueRequest),
+            switchMap(() =>
+                interval(2000, asyncScheduler).pipe(
+                    
takeUntil(this.actions$.pipe(ofType(QueueActions.stopPollingEmptyQueueRequest)))
+                )
+            ),
+            switchMap(() => of(QueueActions.pollEmptyQueueRequest()))
+        )
+    );
+
+    pollEmptyQueueRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.pollEmptyQueueRequest),
+            
withLatestFrom(this.store.select(selectDropRequestEntity).pipe(isDefinedAndNotNull())),
+            switchMap(([action, dropEntity]) => {
+                return 
from(this.queueService.pollEmptyQueueRequest(dropEntity.dropRequest)).pipe(
+                    map((response) =>
+                        QueueActions.pollEmptyQueueRequestSuccess({
+                            response: {
+                                dropEntity: response
+                            }
+                        })
+                    ),
+                    catchError((error) =>
+                        of(
+                            QueueActions.queueApiError({
+                                error: error.error
+                            })
+                        )
+                    )
+                );
+            })
+        )
+    );
+
+    pollEmptyQueueRequestSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.pollEmptyQueueRequestSuccess),
+            map((action) => action.response),
+            filter((response) => response.dropEntity.dropRequest.finished),
+            switchMap((response) => 
of(QueueActions.stopPollingEmptyQueueRequest()))
+        )
+    );
+
+    stopPollingEmptyQueueRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.stopPollingEmptyQueueRequest),
+            switchMap((response) => of(QueueActions.deleteEmptyQueueRequest()))
+        )
+    );
+
+    deleteEmptyQueueRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.deleteEmptyQueueRequest),
+            
withLatestFrom(this.store.select(selectDropRequestEntity).pipe(isDefinedAndNotNull())),
+            switchMap(([action, dropEntity]) => {
+                this.dialog.closeAll();
+
+                return 
from(this.queueService.deleteEmptyQueueRequest(dropEntity.dropRequest)).pipe(
+                    map((response) =>
+                        QueueActions.showEmptyQueueResults({
+                            request: {
+                                dropEntity: response
+                            }
+                        })
+                    ),
+                    catchError((error) =>
+                        of(
+                            QueueActions.showEmptyQueueResults({
+                                request: {
+                                    dropEntity
+                                }
+                            })
+                        )
+                    )
+                );
+            })
+        )
+    );

Review Comment:
   The `isDefinedAndNotNull` function seems to be doing too much. Meaning it 
doesn't work how we expect it to in this case. It is effectively giving us the 
latest DropRequestEntity that was not null/undefined and not just filtering out 
the ones that are null/undefined. What makes it even more odd is that the 
entity doesn't even have to be in the store, there is a reference to the last 
one that wasn't null somehow.
   
   Scenario:
   * Empty a single queue, get success.
   * Empty all queues where the request fails (remove view access to one of the 
processors connected to a queue)
   * Click cancel on the waiting dialog since it won't go away on its own.
   * See the dialog present itself with the results from the previous 
successful empty queue action.
   
   ![Kapture 2024-01-16 at 10 38 
01](https://github.com/apache/nifi/assets/713866/661847bb-89a0-4ea2-8e38-946c262a01e4)
   



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/queue/queue.effects.ts:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable } from '@angular/core';
+import { Actions, createEffect, ofType } from '@ngrx/effects';
+import * as QueueActions from './queue.actions';
+import { Store } from '@ngrx/store';
+import {
+    asyncScheduler,
+    catchError,
+    filter,
+    from,
+    interval,
+    map,
+    of,
+    switchMap,
+    take,
+    takeUntil,
+    tap,
+    withLatestFrom
+} from 'rxjs';
+import { selectDropConnectionId, selectDropProcessGroupId, 
selectDropRequestEntity } from './queue.selectors';
+import { QueueService } from '../../service/queue.service';
+import { DropRequest } from './index';
+import { CancelDialog } from 
'../../../../ui/common/cancel-dialog/cancel-dialog.component';
+import { MatDialog } from '@angular/material/dialog';
+import { NiFiCommon } from '../../../../service/nifi-common.service';
+import { isDefinedAndNotNull } from '../../../../state/shared';
+import { YesNoDialog } from 
'../../../../ui/common/yes-no-dialog/yes-no-dialog.component';
+import { OkDialog } from '../../../../ui/common/ok-dialog/ok-dialog.component';
+import { loadConnection, loadProcessGroup } from '../flow/flow.actions';
+import { resetQueueState } from './queue.actions';
+
+@Injectable()
+export class QueueEffects {
+    constructor(
+        private actions$: Actions,
+        private store: Store<CanvasState>,
+        private queueService: QueueService,
+        private dialog: MatDialog,
+        private nifiCommon: NiFiCommon
+    ) {}
+
+    promptEmptyQueueRequest$ = createEffect(
+        () =>
+            this.actions$.pipe(
+                ofType(QueueActions.promptEmptyQueueRequest),
+                map((action) => action.request),
+                tap((request) => {
+                    const dialogReference = this.dialog.open(YesNoDialog, {
+                        data: {
+                            title: 'Empty Queue',
+                            message:
+                                'Are you sure you want to empty this queue? 
All FlowFiles waiting at the time of the request will be removed.'
+                        },
+                        panelClass: 'small-dialog'
+                    });
+
+                    
dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => {
+                        this.store.dispatch(
+                            QueueActions.submitEmptyQueueRequest({
+                                request
+                            })
+                        );
+                    });
+                })
+            ),
+        { dispatch: false }
+    );
+
+    submitEmptyQueueRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.submitEmptyQueueRequest),
+            map((action) => action.request),
+            switchMap((request) => {
+                const dialogReference = this.dialog.open(CancelDialog, {
+                    data: {
+                        title: 'Empty Queue',
+                        message: 'Waiting for queue to empty...'
+                    },
+                    disableClose: true,
+                    panelClass: 'small-dialog'
+                });
+
+                
dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => {
+                    
this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest());
+                });
+
+                return 
from(this.queueService.submitEmptyQueueRequest(request)).pipe(
+                    map((response) =>
+                        QueueActions.submitEmptyQueueRequestSuccess({
+                            response: {
+                                dropEntity: response
+                            }
+                        })
+                    ),
+                    catchError((error) =>
+                        of(
+                            QueueActions.queueApiError({
+                                error: error.error
+                            })
+                        )
+                    )
+                );
+            })
+        )
+    );
+
+    promptEmptyQueuesRequest$ = createEffect(
+        () =>
+            this.actions$.pipe(
+                ofType(QueueActions.promptEmptyQueuesRequest),
+                map((action) => action.request),
+                tap((request) => {
+                    const dialogReference = this.dialog.open(YesNoDialog, {
+                        data: {
+                            title: 'Empty All Queues',
+                            message:
+                                'Are you sure you want to empty all queues in 
this Process Group? All FlowFiles from all connections waiting at the time of 
the request will be removed.'
+                        },
+                        panelClass: 'small-dialog'
+                    });
+
+                    
dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => {
+                        this.store.dispatch(
+                            QueueActions.submitEmptyQueuesRequest({
+                                request
+                            })
+                        );
+                    });
+                })
+            ),
+        { dispatch: false }
+    );
+
+    submitEmptyQueuesRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.submitEmptyQueuesRequest),
+            map((action) => action.request),
+            switchMap((request) => {
+                const dialogReference = this.dialog.open(CancelDialog, {
+                    data: {
+                        title: 'Empty All Queues',
+                        message: 'Waiting for all queues to empty...'
+                    },
+                    disableClose: true,
+                    panelClass: 'small-dialog'
+                });
+
+                
dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => {
+                    
this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest());
+                });
+
+                return 
from(this.queueService.submitEmptyQueuesRequest(request)).pipe(
+                    map((response) =>
+                        QueueActions.submitEmptyQueueRequestSuccess({
+                            response: {
+                                dropEntity: response
+                            }
+                        })
+                    ),
+                    catchError((error) =>
+                        of(
+                            QueueActions.queueApiError({
+                                error: error.error
+                            })
+                        )
+                    )

Review Comment:
   We should consider closing the dialog in case of an error. While the general 
error handling approach is TBD, in this scenario the request fails and the user 
is left with a waiting dialog and no indication that it actually failed.
   
   I discovered this when trying to empty all queues and one of my processors 
is not viewable to my user. The backend responds to the request with a `403: 
Unable to view Processor with ID 834e1653-018c-1000-37db-5b30738632b9. Contact 
the system administrator.`
   
   I imagine the same issue would exist on the action to empty a single queue 
but i couldn't find a breaking scenario to verify.
   



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/queue/queue.effects.ts:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable } from '@angular/core';
+import { Actions, createEffect, ofType } from '@ngrx/effects';
+import * as QueueActions from './queue.actions';
+import { Store } from '@ngrx/store';
+import {
+    asyncScheduler,
+    catchError,
+    filter,
+    from,
+    interval,
+    map,
+    of,
+    switchMap,
+    take,
+    takeUntil,
+    tap,
+    withLatestFrom
+} from 'rxjs';
+import { selectDropConnectionId, selectDropProcessGroupId, 
selectDropRequestEntity } from './queue.selectors';
+import { QueueService } from '../../service/queue.service';
+import { DropRequest } from './index';
+import { CancelDialog } from 
'../../../../ui/common/cancel-dialog/cancel-dialog.component';
+import { MatDialog } from '@angular/material/dialog';
+import { NiFiCommon } from '../../../../service/nifi-common.service';
+import { isDefinedAndNotNull } from '../../../../state/shared';
+import { YesNoDialog } from 
'../../../../ui/common/yes-no-dialog/yes-no-dialog.component';
+import { OkDialog } from '../../../../ui/common/ok-dialog/ok-dialog.component';
+import { loadConnection, loadProcessGroup } from '../flow/flow.actions';
+import { resetQueueState } from './queue.actions';
+
+@Injectable()
+export class QueueEffects {
+    constructor(
+        private actions$: Actions,
+        private store: Store<CanvasState>,
+        private queueService: QueueService,
+        private dialog: MatDialog,
+        private nifiCommon: NiFiCommon
+    ) {}
+
+    promptEmptyQueueRequest$ = createEffect(
+        () =>
+            this.actions$.pipe(
+                ofType(QueueActions.promptEmptyQueueRequest),
+                map((action) => action.request),
+                tap((request) => {
+                    const dialogReference = this.dialog.open(YesNoDialog, {
+                        data: {
+                            title: 'Empty Queue',
+                            message:
+                                'Are you sure you want to empty this queue? 
All FlowFiles waiting at the time of the request will be removed.'
+                        },
+                        panelClass: 'small-dialog'
+                    });
+
+                    
dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => {
+                        this.store.dispatch(
+                            QueueActions.submitEmptyQueueRequest({
+                                request
+                            })
+                        );
+                    });
+                })
+            ),
+        { dispatch: false }
+    );
+
+    submitEmptyQueueRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.submitEmptyQueueRequest),
+            map((action) => action.request),
+            switchMap((request) => {
+                const dialogReference = this.dialog.open(CancelDialog, {
+                    data: {
+                        title: 'Empty Queue',
+                        message: 'Waiting for queue to empty...'
+                    },
+                    disableClose: true,
+                    panelClass: 'small-dialog'
+                });
+
+                
dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => {
+                    
this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest());
+                });
+
+                return 
from(this.queueService.submitEmptyQueueRequest(request)).pipe(
+                    map((response) =>
+                        QueueActions.submitEmptyQueueRequestSuccess({
+                            response: {
+                                dropEntity: response
+                            }
+                        })
+                    ),
+                    catchError((error) =>
+                        of(
+                            QueueActions.queueApiError({
+                                error: error.error
+                            })
+                        )
+                    )
+                );
+            })
+        )
+    );
+
+    promptEmptyQueuesRequest$ = createEffect(
+        () =>
+            this.actions$.pipe(
+                ofType(QueueActions.promptEmptyQueuesRequest),
+                map((action) => action.request),
+                tap((request) => {
+                    const dialogReference = this.dialog.open(YesNoDialog, {
+                        data: {
+                            title: 'Empty All Queues',
+                            message:
+                                'Are you sure you want to empty all queues in 
this Process Group? All FlowFiles from all connections waiting at the time of 
the request will be removed.'
+                        },
+                        panelClass: 'small-dialog'
+                    });
+
+                    
dialogReference.componentInstance.yes.pipe(take(1)).subscribe(() => {
+                        this.store.dispatch(
+                            QueueActions.submitEmptyQueuesRequest({
+                                request
+                            })
+                        );
+                    });
+                })
+            ),
+        { dispatch: false }
+    );
+
+    submitEmptyQueuesRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.submitEmptyQueuesRequest),
+            map((action) => action.request),
+            switchMap((request) => {
+                const dialogReference = this.dialog.open(CancelDialog, {
+                    data: {
+                        title: 'Empty All Queues',
+                        message: 'Waiting for all queues to empty...'
+                    },
+                    disableClose: true,
+                    panelClass: 'small-dialog'
+                });
+
+                
dialogReference.componentInstance.cancel.pipe(take(1)).subscribe(() => {
+                    
this.store.dispatch(QueueActions.stopPollingEmptyQueueRequest());
+                });
+
+                return 
from(this.queueService.submitEmptyQueuesRequest(request)).pipe(
+                    map((response) =>
+                        QueueActions.submitEmptyQueueRequestSuccess({
+                            response: {
+                                dropEntity: response
+                            }
+                        })
+                    ),
+                    catchError((error) =>
+                        of(
+                            QueueActions.queueApiError({
+                                error: error.error
+                            })
+                        )
+                    )
+                );
+            })
+        )
+    );
+
+    submitEmptyQueueRequestSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.submitEmptyQueueRequestSuccess),
+            map((action) => action.response),
+            switchMap((response) => {
+                const dropRequest: DropRequest = 
response.dropEntity.dropRequest;
+                if (dropRequest.finished) {
+                    return of(QueueActions.deleteEmptyQueueRequest());
+                } else {
+                    return of(QueueActions.startPollingEmptyQueueRequest());
+                }
+            })
+        )
+    );
+
+    startPollingEmptyQueueRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.startPollingEmptyQueueRequest),
+            switchMap(() =>
+                interval(2000, asyncScheduler).pipe(
+                    
takeUntil(this.actions$.pipe(ofType(QueueActions.stopPollingEmptyQueueRequest)))
+                )
+            ),
+            switchMap(() => of(QueueActions.pollEmptyQueueRequest()))
+        )
+    );
+
+    pollEmptyQueueRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.pollEmptyQueueRequest),
+            
withLatestFrom(this.store.select(selectDropRequestEntity).pipe(isDefinedAndNotNull())),
+            switchMap(([action, dropEntity]) => {
+                return 
from(this.queueService.pollEmptyQueueRequest(dropEntity.dropRequest)).pipe(
+                    map((response) =>
+                        QueueActions.pollEmptyQueueRequestSuccess({
+                            response: {
+                                dropEntity: response
+                            }
+                        })
+                    ),
+                    catchError((error) =>
+                        of(
+                            QueueActions.queueApiError({
+                                error: error.error
+                            })
+                        )
+                    )
+                );
+            })
+        )
+    );
+
+    pollEmptyQueueRequestSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.pollEmptyQueueRequestSuccess),
+            map((action) => action.response),
+            filter((response) => response.dropEntity.dropRequest.finished),
+            switchMap((response) => 
of(QueueActions.stopPollingEmptyQueueRequest()))
+        )
+    );
+
+    stopPollingEmptyQueueRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.stopPollingEmptyQueueRequest),
+            switchMap((response) => of(QueueActions.deleteEmptyQueueRequest()))
+        )
+    );
+
+    deleteEmptyQueueRequest$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(QueueActions.deleteEmptyQueueRequest),
+            
withLatestFrom(this.store.select(selectDropRequestEntity).pipe(isDefinedAndNotNull())),
+            switchMap(([action, dropEntity]) => {
+                this.dialog.closeAll();
+
+                return 
from(this.queueService.deleteEmptyQueueRequest(dropEntity.dropRequest)).pipe(
+                    map((response) =>
+                        QueueActions.showEmptyQueueResults({
+                            request: {
+                                dropEntity: response
+                            }
+                        })
+                    ),
+                    catchError((error) =>
+                        of(
+                            QueueActions.showEmptyQueueResults({
+                                request: {
+                                    dropEntity
+                                }
+                            })
+                        )
+                    )
+                );
+            })
+        )
+    );

Review Comment:
   _Update_ I think I found a solution. Based on this ngrx doc:  
https://ngrx.io/guide/effects#incorporating-state, we need to use 
`concatLatestFrom` rather than `withLatestFrom` to prevent the selector from 
firing until the correct action is dispatched
   
   ```
   concatLatestFrom((action) => 
this.store.select(selectDropRequestEntity).pipe(isDefinedAndNotNull())),
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to