Comparing Pulsar and Kafka: unified queuing and streaming

2017-12-03 Thread Khurrum Nasim
Dear Kafka Community,

I happened to read this blog post comparing the messaging models of
Apache Pulsar and Apache Kafka. It sounds interesting. Apache Pulsar claims
to unify streaming (Kafka) and queuing (RabbitMQ) in one API.
Pulsar also seems to support the Kafka API. Has anyone taken a look at Pulsar?
What does the community think about this? Pulsar is also an Apache project.
Is there any collaboration that could happen between these two projects?

https://streaml.io/blog/pulsar-streaming-queuing/

BTW, I am a Kafka user and love Kafka a lot. I'm just trying to see what other
people think about this.

- KN


[jira] [Created] (KAFKA-6305) log.retention.hours, retention.ms not working in segments deletion for kafka topic

2017-12-03 Thread VinayKumar (JIRA)
VinayKumar created KAFKA-6305:
-

 Summary: log.retention.hours, retention.ms not working in segments 
deletion for kafka topic
 Key: KAFKA-6305
 URL: https://issues.apache.org/jira/browse/KAFKA-6305
 Project: Kafka
  Issue Type: Bug
  Components: config, log
Affects Versions: 0.10.2.1
 Environment: CentOS 7 
Reporter: VinayKumar


I'm using Kafka version 0.10.2.1 and have log.retention.hours set to 
24 hours (log.retention.hours=24). But the topic partition segments are not deleted 
as per this retention, and segments older than 24 hours are still present in the 
partition logs.

Further, I updated retention.ms to 18 hours (retention.ms=6480) for the 
topic, but segments older than this retention period are still present in the 
partition logs.
Can someone please help with this problem? 
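
For reference: retention.ms is specified in milliseconds, so an 18-hour retention 
corresponds to 64800000 rather than 6480, and only rolled (non-active) segments are 
eligible for deletion, with the check running every log.retention.check.interval.ms. 
A hedged example of setting and verifying the topic-level override on a 0.10.x 
broker, assuming a topic named my-topic and ZooKeeper at localhost:2181:

{code}
# Set an 18-hour retention override on one topic (value is in milliseconds).
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config retention.ms=64800000

# Verify the override took effect.
bin/kafka-configs.sh --zookeeper localhost:2181 --describe \
  --entity-type topics --entity-name my-topic
{code}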





[jira] [Created] (KAFKA-6304) The controller should allow updating the partition reassignment for the partitions being reassigned

2017-12-03 Thread Jiangjie Qin (JIRA)
Jiangjie Qin created KAFKA-6304:
---

 Summary: The controller should allow updating the partition 
reassignment for the partitions being reassigned
 Key: KAFKA-6304
 URL: https://issues.apache.org/jira/browse/KAFKA-6304
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin


Currently the controller will not process a partition reassignment for a 
partition that is already being reassigned.

The issue is that if there is a broker failure during the partition 
reassignment, the reassignment may never finish, and users may want to cancel 
it. However, the controller will refuse to do that unless the user manually 
deletes the partition reassignment ZK path, forces a controller switch, and 
then issues the revert command. This is pretty involved. It seems reasonable 
for the controller to replace the ongoing stuck reassignment with the updated 
partition assignment.
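
For context, the manual steps described above look roughly like the following 
(a sketch only; the znode paths are the standard ZooKeeper locations, the JSON 
file name is made up, and everything should be double-checked before deleting 
znodes on a live cluster):

{code}
# Open an interactive ZooKeeper shell.
bin/zookeeper-shell.sh localhost:2181

# Inside the shell: remove the stuck reassignment znode, then force a
# controller switch so the newly elected controller drops the in-flight state.
delete /admin/reassign_partitions
delete /controller

# Back in the OS shell: re-issue the desired (reverted) assignment.
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file revert-assignment.json --execute
{code}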





[jira] [Created] (KAFKA-6303) Potential lack of synchronization in NioEchoServer#AcceptorThread

2017-12-03 Thread Ted Yu (JIRA)
Ted Yu created KAFKA-6303:
-

 Summary: Potential lack of synchronization in 
NioEchoServer#AcceptorThread
 Key: KAFKA-6303
 URL: https://issues.apache.org/jira/browse/KAFKA-6303
 Project: Kafka
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


In the run() method:
{code}
SocketChannel socketChannel = 
((ServerSocketChannel) key.channel()).accept();
socketChannel.configureBlocking(false);
newChannels.add(socketChannel);
{code}
Modification to newChannels should be protected by a synchronized block.
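
One possible fix (a sketch only, assuming newChannels is the list shared with 
the server thread; the same lock would of course have to be held wherever the 
list is read or drained):

{code}
SocketChannel socketChannel =
        ((ServerSocketChannel) key.channel()).accept();
socketChannel.configureBlocking(false);
// Guard the shared list so the acceptor thread and the server thread
// never mutate and read it concurrently.
synchronized (newChannels) {
    newChannels.add(socketChannel);
}
{code}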





Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-03 Thread Becket Qin
>The correlation ID is used within a single TCP session, to uniquely
>associate a request with a response.  The correlation ID is not unique
>(and has no meaning) outside the context of that single TCP session.
>
>Keep in mind, NetworkClient is in charge of TCP sessions, and generally
>tries to hide that information from the upper layers of the code.  So
>when you submit a request to NetworkClient, you don't know if that
>request creates a TCP session, or reuses an existing one.

Hmm, the correlation id is application-level information in each Kafka
request. It is maintained by o.a.k.c.NetworkClient and is not associated
with TCP sessions. So even if the TCP session disconnects and reconnects, the
correlation id is not reset and remains monotonically increasing.

Maybe I did not make it clear. I am not suggesting anything that relies on TCP
or the transport layer. Everything is handled at the application layer. From the
client's perspective, the timeout is not defined as a TCP timeout; it is
defined as the upper bound of time it will wait for a response. If the client
does not receive a response before the timeout is reached, it will just retry.
My suggestion was that whenever a FetchRequest needs to be retried, no matter
for what reason, we just use a full FetchRequest. This does not depend on the
NetworkClient implementation, i.e. it is the same regardless of whether the
retry is on the existing TCP connection or a new one.
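
To make this concrete, here is a toy sketch (my own illustration with made-up
names, not NetworkClient or Fetcher code): the client keeps whatever
per-partition state it believes the leader has cached; after any failed
attempt it discards that belief, so the next request is always a full one.

{code}
// Illustrative sketch only -- not Kafka code.
import java.util.HashMap;
import java.util.Map;

class ToyFetcher {
    // Partition -> offset the leader is believed to know about (our "session" view).
    private final Map<Integer, Long> leaderView = new HashMap<>();
    // Partition -> the position we actually want to fetch from.
    private final Map<Integer, Long> positions = new HashMap<>();

    void updatePosition(int partition, long offset) {
        positions.put(partition, offset);
    }

    /** Build the next request; any failed attempt forces a full request. */
    Map<Integer, Long> nextRequest(boolean lastAttemptFailed) {
        if (lastAttemptFailed || leaderView.isEmpty()) {
            leaderView.clear();
            leaderView.putAll(positions);
            return new HashMap<>(positions);          // full fetch: every partition
        }
        Map<Integer, Long> incremental = new HashMap<>();
        positions.forEach((partition, offset) -> {
            if (!offset.equals(leaderView.get(partition))) {  // only send what changed
                incremental.put(partition, offset);
                leaderView.put(partition, offset);
            }
        });
        return incremental;
    }
}
{code}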

The question we are trying to answer here is essentially how to let the
leader and followers agree on the messages in the log. And we are comparing
the following two solutions:
1. Use something like a TCP ACK with epoch at Request/Response level.
2. Piggyback the leader's knowledge at the partition level for the follower to
confirm.

Personally I think (2) is better because it is more direct. The leader is
the one who maintains all the state (LEOs) of the followers. At the end of
the day, the leader just wants to make sure all those states are correct.
(2) directly confirms those states with the followers instead of inferring
that from an epoch. Note that there is a subtle but maybe important
difference between our use of an epoch and a TCP seq. The difference is
that a TCP ACK confirms that all the packets with a lower seq have been
received. In our case, a high-epoch request does not mean all the data in the
previous response was successfully processed. So in the KIP, the statement
"When the leader receives a fetch request with epoch N + 1, it knows that the
data it sent back for the fetch request with epoch N was successfully
processed by the follower." could be tricky or expensive to make right in
some cases.

Not sure if we have considered this, but when thinking of the above
comparison, the following two potential issues came up:

1. Thinking about the case of a consumer: if consumer.seek() or
consumer.pause() is called, the consumer has essentially updated its
interested set of topics or positions. This will need a full FetchRequest
to update the position on the leader, and thus create a new session. Now if
users call seek()/pause() very often, the broker could run out of fetch
session slots pretty quickly.

2. Corrupted messages: if a fetch response has a corrupt message, the
follower will back off for a while and try to fetch again. During the back-off
period, the follower will not be fetching from the partition with the corrupt
message, and after the back-off the partition will be added back. With the
current design, it seems the follower will need to keep creating new
sessions.

In the above two cases, it might still be useful to let the session id be
unique for each client instance (just like the producer id for the
idempotent producer) and allow the client to update the leader-side
interested partitions and positions with a full FetchRequest without creating
a new session id.

Thanks,

Jiangjie (Becket) Qin



On Sun, Dec 3, 2017 at 12:55 PM, Colin McCabe  wrote:

> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > Thanks for the explanation, Colin. A few more questions.
> >
> > >The session epoch is not complex.  It's just a number which increments
> > >on each incremental fetch.  The session epoch is also useful for
> > >debugging-- it allows you to match up requests and responses when
> > >looking at log files.
> >
> > Currently each request in Kafka has a correlation id to help match the
> > requests and responses. Is epoch doing something differently?
>
> Hi Becket,
>
> The correlation ID is used within a single TCP session, to uniquely
> associate a request with a response.  The correlation ID is not unique
> (and has no meaning) outside the context of that single TCP session.
>
> Keep in mind, NetworkClient is in charge of TCP sessions, and generally
> tries to hide that information from the upper layers of the code.  So
> when you submit a request to NetworkClient, you don't know if that
> request creates a TCP session, or reuses an existing one.
>
> >
> > >Unfortunately, this 

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-03 Thread Colin McCabe
On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> Thanks for the explanation, Colin. A few more questions.
> 
> >The session epoch is not complex.  It's just a number which increments
> >on each incremental fetch.  The session epoch is also useful for
> >debugging-- it allows you to match up requests and responses when
> >looking at log files.
> 
> Currently each request in Kafka has a correlation id to help match the
> requests and responses. Is epoch doing something differently?

Hi Becket,

The correlation ID is used within a single TCP session, to uniquely
associate a request with a response.  The correlation ID is not unique
(and has no meaning) outside the context of that single TCP session.

Keep in mind, NetworkClient is in charge of TCP sessions, and generally
tries to hide that information from the upper layers of the code.  So
when you submit a request to NetworkClient, you don't know if that
request creates a TCP session, or reuses an existing one.

> 
> >Unfortunately, this doesn't work.  Imagine the client misses an
> >increment fetch response about a partition.  And then the partition is
> >never updated after that.  The client has no way to know about the
> >partition, since it won't be included in any future incremental fetch
> >responses.  And there are no offsets to compare, since the partition is
> >simply omitted from the response.
> 
> I am curious about in which situation would the follower miss a response
> of a partition. If the entire FetchResponse is lost (e.g. timeout), the
> follower would disconnect and retry. That will result in sending a full
> FetchRequest.

Basically, you are proposing that we rely on TCP for reliable delivery
in a distributed system.  That isn't a good idea for a bunch of
different reasons.  First of all, TCP timeouts tend to be very long.  So
if the TCP session timing out is your error detection mechanism, you
have to wait minutes for messages to time out.  Of course, we add a
timeout on top of that after which we declare the connection bad and
manually close it.  But just because the session is closed on one end
doesn't mean that the other end knows that it is closed.  So the leader
may have to wait quite a long time before TCP decides that yes,
connection X from the follower is dead and not coming back, even though
gremlins ate the FIN packet which the follower attempted to transmit. 
If the cache state is tied to that TCP session, we have to keep that
cache around for a much longer time than we should.

Secondly, from a software engineering perspective, it's not a good idea
to try to tightly tie together TCP and our code.  We would have to
rework how we interact with NetworkClient so that we are aware of things
like TCP sessions closing or opening.  We would have to be careful to
preserve the ordering of incoming messages when doing things like
putting incoming requests on to a queue to be processed by multiple
threads.  It's just a lot of complexity to add, and there's no upside.

Imagine that I made an argument that client IDs are "complex" and should
be removed from our APIs.  After all, we can just look at the remote IP
address and TCP port of each connection.  Would you think that was a
good idea?  The client ID is useful when looking at logs.  For example,
if a rebalance is having problems, you want to know what clients were
having a problem.  So having the client ID field to guide you is
actually much less "complex" in practice than not having an ID.

Similarly, if metadata responses had epoch numbers (simple incrementing
numbers), we would not have to debug problems like clients accidentally
getting old metadata from servers that had been partitioned off from the
network for a while.  Clients would know the difference between old and
new metadata.  So putting epochs into the metadata request is much less
"complex" operationally, even though it's an extra field in the request.
 This has been discussed before on the mailing list.
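
A minimal sketch of the general idea (illustrative only, not the actual Kafka
implementation): a monotonically increasing epoch lets the receiver discard
anything older than what it has already applied, regardless of which
connection delivered it.

{code}
// Illustrative sketch only -- not Kafka code.
class EpochTracker {
    private long latestEpoch = -1;

    /** Apply a response only if it is newer than everything seen so far. */
    synchronized boolean shouldApply(long responseEpoch) {
        if (responseEpoch <= latestEpoch) {
            return false;   // stale: carries state older than what we already have
        }
        latestEpoch = responseEpoch;
        return true;
    }
}
{code}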

So I think the bottom line for me is that having the session ID and
session epoch, while it adds two extra fields, reduces operational
complexity and increases debuggability.  It avoids tightly coupling us
to assumptions about reliable ordered delivery which tend to be violated
in practice in multiple layers of the stack.  Finally, it avoids the
necessity of refactoring NetworkClient.

best,
Colin


> If there is an error such as NotLeaderForPartition is
> returned for some partitions, the follower can always send a full
> FetchRequest. Is there a scenario that only some of the partitions in a
> FetchResponse is lost?
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe  wrote:
> 
> > On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > > On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe 
> > wrote:
> > >
> > > > On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > > > Hey Colin,
> > > > >
> > > > > Thanks much for the update. I have a few 

[GitHub] kafka pull request #4289: KAFKA-6301 changing regular expression in ops.html...

2017-12-03 Thread waleedfateem
GitHub user waleedfateem opened a pull request:

https://github.com/apache/kafka/pull/4289

KAFKA-6301 changing regular expression in ops.html from '*' to '.*'

The documentation for section "Mirroring data between clusters" states the 
following:
Or you could mirror all topics using --whitelist '*'
The regular expression should be '.*' instead.

This fix makes the change directly to the ops.html file.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/waleedfateem/kafka KAFKA-6301

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4289.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4289


commit f26dd298c24d22656d26575a03201a43c326b4df
Author: waleedfateem 
Date:   2017-12-03T19:42:47Z

KAFKA-6301 changing incorrect Java Regex example for mirroring all topics 
from '*' to '.*' in ops.html




---


Jenkins build is back to normal : kafka-trunk-jdk9 #236

2017-12-03 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk7 #3016

2017-12-03 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk8 #2256

2017-12-03 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: Fix broker compatibility tests

--
[...truncated 1.42 MB...]
org.apache.kafka.common.config.ConfigDefTest > testValidateCannotParse STARTED

org.apache.kafka.common.config.ConfigDefTest > testValidateCannotParse PASSED

org.apache.kafka.common.config.ConfigDefTest > testValidate STARTED

org.apache.kafka.common.config.ConfigDefTest > testValidate PASSED

org.apache.kafka.common.config.ConfigDefTest > 
testInternalConfigDoesntShowUpInDocs STARTED

org.apache.kafka.common.config.ConfigDefTest > 
testInternalConfigDoesntShowUpInDocs PASSED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefaultString STARTED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefaultString PASSED

org.apache.kafka.common.config.ConfigDefTest > testSslPasswords STARTED

org.apache.kafka.common.config.ConfigDefTest > testSslPasswords PASSED

org.apache.kafka.common.config.ConfigDefTest > testBaseConfigDefDependents 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testBaseConfigDefDependents 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringClass 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringClass 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringShort 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringShort 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefault STARTED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefault PASSED

org.apache.kafka.common.config.ConfigDefTest > toRst STARTED

org.apache.kafka.common.config.ConfigDefTest > toRst PASSED

org.apache.kafka.common.config.ConfigDefTest > testCanAddInternalConfig STARTED

org.apache.kafka.common.config.ConfigDefTest > testCanAddInternalConfig PASSED

org.apache.kafka.common.config.ConfigDefTest > testMissingRequired STARTED

org.apache.kafka.common.config.ConfigDefTest > testMissingRequired PASSED

org.apache.kafka.common.config.ConfigDefTest > 
testParsingEmptyDefaultValueForStringFieldShouldSucceed STARTED

org.apache.kafka.common.config.ConfigDefTest > 
testParsingEmptyDefaultValueForStringFieldShouldSucceed PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringPassword 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringPassword 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringList 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringList 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringLong 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringLong 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringBoolean 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringBoolean 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testNullDefaultWithValidator 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testNullDefaultWithValidator 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testMissingDependentConfigs 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testMissingDependentConfigs 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringDouble 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringDouble 
PASSED

org.apache.kafka.common.config.ConfigDefTest > toEnrichedRst STARTED

org.apache.kafka.common.config.ConfigDefTest > toEnrichedRst PASSED

org.apache.kafka.common.config.ConfigDefTest > testDefinedTwice STARTED

org.apache.kafka.common.config.ConfigDefTest > testDefinedTwice PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringString 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringString 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testNestedClass STARTED

org.apache.kafka.common.config.ConfigDefTest > testNestedClass PASSED

org.apache.kafka.common.config.ConfigDefTest > testBadInputs STARTED

org.apache.kafka.common.config.ConfigDefTest > testBadInputs PASSED

org.apache.kafka.common.config.ConfigDefTest > testValidateMissingConfigKey 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testValidateMissingConfigKey 
PASSED

org.apache.kafka.common.config.AbstractConfigTest > testOriginalsWithPrefix 
STARTED

org.apache.kafka.common.config.AbstractConfigTest > testOriginalsWithPrefix 
PASSED

org.apache.kafka.common.config.AbstractConfigTest > testConfiguredInstances 
STARTED

org.apache.kafka.common.config.AbstractConfigTest > testConfiguredInstances 
PASSED

org.apache.kafka.common.config.AbstractConfigTest > 
testValuesWithPrefixOverride STARTED

org.apache.kafka.common.config.AbstractConfigTest > 
testValuesWithPrefixOverride PASSED


[GitHub] kafka pull request #4286: MINOR: fix broker compatibility tests

2017-12-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4286


---


[jira] [Created] (KAFKA-6302) Topic can not be recreated after it is deleted

2017-12-03 Thread kic (JIRA)
kic created KAFKA-6302:
--

 Summary: Topic can not be recreated after it is deleted
 Key: KAFKA-6302
 URL: https://issues.apache.org/jira/browse/KAFKA-6302
 Project: Kafka
  Issue Type: Bug
  Components: admin, clients
Affects Versions: 1.0.0
Reporter: kic


I use an embedded Kafka for unit tests. My application relies on the ability to 
recreate topics programmatically. Currently it is not possible to recreate a 
topic after it has been deleted.

{code}
// needs compile-time dependency 'net.manub:scalatest-embedded-kafka_2.11:1.0.0'
package kic.kafka.embedded

import java.util.Properties

import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import org.scalatest._

import scala.collection.JavaConverters._

class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
  val props = new Properties()
  val testTopic = "test-topic"

  "A running server" should "return a list of topics" in {
props.setProperty("bootstrap.servers", "localhost:10001")
props.setProperty("delete.enable.topic", "true")
props.setProperty("group.id", "test-client")
props.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.LongDeserializer")
props.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("clinet.id", "test-client")
props.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.LongSerializer")
props.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")

EmbeddedKafaJavaWrapper.start(10001, 10002, props)

try {
  implicit val admin = AdminClient.create(props)

  // create topic and confirm it exists
  createTopic(testTopic)
  val topics = listTopics()
  info(s"topics: $topics")
  topics should contain(testTopic)

  // now we should be able to send something to this topic
  // TODO create producer and send something

  // delete topic
  deleteTopic(testTopic)
  listTopics() shouldNot contain(testTopic)

  // recreate topic
  createTopic(testTopic)
  // listTopics() should contain(testTopic)

  // and finally consume from the topic and expect to get 0 entries
  // TODO create consumer and poll once
} finally {
  EmbeddedKafaJavaWrapper.stop()
}

  }

  def listTopics()(implicit admin: AdminClient) =
admin.listTopics().names().get()

  def createTopic(topic: String)(implicit admin: AdminClient) =
admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava).all().get()

  def deleteTopic(topic: String)(implicit admin: AdminClient) =
admin.deleteTopics(Seq(topic).asJava).all().get()

}
{code}

Btw, what happens to connected consumers when I delete a topic? 





Documentation JIRA KAFKA-6301 and access to repo

2017-12-03 Thread Waleed Fateem
Hello,

I created a JIRA (KAFKA-6301) for a minor change to the documentation but
it doesn't seem like I can assign the ticket to myself. Can someone help me
out?

I'm also trying to commit and push the change to the Kafka repository but
I'm getting the following error:

remote: Permission to apache/kafka.git denied to waleedfateem.

fatal: unable to access 'https://github.com/apache/kafka.git/': The
requested URL returned error: 403

Regards,

Waleed


[jira] [Created] (KAFKA-6301) Incorrect Java Regex example '*' for mirroring all topics

2017-12-03 Thread Waleed Fateem (JIRA)
Waleed Fateem created KAFKA-6301:


 Summary: Incorrect Java Regex example '*' for mirroring all topics
 Key: KAFKA-6301
 URL: https://issues.apache.org/jira/browse/KAFKA-6301
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Affects Versions: 1.0.0, 0.11.0.0, 0.10.2.0
Reporter: Waleed Fateem
Priority: Minor


The documentation for section "Mirroring data between clusters" states the 
following:

Or you could mirror all topics using --whitelist '*'

The regular expression should be '.*' instead. 
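
A quick standalone Java check illustrates the difference (an illustrative 
example, independent of the MirrorMaker code path):

{code}
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public class WhitelistRegexCheck {
    public static void main(String[] args) {
        // ".*" is a valid regex that matches any topic name.
        System.out.println(Pattern.matches(".*", "my-topic"));   // prints: true

        // "*" on its own is not a valid Java regular expression.
        try {
            Pattern.compile("*");
        } catch (PatternSyntaxException e) {
            System.out.println("Invalid pattern: " + e.getMessage());
        }
    }
}
{code}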





Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-03 Thread Jan Filipiak


On 02.12.2017 23:34, Colin McCabe wrote:

On Thu, Nov 30, 2017, at 23:29, Jan Filipiak wrote:

Hi,

this discussion is going a little bit far from what I intended this
thread for.
I can see all of this being related.

To let you guys know what I am currently thinking is the following:

I do think the handling of IDs and epochs is rather complicated. I think
the complexity comes from aiming for too much.

1. Currently all the work is towards making the FetchRequest
completely empty. This brings all sorts of pain, because the broker actually
needs to know what it sent, even though it tries to use sendfile as much as
possible.
2. Currently all the work is also towards making empty fetch requests work
across TCP sessions.

In this thread I aimed to relax our goals with regards to point 2.
Connection resets for us are really the exception, and I would argue that
introducing complexity to spare one full request on a connection reset is
not worth it. Therefore I argued to keep the server-side information with
the session instead of somewhere global. It's not going to bring in the
results.

As the discussion unfolds I also want to challenge our approach for
point 1.
I do not see a reason to introduce complexity (especially on the fetch
response path). Did we consider that the client just sends the offsets
it wants to fetch, skips the topic partition description, and just uses
the order to match the information on the broker side again? This would
also reduce the fetch sizes a lot while skipping a ton of complexity.

Hi Jan,

We need to solve the problem of the fetch request taking
O(num_partitions) space and time to process.  A solution that keeps the
O(num_partitions) behavior, but improves it by a constant factor,
doesn't really solve the problem.  And omitting some partition
information, but leaving other partition information in place,
definitely falls in that category, wouldn't you agree?  Also, as others
have noted, if you omit the partition IDs, you run into a lot of
problems surrounding changes in the partition membership.

best,
Colin

Hi Colin,

I agree that a fetch request sending only offsets still grows with the 
number of partitions.
On processing time, I can follow as far as parsing is concerned, but I don't 
see a difference in the work a broker has to do for received offsets versus
cached offsets.

Given we still have the 100,000-partition case, a fetch request as I 
suggest would safely get below 1 MB. How much of an improvement this really
is depends on your setup.


Say you have all of these in one topic: you are effectively saving maybe 
50% already.

As you add topics, and depending on how long your topic names are, you get
extra savings. In my playground cluster this is 160 topics with 10 partitions 
on average, 2 brokers, an average topic-name length of 54, and replication 
factor 2. This would result in a saving of 5.5 bytes per topic-partition 
fetched. So from currently 21.5 bytes per topic-partition it would go down to 
basically 8, almost a 2/3 saving. On our production cluster, which has a 
higher broker-to-replication-factor ratio, the savings are bigger. The average 
number of replicated partitions per topic there is ~3, which is roughly a 75% 
saving in fetch request size. For us, since we have many slowly changing 
smaller topics, varint encoding of offsets would give another big boost, 
as many fit into 2-3 bytes.
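
For what it's worth, one back-of-the-envelope way to arrive at those numbers 
(my own reading, assuming the current per-partition fetch fields of partition 
id, fetch offset and max bytes, with the topic name amortized over its 
partitions; the exact wire format may differ slightly):

{code}
topic name: 54 bytes / 10 partitions                 ~  5.5 bytes per partition
partition id (4) + fetch offset (8) + max bytes (4)  =  16  bytes per partition
total today                                          ~ 21.5 bytes per fetched partition
offsets only (8-byte offset, order implies partition) =  8  bytes, roughly 2/3 less
{code}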


I do not quite understand what it means to omit partition IDs and change 
ownership. The partition ID can be retrieved by ordinal position from the 
broker's cache. The broker serving the fetch request should not care whether 
this consumer owns the partition in terms of its group membership. If the 
broker is no longer the leader of the partition, it can return "not leader 
for partition" as usual. Maybe you can point me to where this has been 
explained, as I couldn't really find a place where it became clear to me.

I think a 75% saving or more is realistic, and even though it is still 
linear in the number of partitions fetched, it is a very practical approach 
that fits the design principle "the consumer decides" a lot better. I am 
still trying to fully understand how the plan is to update the offsets on 
the broker side. No need to explain that here, as I think I know where to 
look it up; I guess it introduces a lot of complexity with sendfile and an 
additional index lookup, and I have a hard time believing it will pay off, 
both in source code complexity and efficiency.


I intend to send you an answer on the other threads as soon as I get to 
it. I hope this explains my view of the size trade-off well enough. I would 
very much appreciate your opinion.

Best Jan




Hope these ideas are interesting

best Jan


On 01.12.2017 01:47, Becket Qin wrote:

Hi Colin,

Thanks for updating the KIP. I have two comments:

1. The session epoch seems to introduce some complexity. It would be good if
we don't have to maintain the epoch.
2. If all the partitions have data returned (even a few messages), the next
fetch