[jira] [Created] (KAFKA-12585) FencedInstanceIdException can cause heartbeat thread to never be closed

2021-03-30 Thread Brian Hawkins (Jira)
Brian Hawkins created KAFKA-12585:
-

 Summary: FencedInstanceIdException can cause heartbeat thread to 
never be closed
 Key: KAFKA-12585
 URL: https://issues.apache.org/jira/browse/KAFKA-12585
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.7.0, 2.5.1
Reporter: Brian Hawkins


The bug has been present since static consumers were introduced.

The problem is entirely within AbstractCoordinator.java.

It occurs when a FencedInstanceIdException is thrown and onFailure (line 1406) 
is called by a thread other than the heartbeat thread.

In the onFailure callback, heartbeatThread.failed is set and the 
heartbeatThread is disabled, but the actual thread is still waiting on line 
1350 (AbstractCoordinator.this.wait()).

Some time later pollHeartbeat is called (line 316).  The check for hasFailed is 
true, so it sets heartbeatThread = null without freeing the thread, which now 
can never be closed.
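
The sequence above can be modelled outside Kafka. The following is a minimal sketch, not the real AbstractCoordinator: the class and method names (HeartbeatLeakSketch, Coordinator, Worker, onFenced, pollHeartbeatLeaky, pollHeartbeatClosing) are hypothetical stand-ins. The "closing" variant shows one possible remedy and is not necessarily the fix Kafka would adopt.

```java
class HeartbeatLeakSketch {

    /** Stripped-down stand-in for the coordinator; NOT the real Kafka class. */
    static class Coordinator {
        private Worker heartbeatThread;

        class Worker extends Thread {
            private boolean enabled = false;
            private boolean closed = false;
            private boolean failed = false;

            Worker() {
                setDaemon(true); // lets the JVM exit even when the thread leaks
            }

            @Override
            public void run() {
                synchronized (Coordinator.this) {
                    while (!closed) {
                        try {
                            if (!enabled) {
                                Coordinator.this.wait();     // parked until notified
                            } else {
                                Coordinator.this.wait(100L); // stand-in for heartbeat work
                            }
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            }
        }

        synchronized Thread start() {
            heartbeatThread = new Worker();
            heartbeatThread.start();
            return heartbeatThread;
        }

        /** Models onFailure running on a thread other than the heartbeat thread. */
        synchronized void onFenced() {
            heartbeatThread.failed = true;
            heartbeatThread.enabled = false;
        }

        /** Reported behaviour: the reference is dropped, the thread stays parked. */
        synchronized void pollHeartbeatLeaky() {
            if (heartbeatThread != null && heartbeatThread.failed) {
                heartbeatThread = null;
            }
        }

        /** Possible remedy (assumption): mark closed and wake the thread first. */
        void pollHeartbeatClosing() throws InterruptedException {
            Worker t;
            synchronized (this) {
                t = heartbeatThread;
                if (t == null || !t.failed) {
                    return;
                }
                t.closed = true;
                notifyAll();
                heartbeatThread = null;
            }
            t.join(); // joined outside the monitor so the worker can re-acquire it
        }
    }

    /** Runs the scenario and reports whether the worker thread is still alive. */
    static boolean aliveAfterPoll(boolean closeOnFailure) {
        try {
            Coordinator c = new Coordinator();
            Thread worker = c.start();
            c.onFenced();
            if (closeOnFailure) {
                c.pollHeartbeatClosing();
            } else {
                c.pollHeartbeatLeaky();
            }
            worker.join(200L); // give the worker a moment to exit if it can
            return worker.isAlive();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("leaky poll, thread alive:   " + aliveAfterPoll(false));
        System.out.println("closing poll, thread alive: " + aliveAfterPoll(true));
    }
}
```

In the leaky variant nothing ever calls notify on the monitor after the reference is dropped, so the worker has no way to observe a closed flag and exit.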

 

I have verified this in a debugger using two clients that create, read, and 
close over and over again using the same group and instance id.  I tested this 
with 2.5.1 but found the same bug in the latest master branch; the above line 
numbers are for the latest code on GitHub.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-5847) Plugin option to filter consumer and producer messages on the broker

2017-09-06 Thread Brian Hawkins (JIRA)
Brian Hawkins created KAFKA-5847:


 Summary: Plugin option to filter consumer and producer messages on 
the broker
 Key: KAFKA-5847
 URL: https://issues.apache.org/jira/browse/KAFKA-5847
 Project: Kafka
  Issue Type: Wish
  Components: core
Affects Versions: 0.10.2.1
Reporter: Brian Hawkins


The idea is that I could specify a plugin that would receive a message, after 
authorization but before it is written to the log.  The plugin could then 
modify or reject the message before passing it on.  A good place for this would 
be in KafkaApis.scala in handleProducerRequest.

Similarly, a message could be modified before it is sent to the consumer.

I have two use cases in mind: 
1. Deal with large messages: the interceptor/filter would write the message to 
a large storage server (think S3).
2. Encrypt data before it is written to the log.
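
To make the proposal concrete, here is a rough sketch of what such a plugin contract might look like for the first use case. Everything in it is hypothetical: Kafka does not ship a ProduceFilter interface, and the names BrokerFilterSketch, ProduceFilter, beforeAppend, and LargeMessageOffloader are invented for illustration. An in-memory map stands in for the external storage server.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class BrokerFilterSketch {

    /** Hypothetical plugin contract; Kafka does not provide this interface. */
    interface ProduceFilter {
        /**
         * Called after authorization and before the value is written to the log.
         * Returns the (possibly rewritten) value to append, or empty to reject it.
         */
        Optional<byte[]> beforeAppend(String topic, byte[] value);
    }

    /** Use case 1: move oversized values elsewhere and log only a reference. */
    static class LargeMessageOffloader implements ProduceFilter {
        // Stand-in for an external store such as S3 (assumption for the sketch).
        final Map<String, byte[]> blobStore = new HashMap<>();
        private final int maxInlineBytes;
        private int nextId = 0;

        LargeMessageOffloader(int maxInlineBytes) {
            this.maxInlineBytes = maxInlineBytes;
        }

        @Override
        public Optional<byte[]> beforeAppend(String topic, byte[] value) {
            if (value.length <= maxInlineBytes) {
                return Optional.of(value); // small enough: pass through unchanged
            }
            String key = topic + "/" + nextId++;
            blobStore.put(key, value);
            // The log receives a short pointer instead of the full payload.
            return Optional.of(("blob:" + key).getBytes(StandardCharsets.UTF_8));
        }
    }
}
```

A consumer-side counterpart would resolve the "blob:" reference back into the original payload before the message is returned; an encryption filter for the second use case could implement the same interface.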

I'm planning on doing this work; I'm just curious whether others are interested 
so I can make a pull request of it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)