tpalfy commented on a change in pull request #4411:
URL: https://github.com/apache/nifi/pull/4411#discussion_r463554505



##########
File path: 
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
##########
@@ -190,33 +197,50 @@
     public final void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
         AMQPResource<T> resource = resourceQueue.poll();
         if (resource == null) {
-            resource = createResource(context);
+            try {
+                resource = createResource(context);
+            } catch (Exception e) {
+                getLogger().error("Failed to initialize AMQP client", e);
+                context.yield();
+                return;
+            }
+        } else if (!resource.isAlive()) {

Review comment:
       With this we put checks in two places, _before_ and _during_ the AMQP 
communication. And the general problem is, the resource could "die" between the 
two.
   
   It might be better to incorporate all resource health checks in the _during_ 
phase.

##########
File path: 
nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
##########
@@ -190,33 +197,50 @@
     public final void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
         AMQPResource<T> resource = resourceQueue.poll();
         if (resource == null) {
-            resource = createResource(context);
+            try {
+                resource = createResource(context);
+            } catch (Exception e) {
+                getLogger().error("Failed to initialize AMQP client", e);
+                context.yield();
+                return;
+            }
+        } else if (!resource.isAlive()) {
+            getLogger().error("AMQP client has lost connection while it was 
waiting in the resource pool, dropping the AMQP client.");
+            closeResource(resource);
+            return;
         }
 
         try {
             processResource(resource.getConnection(), resource.getWorker(), 
context, session);
-            resourceQueue.offer(resource);
-        } catch (final Exception e) {
-            try {
-                resource.close();
-            } catch (final Exception e2) {
-                e.addSuppressed(e2);
-            }
 
-            throw e;
+            if (!resourceQueue.offer(resource)) {
+                getLogger().info("Worker queue is full, closing AMQP client");
+                closeResource(resource);
+            }
+        } catch (final Exception e) {
+            getLogger().error("Failed to process message, dropping the AMQP 
client and yielding", e);

Review comment:
       The exception handling has some inconsistencies.
   It would be better to distinguish between the AMQP client failure and other 
NiFi-related failures.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to