[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-04-28 Thread Aditya Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517489#comment-14517489
 ] 

Aditya Auradkar commented on KAFKA-1886:


[~nehanarkhede] I've updated the patch. This is a pretty trivial but relatively 
useful bug fix and for that reason I think we should merge it. Thoughts?

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Aditya Auradkar
> Attachments: KAFKA-1886.patch, KAFKA-1886.patch, 
> KAFKA-1886_2015-02-02_13:57:23.patch, KAFKA-1886_2015-04-28_10:27:39.patch
>
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-04-28 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517479#comment-14517479
 ] 

Aditya A Auradkar commented on KAFKA-1886:
--

Updated reviewboard https://reviews.apache.org/r/30196/diff/
 against branch origin/trunk

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Aditya Auradkar
> Attachments: KAFKA-1886.patch, KAFKA-1886.patch, 
> KAFKA-1886_2015-02-02_13:57:23.patch, KAFKA-1886_2015-04-28_10:27:39.patch
>
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-04-26 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14513154#comment-14513154
 ] 

Neha Narkhede commented on KAFKA-1886:
--

[~aauradkar] Took a quick look at your patch again. Are you planning on fixing 
it so we can merge? Realize that the simple consumer changes are going to 
matter less as we make more progress on the new consumer.

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Aditya Auradkar
> Attachments: KAFKA-1886.patch, KAFKA-1886.patch, 
> KAFKA-1886_2015-02-02_13:57:23.patch
>
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-02-02 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14302007#comment-14302007
 ] 

Aditya A Auradkar commented on KAFKA-1886:
--

Updated reviewboard https://reviews.apache.org/r/30196/diff/
 against branch origin/trunk

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Aditya Auradkar
> Attachments: KAFKA-1886.patch, KAFKA-1886.patch, 
> KAFKA-1886_2015-02-02_13:57:23.patch
>
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-02-02 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14301981#comment-14301981
 ] 

Aditya A Auradkar commented on KAFKA-1886:
--

Created reviewboard https://reviews.apache.org/r/30527/diff/
 against branch origin/trunk

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Aditya Auradkar
> Attachments: KAFKA-1886.patch, KAFKA-1886.patch
>
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-01-22 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14288350#comment-14288350
 ] 

Aditya A Auradkar commented on KAFKA-1886:
--

Created reviewboard https://reviews.apache.org/r/30196/diff/
 against branch origin/trunk

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Jun Rao
> Attachments: KAFKA-1886.patch
>
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-01-21 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14286326#comment-14286326
 ] 

Chris Riccomini commented on KAFKA-1886:


IMO, the SimpleConsumer should at least throw the proper exception.

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Jun Rao
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-01-21 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14286287#comment-14286287
 ] 

Aditya A Auradkar commented on KAFKA-1886:
--

[~junrao] any thoughts?

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Jun Rao
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-01-20 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14284963#comment-14284963
 ] 

Aditya A Auradkar commented on KAFKA-1886:
--

If interested, I hacked an existing test for this.

def testConsumerEmptyTopic() {
  val newTopic = "new-topic"
  TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, 
replicationFactor = 1, servers = servers)
  val thread = new Thread {
override def run {
  System.out.println("Starting the fetch")
  val start = System.currentTimeMillis()
  try
  {
val fetchResponse = consumer.fetch(new 
FetchRequestBuilder().minBytes(10).maxWait(3000).addFetch(newTopic, 0, 0, 
1).build())
  }
  catch {
  case e: Throwable =>{
val  end = System.currentTimeMillis()
System.out.println("Caught exception" + e + ". Took " + (end - 
start));
System.out.println("Fetch interrupted " + 
Thread.currentThread().isInterrupted)
  }
  }
}
  }

 thread.start()
  Thread.sleep(1000)
  thread.interrupt()
  thread.join()
  System.out.println("Ending test")
  }

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Jun Rao
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

2015-01-20 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14284961#comment-14284961
 ] 

Aditya A Auradkar commented on KAFKA-1886:
--

Did a good bit of poking around. Basically, I wrote a test that runs a 
SimpleConsumer inside a thread and interrupts that thread from the main thread. 
This forces a ClosedByInterruptException that we catch in the 
SimpleConsumer:sendRequest method. Catching this exception does not reset the 
interrupt status of the Thread. The returned exception is a 
ClosedChannelException and the original exception is swallowed.

I can't spot any bug in Kafka here. I can suggest a couple of improvements:
- Don't retry inside SimpleConsumer if we catch a ClosedByInterruptException. 
Seems like extra work for nothing.
- Inspect code to check if we are catching InterruptedException somewhere. 
Based on a cursory inspection, I couldn't find anything.

> SimpleConsumer swallowing ClosedByInterruptException
> 
>
> Key: KAFKA-1886
> URL: https://issues.apache.org/jira/browse/KAFKA-1886
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Aditya A Auradkar
>Assignee: Jun Rao
>
> This issue was originally reported by a Samza developer. I've included an 
> exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on 
> my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to 
> interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches 
> Throwable in its sendRequest method [2]. I'm wondering: if 
> blockingChannel.send/receive throws a ClosedByInterruptException
> when the thread is interrupted, what happens? It looks like sendRequest will 
> catch the exception (which I
> think clears the thread's interrupted flag), and then retries the send. If 
> the send succeeds on the retry, I think that the ClosedByInterruptException 
> exception is effectively swallowed, and the BrokerProxy will continue
> fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] 
> https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] 
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)