Re: Consumer client not able to receive messages when one of broker is pushed down in the cluster

2018-01-07 Thread rAhul
Please suggest how to avoid having to restart the Kafka consumer client when
one of the brokers in the cluster goes down.
Or do we need to update the offsets manually once we put the failed broker
back into the cluster?

On 5 January 2018 at 19:18, rAhul  wrote:

> Hi,
>
> I have an Apache Kafka cluster with 3 nodes (say 1, 2, 3), a replication
> factor of 3, and 3 partitions.
>
> When my producer client, consumer client, and the cluster are all running,
> messages are transferred from producer to consumer without any issues.
>
> Now I stopped the leader node, say node 1, and node 2 was promoted as
> leader.
>
> Message flow from producer to consumer works fine without any issues.
>
> Now I started node 1 and stopped node 2, and either node 1 or node 3 was
> promoted as leader.
>
> Now the producer is able to send messages, but the consumer is not able to
> receive messages.
>
> I can see the consumer lag in the Kafka Manager web console.
>
> If I start node 2 again, the consumer is able to receive messages.
>
> Please suggest how to overcome this issue and fix it.
>
> Thanks.
>


Kafka 1.0 - corrupt index found

2018-01-07 Thread peter holm

Hi,

after a restart, Kafka 1.0 on win64 no longer works: there are plenty of
corrupt index files ("file could not be opened"). Is ZooKeeper keeping them
open, or is this a known bug?


Peter



[jira] [Created] (KAFKA-6432) Lookup indices may cause unnecessary page fault

2018-01-07 Thread Ying Zheng (JIRA)
Ying Zheng created KAFKA-6432:
-

 Summary: Lookup indices may cause unnecessary page fault
 Key: KAFKA-6432
 URL: https://issues.apache.org/jira/browse/KAFKA-6432
 Project: Kafka
  Issue Type: Improvement
  Components: core, log
Reporter: Ying Zheng
 Attachments: Binary Search - Diagram 1.png, Binary Search - Diagram 
2.png

For each topic-partition, the Kafka broker maintains two indices: one for message 
offsets, one for message timestamps. By default, a new index entry is appended to 
each index for every 4 KB of messages. The lookup of the indices is a simple binary 
search. The indices are mmapped files, cached by the Linux page cache.

Both consumer fetches and follower fetches have to do an offset lookup before 
accessing the actual message data. The simple binary search algorithm used for 
looking up the index is not cache friendly, and may cause page faults even on 
high-QPS topic-partitions.

For example (diagram 1), when looking up an index entry in page 12, the binary 
search algorithm has to read pages 0, 6, 9 and 11. After new messages are 
appended to the topic-partition, the index grows to 13 pages. Now, if a 
follower fetch request looks up the first index entry of page 13, the binary 
search algorithm will visit pages 0, 7, 10 and 12. Among those pages, pages 7 
and 10 may not have been used for a long time, and may already have been 
swapped out to disk.

Actually, in a normal Kafka broker, all the follower fetch requests and most 
consumer fetch requests only look up the last few entries of the index. We can 
make the index lookup more cache friendly by searching the last one or two 
pages of the index first (diagram 2).
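The idea can be sketched as follows. This is a hypothetical illustration, not the actual broker code: the index is modeled as a sorted array of offsets, and `entriesPerPage` stands in for the number of index entries that fit in one OS page.

```java
// Sketch of a cache-friendlier index lookup: probe the "warm" tail of the
// index (the last page of entries, which recent appends keep in the page
// cache) before falling back to a full binary search over the cold head.
class WarmTailLookup {
    // Largest i in [lo, hi] with index[i] <= target (standard binary search).
    static int binarySearch(long[] index, int lo, int hi, long target) {
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1;
            if (index[mid] <= target) lo = mid;
            else hi = mid - 1;
        }
        return lo;
    }

    // entriesPerPage is an assumption of this toy model, e.g. a 4096-byte
    // page holding 512 eight-byte entries.
    static int lookup(long[] index, long target, int entriesPerPage) {
        int warmStart = Math.max(0, index.length - entriesPerPage);
        if (index[warmStart] <= target) {
            // Hot path: the target is within the last page, so only that
            // page of the index is touched.
            return binarySearch(index, warmStart, index.length - 1, target);
        }
        // Cold path: full binary search over the rest of the index.
        return binarySearch(index, 0, warmStart, target);
    }
}
```

Follower fetches and tail-reading consumers would take the hot path, faulting in only the last page of the index.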



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6431) Lock contention in Purgatory

2018-01-07 Thread Ying Zheng (JIRA)
Ying Zheng created KAFKA-6431:
-

 Summary: Lock contention in Purgatory
 Key: KAFKA-6431
 URL: https://issues.apache.org/jira/browse/KAFKA-6431
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Ying Zheng
Priority: Minor


Purgatory is the data structure in the Kafka broker that manages delayed 
operations. A ConcurrentHashMap (Kafka Pool) maps each operation key to the 
operations (in a ConcurrentLinkedQueue) that are interested in that key.

When an operation is completed or expired, it is removed from the list 
(ConcurrentLinkedQueue). When the list becomes empty, it is removed from the 
ConcurrentHashMap. The second removal has to be protected by a lock, to avoid 
adding new operations into a list that is being removed. This is currently done 
with a globally shared ReentrantReadWriteLock: all read operations on purgatory 
have to acquire the read side of this lock, while list-removal operations need 
the write side.

Our profiling results show that the Kafka broker spends a nontrivial amount of 
time on this read-write lock.

The problem is exacerbated when there are a large number of short-lived 
operations. For example, when we do sync produce operations (acks=all), a 
DelayedProduce operation is added and then removed for each message. If the QPS 
of the topic is not high, it is very likely that, by the time the operation is 
completed and removed, the list for that key (topic partition) has become empty 
and has to be removed while holding the write lock. This blocks all read/write 
operations on purgatory for a while. As there are tens of I/O threads accessing 
purgatory concurrently, this shared lock can easily become a bottleneck.

Actually, we only want to avoid concurrent read / write on the same key. The 
operations on different keys do not conflict with each other.

I suggest sharding purgatory into smaller partitions, and locking each 
partition independently.

Assuming there are 10 I/O threads actively accessing purgatory, sharding 
purgatory into 512 partitions reduces the probability of two threads accessing 
the same partition at the same time to about 2%. We can also use ReentrantLock 
instead of ReentrantReadWriteLock: when reads do not greatly outnumber writes, 
ReentrantLock has lower overhead than ReentrantReadWriteLock.
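A minimal sketch of the sharding idea (hypothetical class and method names, not the actual purgatory code): each key hashes to a shard, and both insertion and the remove-empty-list step take only that shard's lock, so operations on different keys never contend.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of a sharded watcher map: one plain ReentrantLock per shard
// instead of a single global ReentrantReadWriteLock.
class ShardedWatchers<K, V> {
    private final List<Map<K, Queue<V>>> shards = new ArrayList<>();
    private final List<ReentrantLock> locks = new ArrayList<>();

    ShardedWatchers(int numShards) {
        for (int i = 0; i < numShards; i++) {
            shards.add(new HashMap<>());
            locks.add(new ReentrantLock());
        }
    }

    private int shardOf(K key) {
        return (key.hashCode() & 0x7fffffff) % shards.size();
    }

    // Register a delayed operation under its key.
    void watch(K key, V op) {
        int s = shardOf(key);
        locks.get(s).lock();
        try {
            shards.get(s).computeIfAbsent(key, k -> new ArrayDeque<>()).add(op);
        } finally {
            locks.get(s).unlock();
        }
    }

    // Remove a completed/expired operation; returns false if it was absent.
    boolean complete(K key, V op) {
        int s = shardOf(key);
        locks.get(s).lock();
        try {
            Queue<V> q = shards.get(s).get(key);
            if (q == null || !q.remove(op)) return false;
            // Removing the now-empty list is safe: we hold this shard's
            // lock, so no concurrent watch() can be adding to it.
            if (q.isEmpty()) shards.get(s).remove(key);
            return true;
        } finally {
            locks.get(s).unlock();
        }
    }
}
```

Only threads that happen to hash to the same shard ever block each other, instead of every removal blocking all readers globally.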









[jira] [Created] (KAFKA-6430) Improve Kafka GZip compression performance

2018-01-07 Thread Ying Zheng (JIRA)
Ying Zheng created KAFKA-6430:
-

 Summary: Improve Kafka GZip compression performance
 Key: KAFKA-6430
 URL: https://issues.apache.org/jira/browse/KAFKA-6430
 Project: Kafka
  Issue Type: Improvement
  Components: clients, core
Reporter: Ying Zheng
Priority: Minor


To compress messages, Kafka uses DataOutputStream on top of GZIPOutputStream:

    new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));

To decompress messages, Kafka uses DataInputStream on top of GZIPInputStream:

    new DataInputStream(new GZIPInputStream(buffer));
This is straightforward, but actually inefficient. For each message, in 
addition to the key and value data, Kafka has to write around 30 metadata 
bytes (the exact number varies slightly between Kafka versions), including the 
magic byte, checksum, timestamp, offset, key length, value length, etc. For 
each of these bytes, java.io.DataOutputStream calls write(byte) once. Here is 
the awkward writeInt() method in DataOutputStream, which writes 4 bytes 
separately in big-endian order:
public final void writeInt(int v) throws IOException {
    out.write((v >>> 24) & 0xFF);
    out.write((v >>> 16) & 0xFF);
    out.write((v >>>  8) & 0xFF);
    out.write((v >>>  0) & 0xFF);
    incCount(4);
}

Unfortunately, GZIPOutputStream does not override the single-byte write(int) 
method. Instead, it only overrides write(byte[], offset, len), which calls the 
corresponding JNI zlib function. The write(int) calls from DataOutputStream 
are translated into write(byte[], offset, len) calls in a very inefficient way 
(Oracle JDK 1.8 code):
class DeflaterOutputStream {
    public void write(int b) throws IOException {
        byte[] buf = new byte[1];
        buf[0] = (byte)(b & 0xff);
        write(buf, 0, 1);
    }

    public void write(byte[] b, int off, int len) throws IOException {
        if (def.finished()) {
            throw new IOException("write beyond end of stream");
        }
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }
        if (!def.finished()) {
            def.setInput(b, off, len);
            while (!def.needsInput()) {
                deflate();
            }
        }
    }
}

class GZIPOutputStream extends DeflaterOutputStream {
    public synchronized void write(byte[] buf, int off, int len)
        throws IOException
    {
        super.write(buf, off, len);
        crc.update(buf, off, len);
    }
}

class Deflater {
    private native int deflateBytes(long addr, byte[] b, int off, int len,
        int flush);
}

class CRC32 {
    public void update(byte[] b, int off, int len) {
        if (b == null) {
            throw new NullPointerException();
        }
        if (off < 0 || len < 0 || off > b.length - len) {
            throw new ArrayIndexOutOfBoundsException();
        }
        crc = updateBytes(crc, b, off, len);
    }

    private native static int updateBytes(int crc, byte[] b, int off, int len);
}
For each metadata byte, the code above has to allocate a one-byte array, 
acquire several locks, and call two native JNI methods (Deflater.deflateBytes 
and CRC32.updateBytes). Each Kafka message carries around 30 such metadata 
bytes.

The call stack of Deflater.deflateBytes():
DeflaterOutputStream.write(int b) -> GZIPOutputStream.write(byte[] buf, 
int off, int len) -> DeflaterOutputStream.write(byte[] b, int off, int len) -> 
DeflaterOutputStream.deflate() -> Deflater.deflate(byte[] b, int off, int len) 
-> Deflater.deflate(byte[] b, int off, int len, int flush) -> 
Deflater.deflateBytes(long addr, byte[] b, int off, int len, int flush)

The call stack of CRC32.updateBytes():
DeflaterOutputStream.write(int b) -> GZIPOutputStream.write(byte[] buf, 
int off, int len) -> CRC32.update(byte[] b, int off, int len) -> 
CRC32.updateBytes(int crc, byte[] b, int off, int len)

At Uber, we found that adding a small buffer between DataOutputStream and 
GZIPOutputStream speeds up Kafka GZip compression by about 60% on average:

-return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
+return new DataOutputStream(new BufferedOutputStream(new GZIPOutputStream(buffer, bufferSize), 1 << 14));

A similar fix also applies to GZip decompression.

Here is the test result using the production traffic at Uber:
|| Topic || Avg Message Size (bytes) || Vanilla Kafka Throughput (MB/s) || Kafka w/ GZip Buffer Throughput (MB/s) || Speed Up ||
| topic 1 | 197 | 10.9 | 21.9 | 2.0 |
| topic 2 | 208 | 8.5 | 15.9 | 1.9 |
| topic 3 | 624 | 15.3 | 20.2 | 1.3 |
| topic 4 | 766 | 28.0 | 43.7 | 1.6 |
| topic 5 | 1168 | 22.9 | 25.4 | 1.1 |
| topic 6 | 165021 | 9.1 | 9.2 |  1.0 |
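The buffered construction can be illustrated with a self-contained round trip. This is a sketch using a made-up payload and the 16 KB buffer size from the patch above, not Kafka's actual record-writing code:

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class BufferedGzip {
    // Compress: the BufferedOutputStream batches DataOutputStream's many
    // single-byte metadata writes into one JNI deflate call per 16 KB,
    // instead of one byte[1] allocation + deflate + CRC JNI call per byte.
    static byte[] compress(byte[] payload) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(new GZIPOutputStream(sink), 1 << 14))) {
            out.writeInt(payload.length); // "metadata": 4 one-byte writes without the buffer
            out.write(payload);
        }
        return sink.toByteArray();
    }

    // Decompress with the analogous buffer on the input side.
    static byte[] decompress(byte[] compressed) throws IOException {
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(new GZIPInputStream(
                        new ByteArrayInputStream(compressed)), 1 << 14))) {
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return payload;
        }
    }
}
```

The compressed bytes are identical either way; only the number of JNI crossings changes.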






Re: [DISCUSS] KIP-222 - Add "describe consumer group" to KafkaAdminClient

2018-01-07 Thread Jorge Esteban Quilcate Otoya
Great!

I have added `listGroupOffsets` to the KIP.

If there is no additional feedback, note that the VOTE thread is already open.

Cheers,
Jorge


On Tue, 2 Jan 2018 at 17:49, Gwen Shapira ()
wrote:

> listGroups and listGroupOffsets will make it a snap to transition the
> existing ConsumerGroups CLI to depend on client libraries only.
>
> Thanks for adding them :)
>
> On Sun, Dec 31, 2017 at 1:39 PM Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
>
> > Thanks all for your feedback, and sorry for the late response.
> >
> > I'm considering the following:
> >
> > ```AdminClient.java
> > public abstract ListGroupsResult listGroups(ListGroupsOptions
> options);
> >
> > public ListGroupsResult listGroups() {
> > return listGroups(new ListGroupsOptions());
> > }
> >
> > public ListGroupsResult listConsumerGroups(ListGroupsOptions
> options) {
> > //filtering groups by ConsumerProtocol.PROTOCOL_TYPE
> > }
> >
> > public ListGroupsResult listConsumerGroups() {
> > return listConsumerGroups(new ListGroupsOptions());
> > }
> > ```
> >
> > About `describeConsumerGroups`, I'm considering renaming it to
> > `describeGroups`, renaming `ConsumerGroupDescription` to
> > `GroupDescription`, and `ConsumerDescription` to `MemberDescription`.
> > Not sure we need a deserializer, we can access `DescribeGroupsResponse`
> > members directly.
> >
> > As @dan says, I also think `listGroupOffsets` could be added to this KIP
> to
> > make it complete.
> >
> > I'm thinking about renaming this KIP to "Add Consumer Group operations to
> > Admin API".
> >
> > I'm updating the KIP accordingly.
> >
> > Cheers and happy 2018!
> >
> > Jorge.
> >
> > On Wed, 13 Dec 2017 at 19:06, Colin McCabe ()
> > wrote:
> >
> > > On Tue, Dec 12, 2017, at 09:39, Jason Gustafson wrote:
> > > > Hi Colin,
> > > >
> > > > They do share the same namespace. We have a "protocol type" field in
> > the
> > > > JoinGroup request to make sure that all members are of the same kind.
> > >
> > > Hi Jason,
> > >
> > > Thanks.  That makes sense.
> > >
> > > > Very roughly what I was thinking is something like this. First we
> > > introduce an
> > > > interface for deserialization:
> > > >
> > > > interface GroupMetadataDeserializer {
> > > >   String protocolType();
> > > >   Metadata deserializeMetadata(ByteBuffer);
> > > >   Assignment deserializeAssignment(ByteBuffer);
> > > > }
> > > >
> > > > Then we add some kind of generic container:
> > > >
> > > > class MemberMetadata {
> > > >   Metadata metadata;
> > > >   Assignment assignment;
> > > > }
> > > >
> > > > Then we have two APIs: one generic and one specific to consumer
> groups:
> > > >
> > > >  Map<String, MemberMetadata> describeGroup(String groupId,
> > > > GroupMetadataDeserializer deserializer);
> > > >
> > > > Map<String, MemberMetadata> describeConsumerGroup(String groupId);
> > > >
> > > > (This is just a sketch, so obviously we can change them to use
> futures
> > or
> > > > to batch or whatever.)
> > > >
> > > > I think it would be fine to not provide a connect-specific API since
> > this
> > > > usage will probably be limited to Connect itself.
> > >
> > > Yeah, it probably makes sense to have a separation between
> describeGroup
> > > and describeConsumerGroup.
> > >
> > > We will have to be pretty careful with cross-version compatibility in
> > > describeConsumerGroup.  It should be possible for an old client to talk
> > > to a new broker, and a new client to talk to an old broker.  So we
> > > should be prepared to read data in multiple formats.
> > >
> > > I'm not sure if we need to have a 'deserializer' argument to
> > > describeGroup.  We can just let them access a byte array, right?
> > > Theoretically they might also just want to check for the presence or
> > > absence of a group, but not deserialize anything.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > > On Mon, Dec 11, 2017 at 9:15 PM, Colin McCabe 
> > > wrote:
> > > >
> > > > > Sorry... this is probably a silly question, but do Kafka Connect
> > groups
> > > > > share a namespace with consumer groups?  If we had a separate API
> for
> > > > > Kafka Connect groups vs. Consumer groups, would that make sense?
> Or
> > > > > should we unify them?
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >
> > > > > On Mon, Dec 11, 2017, at 16:11, Jason Gustafson wrote:
> > > > > > Hi Jorge,
> > > > > >
> > > > > > Kafka group management is actually more general than consumer
> > groups
> > > > > > (e.g.
> > > > > > there are kafka connect groups). If we are adding these APIs, I
> > would
> > > > > > suggest we consider the more general protocol and how to expose
> > > > > > group-protocol-specific metadata. For example, it might be
> > > reasonable to
> > > > > > have both an API to access to the 

Re: Could give me the right to assign task to myself for kafka Jira

2018-01-07 Thread Guozhang Wang
Xin,

I have added you to the contributor list, you can assign yourself to JIRAs
now.

Cheers,

Guozhang

On Sun, Jan 7, 2018 at 2:11 AM, Xin Li  wrote:

> Hey,
>
> I want to assign this task [https://issues.apache.org/jira/browse/KAFKA-6422]
> to myself so I can fix it. But it looks like I don’t have the right to do
> so. Could you please grant me that right?
>
> Thank you,
>
> Xin LI
>



-- 
-- Guozhang


Could give me the right to assign task to myself for kafka Jira

2018-01-07 Thread Xin Li
Hey,

I want to assign this task [https://issues.apache.org/jira/browse/KAFKA-6422] 
to myself so I can fix it. But it looks like I don’t have the right to do so. 
Could you please grant me that right?

Thank you,

Xin LI