[ https://issues.apache.org/jira/browse/KAFKA-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16441757#comment-16441757 ]

ASF GitHub Bot commented on KAFKA-4228:
---------------------------------------

radai-rosenblatt closed pull request #1930: KAFKA-4228 - make producer close on 
sender thread death, make consumer shutdown on failure to rebalance, and make 
MM die on any of the above.
URL: https://github.com/apache/kafka/pull/1930
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3efc7b5cb69..e4367997282 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -323,6 +323,17 @@ private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serial
                     this.requestTimeoutMs);
             String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
+            this.ioThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+                @Override
+                public void uncaughtException(Thread t, Throwable e) {
+                    try {
+                        log.error("Thread " + t.getName() + " died due to uncaught exception", e);
+                    } finally {
+                        close(0, TimeUnit.MILLISECONDS); //cant wait to properly flush, because the thread doing the flushing just died
+                    }
+
+                }
+            });
             this.ioThread.start();
 
             this.errors = this.metrics.sensor("errors");
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 6471dad78df..0ab5521d66a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -126,41 +126,51 @@ public Sender(KafkaClient client,
      * The main run loop for the sender thread
      */
     public void run() {
-        log.debug("Starting Kafka producer I/O thread.");
-
-        // main loop, runs until close is called
-        while (running) {
-            try {
-                run(time.milliseconds());
-            } catch (Exception e) {
-                log.error("Uncaught error in kafka producer I/O thread: ", e);
+        boolean gracefulShutdown = false;
+        try {
+            log.debug("Starting Kafka producer I/O thread.");
+
+            // main loop, runs until close is called
+            while (running) {
+                try {
+                    run(time.milliseconds());
+                } catch (Exception e) {
+                    log.error("Uncaught error in kafka producer I/O thread: ", e);
+                }
             }
-        }
 
-        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
+            log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
 
-        // okay we stopped accepting requests but there may still be
-        // requests in the accumulator or waiting for acknowledgment,
-        // wait until these are completed.
-        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
+            // okay we stopped accepting requests but there may still be
+            // requests in the accumulator or waiting for acknowledgment,
+            // wait until these are completed.
+            while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
+                try {
+                    run(time.milliseconds());
+                } catch (Exception e) {
+                    log.error("Uncaught error in kafka producer I/O thread: ", e);
+                }
+            }
+            if (forceClose) {
+                // We need to fail all the incomplete batches and wake up the threads waiting on
+                // the futures.
+                this.accumulator.abortIncompleteBatches();
+            }
             try {
-                run(time.milliseconds());
+                this.client.close();
             } catch (Exception e) {
-                log.error("Uncaught error in kafka producer I/O thread: ", e);
+                log.error("Failed to close network client", e);
             }
-        }
-        if (forceClose) {
-            // We need to fail all the incomplete batches and wake up the threads waiting on
-            // the futures.
-            this.accumulator.abortIncompleteBatches();
-        }
-        try {
-            this.client.close();
-        } catch (Exception e) {
-            log.error("Failed to close network client", e);
-        }
 
-        log.debug("Shutdown of Kafka producer I/O thread has completed.");
+            log.debug("Shutdown of Kafka producer I/O thread has completed.");
+            gracefulShutdown = true;
+        } finally {
+            //make sure to clean up any pending batches. this is a nop on graceful shutdown
+            if (!gracefulShutdown) {
+                forceClose();
+                this.accumulator.abortIncompleteBatches();
+            }
+        }
     }
 
     /**
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f776578f6d4..f9ee2b633e0 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -588,7 +588,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             if (doRebalance)
               syncedRebalance
           } catch {
-            case t: Throwable => error("error during syncedRebalance", t)
+            case t: Throwable =>
+              error("Error during syncedRebalance", t)
+              shutdown()
           }
         }
         info("stopping watcher executor thread for consumer " + 
consumerIdString)
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 17b8f0be6cf..7122c2136a5 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -434,6 +434,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
               records.foreach(producer.send)
               maybeFlushAndCommitOffsets()
             }
+            if (!mirrorMakerConsumer.hasData && !shuttingDown && !exitingOnSendFailure) {
+              //consumer has closed (due to error)
+              throw new IllegalStateException("Consumer has shut down")
+            }
           } catch {
             case cte: ConsumerTimeoutException =>
               trace("Caught ConsumerTimeoutException, continue iteration.")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Sender thread death leaves KafkaProducer in a bad state
> -------------------------------------------------------
>
>                 Key: KAFKA-4228
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4228
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.0.1
>            Reporter: radai rosenblatt
>            Priority: Major
>
> A KafkaProducer's Sender thread may die:
> {noformat}
> 2016/09/28 00:28:01.065 ERROR [KafkaThread] [kafka-producer-network-thread | mm_ei-lca1_uniform] [kafka-mirror-maker] [] Uncaught exception in kafka-producer-network-thread | mm_ei-lca1_uni
> java.lang.OutOfMemoryError: Java heap space
>        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[?:1.8.0_40]
>        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_40]
>        at org.apache.kafka.common.requests.RequestSend.serialize(RequestSend.java:35) ~[kafka-clients-0.9.0.666.jar:?]
>        at org.apache.kafka.common.requests.RequestSend.<init>(RequestSend.java:29) ~[kafka-clients-0.9.0.666.jar:?]
>        at org.apache.kafka.clients.producer.internals.Sender.produceRequest(Sender.java:355) ~[kafka-clients-0.9.0.666.jar:?]
>        at org.apache.kafka.clients.producer.internals.Sender.createProduceRequests(Sender.java:337) ~[kafka-clients-0.9.0.666.jar:?]
>        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:211) ~[kafka-clients-0.9.0.666.jar:?]
>        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134) ~[kafka-clients-0.9.0.666.jar:?]
>        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
> {noformat}
> which leaves the producer in a bad state. In this state, a call to flush(), 
> for example, will hang indefinitely: the sender thread is no longer around to 
> flush batches, yet those batches have not been aborted either.
> Even worse, this can happen in MirrorMaker just before a rebalance, at which 
> point MM will block indefinitely during the rebalance (in 
> beforeReleasingPartitions()).
> A rebalance participant hung in such a way will cause the rebalance to fail for 
> the rest of the participants, at which point 
> ZKRebalancerListener.watcherExecutorThread() dies due to an exception (cannot 
> rebalance after X attempts), but the consumer that ran the thread remains 
> live. The end result is a bunch of zombie mirror makers and orphaned topic 
> partitions.
> A dead sender thread should result in closing the producer.
> A consumer that fails to rebalance should shut down.
> Any issue with the producer or consumer should cause mirror-maker death.
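For illustration, here is a minimal, self-contained sketch of the pattern the patch
above applies; it is not the actual KafkaProducer code, and the class and method
names (SelfClosingClient, awaitClosed, and so on) are hypothetical stand-ins. The
idea is the same as in the KafkaProducer hunk: give the background I/O thread an
uncaught-exception handler that force-closes the owning client, so a blocking call
such as flush() returns instead of waiting forever on a thread that no longer exists.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a client with a background I/O thread; not Kafka code.
public class SelfClosingClient implements AutoCloseable {

    private final CountDownLatch closed = new CountDownLatch(1);
    private final Thread ioThread;

    public SelfClosingClient(Runnable ioLoop) {
        ioThread = new Thread(ioLoop, "client-io-thread");
        ioThread.setDaemon(true);
        // Same pattern as the KafkaProducer change: if the I/O thread dies with
        // an uncaught throwable, close the client right away instead of leaving
        // callers blocked on work that will never complete.
        ioThread.setUncaughtExceptionHandler((t, e) -> {
            System.err.println("Thread " + t.getName() + " died due to uncaught exception: " + e);
            close();
        });
        ioThread.start();
    }

    // Force-close without waiting, analogous to close(0, TimeUnit.MILLISECONDS).
    @Override
    public void close() {
        closed.countDown();
    }

    // Stands in for a flush()-like call: blocks until the client is closed,
    // which is exactly the kind of call that hangs forever without the handler.
    public boolean awaitClosed(long timeout, TimeUnit unit) throws InterruptedException {
        return closed.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        SelfClosingClient client = new SelfClosingClient(() -> {
            throw new OutOfMemoryError("simulated sender-thread death");
        });
        // Prints "closed = true" almost immediately, because the handler closed
        // the client; without the handler this wait would simply time out (or
        // hang forever if no timeout were given).
        System.out.println("closed = " + client.awaitClosed(5, TimeUnit.SECONDS));
    }
}

The actual patch goes one step further in Sender.run(): a finally block force-closes
and aborts any incomplete batches when the shutdown was not graceful, so pending
record futures complete instead of blocking their waiters indefinitely.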



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
