Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-11-23 Thread Ismael Juma
Hi Becket,

Relying on the cluster metadata doesn't seem like it would work if there
are multiple fetcher threads, right? It also doesn't work for the consumer
case, which Jay suggested would be good to handle.

Ismael

On Thu, Nov 23, 2017 at 2:21 AM, Becket Qin  wrote:

> Thanks for the KIP, Colin. It is an interesting idea.
>
> Thinking about the fetch protocol at a high level, a fetch request currently
> conveys two types of information:
> 1) which partitions I am interested in
> 2) where I am on those partitions, i.e. the offsets
>
> An extreme optimization would be letting the leader know both 1) and 2), so
> that the fetch request could be almost empty. I think we may be able to
> achieve this when there is no leader migration.
>
> For 1), each broker actually already has this information in the form of
> the metadata. We have found that in many cases versioned metadata is very
> helpful. With a metadata generation we can achieve 1), i.e. the follower
> does not need to tell the leader which partitions it is interested in. More
> specifically, assuming we add a generation to the metadata, the follower
> will include its metadata generation in the fetch request. If the
> generation matches the generation of the metadata on the leader, the leader
> will send back a response indicating that it already knows the follower's
> interested set of partitions, so there is no need to send a full fetch
> request. Otherwise, the follower still needs to send a full fetch request
> in the next round. This achieves the goal that, unless there is a leader
> migration, the followers do not need to send full requests.
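The generation handshake described above can be sketched roughly as follows. This is a hypothetical illustration of the idea, not actual Kafka code; all names here are made up:

```python
# Hypothetical sketch of the metadata-generation handshake described above.
# None of these names come from actual Kafka code.

def handle_fetch(leader_metadata_generation, request):
    """Decide whether the leader already knows the follower's partition set."""
    if request["metadata_generation"] == leader_metadata_generation:
        # Generations match: the leader can infer the interested partitions
        # from its own metadata, so the request body can stay (almost) empty.
        return {"needs_full_fetch": False}
    # Generations diverged (e.g. a leader migration happened): ask the
    # follower to send a full fetch request in the next round.
    return {"needs_full_fetch": True}
```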
>
> There are other benefits of having a metadata generation. Those are
> orthogonal to this discussion. But since we may need it elsewhere, we need
> to introduce it at some point.
>
> For 2), there are two options, A) as Jun said, the leader can do a look up
> to know what is the last offset sent back to the follower for each
> partition, or B) the follower sends back the updated log end offset in the
> next fetch request. If we do (A), one potential optimization is that we can
> let the leader always return the offsets at index boundary or log end
> offset. For example, consider a log whose log end offset is 350, and the
> index file has an entry at offset 100, 200, 300. The leader will always try
> to return bytes at the offset boundary or log end, i.e. for each fetch
> response, the leader will try to return the data up to the highest offset
> index entry as long as the data could fit into the fetch size of the
> partition, so it could be 100, 200, 300, or 350 (the LEO). If so, the
> leader will know the last returned offset without an additional log scan.
> If the leader was not able to return data at an index boundary or the log
> end offset, e.g. because the fetch size is too small or the index byte
> interval is too large, the leader could then fall back to looking up the
> offset. Alternatively, the leader can set a flag in the fetch response
> asking the follower to provide the fetch offset in the next fetch request.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Wed, Nov 22, 2017 at 4:53 PM, Jun Rao  wrote:
>
> > Hi, Colin,
> >
> > After step 3a, do we need to update the cached offset in the leader to be
> > the last offset in the data returned in the fetch response? If so, we need
> > another offset index lookup since the leader only knows that it gives out X
> > bytes in the fetch response, but not the last offset in those X bytes.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe 
> wrote:
> >
> > > On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote:
> > > > Hi, Colin,
> > > >
> > > When fetching data for a partition, the leader needs to translate the
> > > fetch offset to a position in a log segment with an index lookup. If the
> > > fetch request now also needs to cache the offset for the next fetch
> > > request, there will be an extra offset index lookup.
> > >
> > > Hmm.  So the way I was thinking about it was, with an incremental fetch
> > > request, for each partition:
> > >
> > > 1a. the leader consults its cache to find the offset it needs to use
> > > for the fetch request
> > > 2a. the leader performs a lookup to translate the offset to a file index
> > > 3a. the leader reads the data from the file
> > >
> > > In contrast, with a full fetch request, for each partition:
> > >
> > > 1b. the leader looks at the FetchRequest to find the offset it needs to
> > > use for the fetch request
> > > 2b. the leader performs a lookup to translate the offset to a file index
> > > 3b. the leader reads the data from the file
> > >
> > > It seems like there is only one offset index lookup in both cases?  The
> > > key point is that the cache in step #1a is not stored on disk.  Or maybe
> > > I'm missing something here.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > > The offset index lookup can
> > > > potentially be expensive since it could require disk I/Os. One

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-11-23 Thread Ismael Juma
Hi James,

There are 2 options being discussed.

Option A is similar to the existing approach where the follower informs the
leader of offsets it has seen by asking for the next ones. We just skip the
partitions where the offset hasn't changed.

Option B involves the leader keeping track of the offsets returned to the
follower. So, when the follower does the next incremental request (with no
partitions), the leader assumes that the previously returned offsets were
stored by the follower. An important invariant is that the follower can
only send an empty incremental fetch request if the previous response was
successfully processed. What does the follower do if there was an issue
processing _some_ of the partitions in the response? The simplest option
would be to send a full fetch request. An alternative would be for the
follower to send an incremental fetch request with some offsets (overrides
to what the leader expects) although that adds even more complexity (i.e.
it's a combination of options A and B) and may not be worth it.
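The follower-side invariant described above can be sketched as a simple decision rule. This is a hypothetical illustration of option B's recovery path (the simplest variant: fall back to a full fetch), with made-up names, not actual Kafka code:

```python
# Hypothetical sketch of the follower-side rule from option B: an empty
# incremental fetch is only allowed when the previous response was fully
# processed; otherwise the simplest recovery is a full fetch request so
# the leader re-learns the follower's true positions.

def next_fetch_request(prev_response_fully_processed, interested_partitions):
    if prev_response_fully_processed:
        # The leader's cached offsets are valid; an (almost) empty
        # incremental request is safe.
        return {"type": "incremental", "partitions": []}
    # Some partitions failed to process: send a full request.
    return {"type": "full", "partitions": interested_partitions}
```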

Ismael

On Thu, Nov 23, 2017 at 4:58 AM, James Cheng  wrote:

> I think the discussion may have already covered this, but just in case...
>
> How does the leader decide when a newly written message is "committed"
> enough to hand out to consumers?
>
> When a message is produced and is stored to the disk of the leader, the
> message is not considered "committed" until it has hit all replicas in the
> ISR. Only at that point will the leader decide to hand out the message to
> normal consumers.
>
> In the current protocol, I believe the leader has to wait for 2 fetch
> requests from a follower before it considers the message committed: One to
> fetch the uncommitted message, and another to fetch anything after that. It
> is the fetch offset in the 2nd fetch that tells the leader that the
> follower now has the uncommitted message.
>
> As an example:
> 1a. Newly produced messages at offsets 10,11,12. Saved to leader, not yet
> replicated to followers.
> 2a. Follower asks for messages starting at offset 10. Leader hands out
> messages 10,11,12
> 3a. Follower asks for messages starting at offset 13. Based on that fetch
> request, the leader concludes that the follower already has messages
> 10,11,12, and so will now hand messages 10,11,12 out to consumers.
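The commit rule in steps 1a-3a can be modeled in a few lines. This is a deliberately simplified single-follower sketch with hypothetical names (real Kafka tracks a high watermark across the whole ISR), not actual broker code:

```python
# Simplified single-follower model of the commit rule in the example above
# (hypothetical names, not actual Kafka code): a message becomes committed
# once the follower's fetch offset has moved past it, because asking for
# offset N implies the follower already has everything below N.

class Leader:
    def __init__(self):
        self.log = []            # list of messages; list index = offset
        self.high_watermark = 0  # first offset not yet committed

    def produce(self, msg):
        self.log.append(msg)

    def on_fetch(self, follower_fetch_offset):
        # Advance the high watermark based on what the fetch offset implies.
        self.high_watermark = max(self.high_watermark,
                                  min(follower_fetch_offset, len(self.log)))
        return self.log[follower_fetch_offset:]
```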
>
> How will the new protocol handle that? How will the leader know that the
> follower already has messages 10,11,12?
>
> In particular, how will the new protocol handle the case when not all
> partitions are returned in each request?
>
> Another example:
> 1b. Newly produced messages to topic A at offsets 10,11,12. Saved to
> leader, not yet replicated to followers.
> 2b. Newly produced 1MB message to topic B at offset 100. Saved to leader,
> not yet replicated to follower.
> 3b. Follower asks for messages from topic A starting at offset 10, and
> messages from topic B starting at offset 100.
> 4b. Leader decides to send to the follower the 1MB message at topic B
> offset 100. Due to replica.fetch.max.bytes, it only sends that single
> message to the follower.
> 5b. Follower asks for messages from topic A starting at offset 10, and
> messages from topic B starting at offset 101. Leader concludes that topic B
> offset 100 has been replicated and so can be handed out to consumers. Topic
> A messages 10,11,12 are not yet replicated and so cannot yet be handed out
> to consumers.
>
> In this particular case, the follower made no progress on replicating the
> new messages from topic A.
>
> How will the new protocol handle this scenario?
>
> -James
>
> > On Nov 22, 2017, at 7:54 PM, Colin McCabe  wrote:
> >
> > Oh, I see the issue now.  The broker uses sendfile() and sends some
> > message data without knowing what the ending offset is.  To learn that,
> > we would need another index access.
> > However, when we do that offset->index lookup, we know that the next
> > offset->index lookup (done in the following fetch request) will be for
> > the same offset.  So we should be able to cache the result (the index
> > position).  Also:
> > Does the operating system’s page cache help us here?
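The caching idea above can be sketched as a one-entry memo. This is a hypothetical illustration (class and names invented for this sketch, not actual Kafka code):

```python
# Hypothetical sketch of the caching idea above: after serving a fetch, the
# leader resolves the ending offset once and memoizes the offset -> file
# position mapping, since the next fetch request will ask for exactly that
# offset. A single cached entry suffices per follower/partition.

class OffsetIndexCache:
    def __init__(self, lookup_fn):
        self._lookup = lookup_fn  # expensive offset -> file position lookup
        self._cached = None       # (offset, position) of the last resolution
        self.misses = 0

    def position_for(self, offset):
        if self._cached and self._cached[0] == offset:
            return self._cached[1]  # served from cache, no index access
        self.misses += 1
        pos = self._lookup(offset)
        self._cached = (offset, pos)
        return pos
```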
> > Best,
> > Colin
> >
> > On Wed, Nov 22, 2017, at 16:53, Jun Rao wrote:
> >> Hi, Colin,
> >>
> >> After step 3a, do we need to update the cached offset in the
> >> leader to be the last offset in the data returned in the fetch
> >> response? If so, we need another offset index lookup since the
> >> leader only knows that it gives out X bytes in the fetch response,
> >> but not the last offset in those X bytes.
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe
> >>  wrote:
> >>> On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote:
>  Hi, Colin,
> 
>  When fetching data for a partition, the leader needs to
>  translate the fetch offset to a position in a log segment with
>  an index lookup. If the fetch
>  request now also needs to cache the offset for the next fetch
>  request, there will be

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-11-23 Thread Jan Filipiak
The comment is valid. It falls exactly into this topic; it has everything
to do with this! Even when we have a stateful operation last, we move it
to the very first processor (KTableSource) and therefore can't present a
proper RecordContext.

Regarding the other JIRAs you are referring to: they harm the project
more than they do good! There is no need for this kind of optimizer and
meta representation and whatnot. I hope they never get implemented.

Best Jan


On 22.11.2017 14:44, Damian Guy wrote:

Jan, I think your comment with respect to filtering is valid, though not for
this KIP. We have separate JIRAs for topology optimization, which this
falls under.

Thanks,
Damian

On Wed, 22 Nov 2017 at 02:25 Guozhang Wang  wrote:


Jan,

Not sure I understand your argument that "we still going to present
change.oldValue to the filter even though the record context() is for
change.newValue". Are you referring to `KTableFilter#process()`? If yes
could you point to me which LOC are you concerning about?


Guozhang


On Mon, Nov 20, 2017 at 9:29 PM, Jan Filipiak 
wrote:


a remark of mine that got missed during migration:

There is this problem that even though we have source.table.filter.join,
the statefulness happens at the table step, not at the join step. In a
filter, we are still going to present change.oldValue to the filter even
though the record context() is for change.newValue. I would go as far as
applying the filter before the table processor. Not just to get KIP-159,
but because I think it's a side effect of a non-ideal topology layout: if
I can filter 99% of my records, my state could be way smaller. It also
widely escalates the scope of the KIP.

I can only see upsides to executing the filter first.

Best Jan



On 20.11.2017 22:22, Matthias J. Sax wrote:


I am moving this back to the DISCUSS thread... Last 10 emails were sent
to VOTE thread.

Copying Guozhang's last summary below. Thanks for this summary. Very
comprehensive!

It seems, we all agree, that the current implementation of the context
at PAPI level is ok, but we should not leak it into DSL.

Thus, we can go with (2) or (3), where (3) is an extension of (2)
carrying the context to more operators than just sources. It also seems
that we all agree that many-to-one operations void the context.

I still think, that just going with plain (2) is too restrictive -- but
I am also fine if we don't go with the full proposal of (3).

Also note, that the two operators filter() and filterNot() don't modify
the record and thus for both, it would be absolutely valid to keep the
context.

I personally would keep the context for at least all one-to-one
operators. One-to-many is debatable and I am fine to not carry the
context further: at least the offset information is questionable for
this case -- note though, that semantically, the timestamp is inherited
via one-to-many, and I also think this applies to "topic" and
"partition". Thus, I think it's still valuable information we can carry
downstream.
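The propagation rules under debate (option (3) with the refinement above) can be summarized in one function. This is a hypothetical sketch to make the three cases concrete, not the Streams API:

```python
# Hypothetical sketch of the context-propagation rules being discussed,
# not actual Streams API: one-to-one operators keep the full context,
# one-to-many operators inherit everything but the offset (which is
# questionable there), and many-to-one (stateful) operators void the
# context entirely.

def propagate_context(context, operator_kind):
    if operator_kind == "one-to-one":    # e.g. filter(), filterNot()
        return dict(context)             # full context is still valid
    if operator_kind == "one-to-many":   # e.g. branching operators
        inherited = dict(context)
        inherited["offset"] = None       # offset no longer meaningful
        return inherited                 # timestamp/topic/partition kept
    if operator_kind == "many-to-one":   # e.g. aggregations, joins
        return None                      # context is voided
    raise ValueError(operator_kind)
```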


-Matthias

Jan: which approach are you referring to as "the approach that is on the

table would be perfect"?

Note that in today's PAPI layer we are already effectively exposing the
record context which has the issues that we have been discussing right
now,
and its semantics always refers to the "processing record" at hand.

More specifically, we can think of processing a record in two different
ways:

1) the record traverses the topology from source to sink; it may be
transformed into a new object or even generate multiple new objects
(think: branch) along the traversal, and the record context refers to
this processing record. Here the "lifetime" of the record lasts for the
entire topology traversal, and any new records of this traversal are
treated as different transformed values of this record (this applies to
joins and aggregations as well).

2) the record being processed is wiped out in the first operator after
the source, and NEW records are forwarded to downstream operators. I.e.,
each record only lives between two adjacent operators; once it reaches
the new operator its lifetime has ended and new records are generated.

I think in the past we have talked about Streams under both contexts,
and we do not have a clear agreement. I agree that 2) is logically more
understandable for users as it does not leak any internal implementation
details (e.g. for stream-table joins, a table record's traversal ends at
the join operator as it is only materialized, while a stream record's
traversal goes through the join operator and further down to the sinks).
However, if we interpret things following 2) above, then even for
non-stateful operators we would not inherit the record context. What
we're discussing now seems to imply a third semantics:

3) a record would traverse "through" one-to-one (non-stateful)
operators, will "replicate" at one-to-many (non-stateful) operators
(think: "mapValues") and will "end" at many-to-one (stateful) operators
wher

[GitHub] kafka pull request #4249: MINOR: fix typo in ProducerConfig doc

2017-11-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4249


---


[jira] [Created] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is invalid

2017-11-23 Thread VinayKumar (JIRA)
VinayKumar created KAFKA-6266:
-

 Summary: Kafka 1.0.0 : Repeated occurrence of WARN Resetting first 
dirty offset of __consumer_offsets-xx to log start offset 203569 since the 
checkpointed offset 120955 is invalid. (kafka.log.LogCleanerManager$)
 Key: KAFKA-6266
 URL: https://issues.apache.org/jira/browse/KAFKA-6266
 Project: Kafka
  Issue Type: Bug
  Components: offset manager
Affects Versions: 1.0.0
 Environment: CentOS 7, Apache kafka_2.12-1.0.0
Reporter: VinayKumar


I upgraded Kafka from version 0.10.2.1 to 1.0.0. Since then, I see the below
warnings in the log.
I'm seeing these continuously in the log, and want them to be fixed so that
they won't repeat. Can someone please help me fix the below warnings?

WARN Resetting first dirty offset of __consumer_offsets-17 to log start offset 
3346 since the checkpointed offset 3332 is invalid. 
(kafka.log.LogCleanerManager$)
WARN Resetting first dirty offset of __consumer_offsets-23 to log start offset 
4 since the checkpointed offset 1 is invalid. (kafka.log.LogCleanerManager$)
WARN Resetting first dirty offset of __consumer_offsets-19 to log start offset 
203569 since the checkpointed offset 120955 is invalid. 
(kafka.log.LogCleanerManager$)
WARN Resetting first dirty offset of __consumer_offsets-35 to log start offset 
16957 since the checkpointed offset 7 is invalid. (kafka.log.LogCleanerManager$)
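The condition behind these warnings can be sketched as follows. This is a hypothetical illustration of the rule the WARN message describes (function name and signature invented for this sketch, not actual LogCleaner code):

```python
# Hypothetical sketch of what the warning describes (not actual LogCleaner
# code): the cleaner checkpoint is only usable if it falls inside the
# current log range; otherwise the first dirty offset is reset to the log
# start offset, which is exactly what the WARN message reports.

def first_dirty_offset(checkpointed, log_start, log_end):
    if log_start <= checkpointed <= log_end:
        return checkpointed
    # The checkpoint is stale, e.g. retention or truncation advanced the
    # log start offset past it: restart cleaning from the log start.
    return log_start
```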



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #4192: MINOR: Remove unnecessary batch iteration in FileR...

2017-11-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4192


---


[GitHub] kafka pull request #4256: KAFKA-6238: Fix 1.0.0 upgrade instructions relatin...

2017-11-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4256


---


[jira] [Created] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Silvio Papa (JIRA)
Silvio Papa created KAFKA-6267:
--

 Summary: Kafka Producer - initTransaction forever waiting
 Key: KAFKA-6267
 URL: https://issues.apache.org/jira/browse/KAFKA-6267
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.11.0.2, 0.11.0.1
Reporter: Silvio Papa
Priority: Blocker
 Attachments: producer.JPG

In the code of the attached image, the producer waits forever in
initTransaction with the default broker configuration.





[jira] [Created] (KAFKA-6268) Tools should not swallow exceptions like resolving network names

2017-11-23 Thread Antony Stubbs (JIRA)
Antony Stubbs created KAFKA-6268:


 Summary: Tools should not swallow exceptions like resolving 
network names
 Key: KAFKA-6268
 URL: https://issues.apache.org/jira/browse/KAFKA-6268
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.11.0.1
Reporter: Antony Stubbs


The CLI consumer client shows nothing when it can't resolve a domain. This and
other errors like it should be shown to the user by default. You have to turn
on DEBUG level logging in the tools log4j config to find that there is an error.

{{[2017-11-23 16:40:56,401] DEBUG Error connecting to node 
as-broker-1-eu-west-1b-public:9092 (id: 1 rack: null) 
(org.apache.kafka.clients.NetworkClient)
java.io.IOException: Can't resolve address: as-broker-1-eu-west-1b-public:9092
at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:764)
at 
org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:60)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:908)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:819)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:431)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:199)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:223)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at kafka.consumer.NewShinyConsumer.&lt;init&gt;(BaseConsumer.scala:64)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:72)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at org.apache.kafka.common.network.Selector.connect(Selector.java:192)
... 18 more
}}





Build failed in Jenkins: kafka-trunk-jdk9 #216

2017-11-23 Thread Apache Jenkins Server
See 


Changes:

[ismael] MINOR: fix typo in ProducerConfig doc

--
[...truncated 1.44 MB...]
kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyDeletionOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyDeletionOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclInheritance STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAclInheritance PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDistributedConcurrentModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDistributedConcurrentModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache PASSED

kafka.security.auth.OperationTest > testJavaConversions STARTED

kafka.security.auth.OperationTest > testJavaConversions PASSED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled STARTED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled PASSED

kafka.security.auth.ZkAuthorizationTest > testZkUtils STARTED

kafka.security.auth.ZkAuthorizationTest > testZkUtils PASSED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testZkMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testChroot STARTED

kafka.security.auth.ZkAuthorizationTest > testChroot PASSED

kafka.security.auth.ZkAuthorizationTest > testDelete STARTED

kafka.security.auth.ZkAuthorizationTest > testDelete PASSED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive STARTED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive PASSED

kafka.security.auth.AclTest > testAclJsonConversion STARTED

kafka.security.auth.AclTest > testAclJsonConversion PASSED

kafka.security.auth.ResourceTypeTest > testJavaConversions STARTED

kafka.security.auth.ResourceTypeTest > testJavaConversions PASSED

kafka.security.auth.ResourceTypeTest > testFromString STARTED

kafka.security.auth.ResourceTypeTest > testFromString PASSED

kafka.security.auth.PermissionTypeTest > testJavaConversions STARTED

kafka.security.auth.PermissionTypeTest > testJavaConversions PASSED

kafka.security.auth.PermissionTypeTest > testFromString STARTED

kafka.security.auth.PermissionTypeTest > testFromString PASSED

kafka.KafkaTest > testGetKafkaConfigFromArgsWrongSetValue STARTED

kafka.KafkaTest > testGetKafkaConfigFromArgsWrongSetValue PASSED

kafka.KafkaTest > testKafkaSslPasswords STARTED

kafka.KafkaTest > testKafkaSslPasswords PASSED

kafka.KafkaTest > testGetKafkaConfigFromArgs STARTED

kafka.KafkaTest > testGetKafkaConfigFromArgs PASSED

kafka.KafkaTest > testGetKafkaConfigFromArgsNonArgsAtTheEnd STARTED

kafka.KafkaTest > testGetKafkaConfigFromArgsNonArgsAtTheEnd PASSED

kafka.KafkaTest > testGetKafkaConfigFromArgsNonArgsOnly STARTED

kafka.KafkaTest > testGetKafkaConfigFromArgsNonArgsOnly PASSED

kafka.KafkaTest > testGetKafkaConfigFromArgsNonArgsAtTheBegging STARTED

kafka.KafkaTest > testGetKafkaConfigFromArgsNonArgsAtTheBegging PASSED

kafka.producer.SyncProducerTest > testReachableServer STARTED

kafka.producer.SyncProducerTest > testReachableServer PASSED

kafka.producer.SyncProducerTest > testMessageSizeTooLarge STARTED

kafka.producer.SyncProducerTest > testMessageSizeTooLarge PASSED

kafka.producer.SyncProducerTest > testNotEnoughReplicas STARTED

kafka.producer.SyncProducerTest > testNotEnoughReplicas PASSED

k

[jira] [Created] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-23 Thread Andreas Schroeder (JIRA)
Andreas Schroeder created KAFKA-6269:


 Summary: KTable state restore fails after rebalance
 Key: KAFKA-6269
 URL: https://issues.apache.org/jira/browse/KAFKA-6269
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Andreas Schroeder


I have the following kafka streams topology:

entity-B -> map step -> entity-B-exists (with state store)
entity-A   -> map step -> entity-A-exists (with state store)

(entity-B-exists, entity-A-exists) -> outer join with state store.

The topology building code looks like this (some data type, serde, valuemapper, 
and joiner code omitted):

def buildTable[V](builder: StreamsBuilder,
  sourceTopic: String,
  existsTopic: String,
  valueSerde: Serde[V],
  valueMapper: ValueMapper[String, V]): KTable[String, 
V] = {

  val stream: KStream[String, String] = builder.stream[String, 
String](sourceTopic)
  val transformed: KStream[String, V] = stream.mapValues(valueMapper)
  transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))

  val inMemoryStoreName = s"$existsTopic-persisted"

  val materialized = 
Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
  .withKeySerde(Serdes.String())
  .withValueSerde(valueSerde)
  .withLoggingDisabled()

  builder.table(existsTopic, materialized)
}


val builder = new StreamsBuilder
val mapToEmptyString: ValueMapper[String, String] = (value: String) => if 
(value != null) "" else null

val entitiesB: KTable[String, EntityBInfo] =
  buildTable(builder,
 "entity-B",
 "entity-B-exists",
 EntityBInfoSerde,
 ListingImagesToEntityBInfo)

val entitiesA: KTable[String, String] =
  buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), 
mapToEmptyString)

val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => 
EntityDiff.fromJoin(a, b)

val materialized = 
Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
  .withKeySerde(Serdes.String())
  .withValueSerde(EntityDiffSerde)
  .withLoggingEnabled(new java.util.HashMap[String, String]())

val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, 
materialized)


We run 4 processor machines with 30 stream threads each; each topic has 30 
partitions
so that there is a total of 4 x 30 = 120 partitions to consume. The initial
launch of the processor works fine, but killing one processor and letting
it re-join the stream threads leads to some faulty behaviour.

First, the total number of assigned partitions over all processor machines is
larger than 120 (sometimes 157, sometimes just 132), so the partition / task
assignment seems to assign the same job to different stream threads.

The processor machines trying to re-join the consumer group fail constantly
with the error message 'Detected a task that got migrated to another thread.'
We gave the processor half an hour to recover; usually, rebuilding the
KTable states takes around 20 seconds (with Kafka 0.11.0.1).

Here are the details of the errors we see:

stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got 
migrated to another thread. This implies that this thread missed a rebalance 
and dropped out of the consumer group. Trying to rejoin the consumer group now.

org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of 
entity-B-exists-0 should not change while restoring: old end offset 4750539, 
current offset 4751388
> StreamsTask taskId: 1_0
> > ProcessorTopology:
>   KSTREAM-SOURCE-08:
>   topics: [entity-A-exists]
>   children:   [KTABLE-SOURCE-09]
>   KTABLE-SOURCE-09:
>   states: [entity-A-exists-persisted]
>   children:   [KTABLE-JOINTHIS-11]
>   KTABLE-JOINTHIS-11:
>   states: [entity-B-exists-persisted]
>   children:   [KTABLE-MERGE-10]
>   KTABLE-MERGE-10:
>   states: [entity-A-joined-with-entity-B]
>   KSTREAM-SOURCE-03:
>   topics: [entity-B-exists]
>   children:   [KTABLE-SOURCE-04]
>   KTABLE-SOURCE-04:
>   states: [entity-B-exists-persisted]
>   children:   [KTABLE-JOINOTHER-12]
>   KTABLE-JOINOTHER-12:
>   states: [entity-A-exists-persisted]
>   children:   [KTABLE-MERGE-10]
>   KTABLE-MERGE-10:
>   states: 

[GitHub] kafka pull request #4231: MINOR: Small cleanups/refactoring in kafka.control...

2017-11-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4231


---


Build failed in Jenkins: kafka-trunk-jdk9 #217

2017-11-23 Thread Apache Jenkins Server
See 


Changes:

[ismael] MINOR: Remove unnecessary batch iteration in FileRecords.downConvert

[ismael] KAFKA-6238; Fix 1.0.0 upgrade instructions relating to the message

--
[...truncated 1.44 MB...]
kafka.zk.ZKPathTest > testMakeSurePersistsPathExists STARTED

kafka.zk.ZKPathTest > testMakeSurePersistsPathExists PASSED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment STARTED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion PASSED

kafka.zk.KafkaZkClientTest > testGetChildren STARTED

kafka.zk.KafkaZkClientTest > testGetChildren PASSED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset STARTED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset PASSED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateRecursive STARTED

kafka.zk.KafkaZkClientTest > testCreateRecursive PASSED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData STARTED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods PASSED

kafka.zk.KafkaZkClientTest > testAclManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testAclManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndStat STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndStat PASSED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath STARTED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath PASSED

kafka.zk.KafkaZkClientTest > testTopicAssignmentMethods STARTED

kafka.zk.KafkaZkClientTest > testTopicAssignmentMethods PASSED

kafka.zk.KafkaZkClientTest > testDeleteRecursive STARTED

kafka.zk.KafkaZkClientTest > testDeleteRecursive PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistentWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testBrokerTopicMetricsUnregisteredAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > testBrokerTopicMetricsUnregisteredAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testClusterIdMetric STARTED

kafka.metrics.MetricsTest > testClusterIdMetric PASSED

kafka.metrics.MetricsTest > testControllerMetrics STARTED

kafka.metrics.MetricsTest > testControllerMetrics PASSED

kafka.metrics.MetricsTest > testWindowsStyleTagNames STARTED

kafka.metrics.MetricsTest > testWindowsStyleTagNames PASSED

kafka.metrics.MetricsTest > testMetricsLeak STARTED

kafka.metrics.MetricsTest > testMetricsLeak PASSED

kafka.metrics.MetricsTest > testBrokerTopicMetricsBytesInOut STARTED

kafka.metrics.MetricsTest > testBrokerTopicMetricsBytesInOut PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer STARTED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.security.auth.ResourceTypeTest > testJavaConversions STARTED

kafka.security.auth.ResourceTypeTest > testJavaConversions PASSED

kafka.security.auth.ResourceTypeTest > testFromString STARTED

kafka.security.auth.ResourceTypeTest > testFromString PASSED

kafka.security.auth.OperationTest > testJavaConversions STARTED

kafka.security.auth.OperationTest > testJavaConversions PASSED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled STARTED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled PASSED

kafka.security.auth.ZkAuthorizatio

Kafka ERROR after upgrade to Kafka 1.0.0 version : java.lang.OutOfMemoryError: Java heap space

2017-11-23 Thread Vinay Kumar
Hi,

I upgraded Kafka from 0.10.2.1 to 1.0.0. Since the upgrade, I've been seeing
the Kafka service go down because of the issue below:

ERROR [KafkaApi-1] Error when handling request ; (kafka.server.KafkaApis)
{replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=
,partitions=[{partition=1,fetch_offset=847134777,max_bytes=1048576}]}]}
java.lang.OutOfMemoryError: Java heap space

Even after increasing the heap size (KAFKA_HEAP_OPTS) to 5G, the same error
still occurs (the error log is below).

Can you please help with this issue? The topic has 3 partitions and a
replication factor of 2.
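For context: the stack trace below ends in FileRecords.downConvert, i.e. the broker is down-converting 1.0.0-format log data on the fetch path for an older-format consumer, which allocates heap buffers per fetched partition. Assuming the clients are still on 0.10.2, one possible mitigation -- please verify against the 1.0.0 upgrade notes first, this is only a sketch -- is to pin the on-disk message format so that no down-conversion happens:

```
# server.properties -- sketch only, assuming all clients are still on 0.10.2
log.message.format.version=0.10.2
```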



[2017-11-17 10:53:32,178] ERROR [KafkaApi-0] Error when handling request
{replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=
,partitions=[{partition=2,fetch_offset=776723939,max_bytes=1048576}]}]}
(kafka.server.KafkaApis)

java.lang.OutOfMemoryError: Java heap space

at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
at kafka.server.KafkaApis.$anonfun$handleFetchRequest$4(KafkaApis.scala:520)
at kafka.server.KafkaApis.$anonfun$handleFetchRequest$4$adapted(KafkaApis.scala:518)
at kafka.server.KafkaApis$$Lambda$668/2135785513.apply(Unknown Source)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis.$anonfun$handleFetchRequest$3(KafkaApis.scala:518)
at kafka.server.KafkaApis.$anonfun$handleFetchRequest$3$adapted(KafkaApis.scala:508)
at kafka.server.KafkaApis$$Lambda$667/987137379.apply(Unknown Source)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.convertedPartitionData$1(KafkaApis.scala:508)
at kafka.server.KafkaApis.$anonfun$handleFetchRequest$12(KafkaApis.scala:556)
at kafka.server.KafkaApis$$Lambda$664/79178972.apply(Unknown Source)
at scala.collection.Iterator.foreach(Iterator.scala:929)
at scala.collection.Iterator.foreach$(Iterator.scala:929)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
at scala.collection.IterableLike.foreach(IterableLike.scala:71)
at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.createResponse$2(KafkaApis.scala:555)
at kafka.server.KafkaApis.$anonfun$handleFetchRequest$14(KafkaApis.scala:569)
at kafka.server.KafkaApis.$anonfun$handleFetchRequest$14$adapted(KafkaApis.scala:569)
at kafka.server.KafkaApis$$Lambda$663/1244681576.apply(Unknown Source)
at kafka.server.KafkaApis.$anonfun$sendResponseMaybeThrottle$1(KafkaApis.scala:2034)
at kafka.server.KafkaApis$$Lambda$480/119699941.apply$mcVI$sp(Unknown Source)
at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2034)
at kafka.server.KafkaApis.fetchResponseCallback$1(KafkaApis.scala:569)
at kafka.server.KafkaApis.$anonfun$handleFetchRequest$15(KafkaApis.scala:588)
at kafka.server.KafkaApis$$Lambda$662/1703788464.apply$mcVI$sp(Unknown Source)


Thanks,

Vinay Kumar


[GitHub] kafka pull request #4260: KAFKA-5647: Use KafkaZkClient in ReassignPartition...

2017-11-23 Thread omkreddy
GitHub user omkreddy opened a pull request:

https://github.com/apache/kafka/pull/4260

KAFKA-5647: Use KafkaZkClient in ReassignPartitionsCommand and 
PreferredReplicaLeaderElectionCommand

*  Use KafkaZkClient in ReassignPartitionsCommand
*  Use KafkaZkClient in PreferredReplicaLeaderElectionCommand
*  Updated test classes to use new methods
*  All existing tests should pass

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/omkreddy/kafka KAFKA-5647-ADMINCOMMANDS

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4260.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4260


commit b0dcf9fde0754f17e576186c47889b621b3b9769
Author: Manikumar Reddy 
Date:   2017-11-17T15:38:52Z

KAFKA-5647: Use KafkaZkClient in ReassignPartitionsCommand and 
PreferredReplicaLeaderElectionCommand




---


Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-11-23 Thread Jun Rao
Yes, caching the log segment position after the index lookup may work. One
subtle issue is that for a compacted topic, the underlying log segment may
have changed between two consecutive fetch requests, and we need to think
through the impact of that.

Thanks,

Jun

On Wed, Nov 22, 2017 at 7:54 PM, Colin McCabe  wrote:

> Oh, I see the issue now.  The broker uses sendfile() and sends some
> message data without knowing what the ending offset is.  To learn that, we
> would need another index access.
>
> However, when we do that index->offset lookup, we know that the next
> offset->index lookup (done in the following fetch request) will be for the
> same offset.  So we should be able to cache the result (the index).  Also:
> Does the operating system’s page cache help us here?
>
> Best,
> Colin
>
> On Wed, Nov 22, 2017, at 16:53, Jun Rao wrote:
> > Hi, Colin,
> >
> > After step 3a, do we need to update the cached offset in the leader to be
> > the last offset in the data returned in the fetch response? If so, we
> > need
> > another offset index lookup since the leader only knows that it gives out
> > X
> > bytes in the fetch response, but not the last offset in those X bytes.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe 
> wrote:
> >
> > > On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote:
> > > > Hi, Colin,
> > > >
> > > > When fetching data for a partition, the leader needs to translate the
> > > > fetch offset to a position in a log segment with an index lookup. If
> the
> > > fetch
> > > > request now also needs to cache the offset for the next fetch
> request,
> > > > there will be an extra offset index lookup.
> > >
> > > Hmm.  So the way I was thinking about it was, with an incremental fetch
> > > request, for each partition:
> > >
> > > 1a. the leader consults its cache to find the offset it needs to use
> for
> > > the fetch request
> > > 2a. the leader performs a lookup to translate the offset to a file
> index
> > > 3a. the leader reads the data from the file
> > >
> > > In contrast, with a full fetch request, for each partition:
> > >
> > > 1b. the leader looks at the FetchRequest to find the offset it needs to
> > > use for the fetch request
> > > 2b. the leader performs a lookup to translate the offset to a file
> index
> > > 3b. the leader reads the data from the file
> > >
> > > It seems like there is only one offset index lookup in both cases?  The
> > > key point is that the cache in step #1a is not stored on disk.  Or
> maybe
> > > I'm missing something here.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > > The offset index lookup can
> > > > potentially be expensive since it could require disk I/Os. One way to
> > > > optimize this a bit is to further cache the log segment position for
> the
> > > > next offset. The tricky issue is that for a compacted topic, the
> > > > underlying
> > > > log segment could have changed between two consecutive fetch
> requests. We
> > > > could potentially make that case work, but the logic will be more
> > > > complicated.
> > > >
> > > > Another thing is that it seems that the proposal only saves the
> metadata
> > > > overhead if there are low volume topics. If we use Jay's suggestion
> of
> > > > including 0 partitions in subsequent fetch requests, it seems that we
> > > > could
> > > > get the metadata saving even if all topics have continuous traffic.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Nov 22, 2017 at 1:14 PM, Colin McCabe 
> > > wrote:
> > > >
> > > > > On Tue, Nov 21, 2017, at 22:11, Jun Rao wrote:
> > > > > > Hi, Jay,
> > > > > >
> > > > > > I guess in your proposal the leader has to cache the last offset
> > > given
> > > > > > back for each partition so that it knows from which offset to
> serve
> > > the
> > > > > next
> > > > > > fetch request.
> > > > >
> > > > > Hi Jun,
> > > > >
> > > > > Just to clarify, the leader has to cache the last offset for each
> > > > > follower / UUID in the original KIP-227 proposal as well.  Sorry if
> > > that
> > > > > wasn't clear.
> > > > >
> > > > > > This is doable but it means that the leader needs to do an
> > > > > > additional index lookup per partition to serve a fetch request.
> Not
> > > sure
> > > > > > if the benefit from the lighter fetch request obviously offsets
> the
> > > > > > additional index lookup though.
> > > > >
> > > > > The runtime impact should be a small constant factor at most,
> right?
> > > > > You would just have a mapping between UUID and the latest offset in
> > > each
> > > > > partition data structure.  It seems like the runtime impact of
> looking
> > > > > up the fetch offset in a hash table (or small array) in the
> in-memory
> > > > > partition data structure should be very similar to the runtime
> impact
> > > of
> > > > > looking up the fetch offset in the FetchRequest.
> > > > >
> > > > > The extra memory consumption per partition is O(num_brokers),
> which is
> > > > > essentially a small 
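For what it's worth, the per-follower cache Colin describes in step #1a can be pictured as a small in-memory map from the fetch-session UUID to the last offset handed out per partition, so the lookup never touches disk. A rough sketch, assuming illustrative names (this is not the actual broker code):

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the leader-side cache from step #1a: for each fetch-session UUID,
// remember the next fetch offset per partition so an incremental FetchRequest
// can omit the offsets entirely. Names are illustrative, not broker code.
class FetchOffsetCache {
    // sessionId -> (topic-partition name -> next fetch offset)
    private final Map<UUID, Map<String, Long>> cache = new ConcurrentHashMap<>();

    // Called after serving a fetch: record the offset the follower asks for next.
    void update(UUID sessionId, String topicPartition, long nextOffset) {
        cache.computeIfAbsent(sessionId, id -> new ConcurrentHashMap<>())
             .put(topicPartition, nextOffset);
    }

    // Called when an incremental fetch arrives with no per-partition offsets.
    Long nextOffset(UUID sessionId, String topicPartition) {
        Map<String, Long> offsets = cache.get(sessionId);
        return offsets == null ? null : offsets.get(topicPartition);
    }
}
```

The point of the sketch is that step #1a is a hash-map read, so its cost should be comparable to parsing the offset out of a full FetchRequest, and the extra memory is O(partitions x followers).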

Build failed in Jenkins: kafka-trunk-jdk8 #2236

2017-11-23 Thread Apache Jenkins Server
See 


Changes:

[ismael] MINOR: Remove unnecessary batch iteration in FileRecords.downConvert

[ismael] KAFKA-6238; Fix 1.0.0 upgrade instructions relating to the message

[ismael] MINOR: Small cleanups/refactoring in kafka.controller

--
[...truncated 401.99 KB...]

kafka.security.auth.ResourceTypeTest > testJavaConversions STARTED

kafka.security.auth.ResourceTypeTest > testJavaConversions PASSED

kafka.security.auth.ResourceTypeTest > testFromString STARTED

kafka.security.auth.ResourceTypeTest > testFromString PASSED

kafka.security.auth.OperationTest > testJavaConversions STARTED

kafka.security.auth.OperationTest > testJavaConversions PASSED

kafka.security.auth.AclTest > testAclJsonConversion STARTED

kafka.security.auth.AclTest > testAclJsonConversion PASSED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled STARTED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled PASSED

kafka.security.auth.ZkAuthorizationTest > testZkUtils STARTED

kafka.security.auth.ZkAuthorizationTest > testZkUtils PASSED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testZkMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testChroot STARTED

kafka.security.auth.ZkAuthorizationTest > testChroot PASSED

kafka.security.auth.ZkAuthorizationTest > testDelete STARTED

kafka.security.auth.ZkAuthorizationTest > testDelete PASSED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive STARTED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testLocalConcurrentModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testLocalConcurrentModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testHighConcurrencyDeletionOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testHighConcurrencyDeletionOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclInheritance STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAclInheritance PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testDistributedConcurrentModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testDistributedConcurrentModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testHighConcurrencyModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testHighConcurrencyModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > t

Build failed in Jenkins: kafka-1.0-jdk7 #92

2017-11-23 Thread Apache Jenkins Server
See 


Changes:

[ismael] MINOR: fix typo in ProducerConfig doc

--
[...truncated 196.78 KB...]
kafka.api.SaslSslAdminClientIntegrationTest > testAclOperations2 PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testDescribeReplicaLogDirs STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testDescribeReplicaLogDirs PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testInvalidAlterConfigs STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testInvalidAlterConfigs PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testClose STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testClose PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testMinimumRequestTimeouts STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testMinimumRequestTimeouts PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testForceClose STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testForceClose PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testListNodes STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testListNodes PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testDelayedClose STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testDelayedClose PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testCreateDeleteTopics STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testCreateDeleteTopics PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testAlterReplicaLogDirsBeforeTopicCreation STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testAlterReplicaLogDirsBeforeTopicCreation PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testDescribeLogDirs STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testDescribeLogDirs PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testDescribeCluster STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testDescribeCluster PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testCreatePartitions STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testCreatePartitions PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testDescribeNonExistingTopic STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testDescribeNonExistingTopic PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testDescribeAndAlterConfigs STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testDescribeAndAlterConfigs PASSED

kafka.api.SaslSslAdminClientIntegrationTest > testCallInFlightTimeouts STARTED

kafka.api.SaslSslAdminClientIntegrationTest > testCallInFlightTimeouts PASSED

kafka.api.ConsumerBounceTest > testCloseDuringRebalance STARTED

kafka.api.ConsumerBounceTest > testCloseDuringRebalance PASSED

kafka.api.ConsumerBounceTest > testClose STARTED

kafka.api.ConsumerBounceTest > testClose PASSED

kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures STARTED

kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures PASSED

kafka.api.ConsumerBounceTest > testSubscribeWhenTopicUnavailable STARTED

kafka.api.ConsumerBounceTest > testSubscribeWhenTopicUnavailable PASSED

kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures STARTED

kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures SKIPPED

kafka.api.ApiUtilsTest > testShortStringNonASCII STARTED

kafka.api.ApiUtilsTest > testShortStringNonASCII PASSED

kafka.api.ApiUtilsTest > testShortStringASCII STARTED

kafka.api.ApiUtilsTest > testShortStringASCII PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testZkAclsDisabled STARTED

kafka.api.SaslPlainPlaintextConsumerTest > testZkAclsDisabled PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testCoordinatorFailover STARTED

kafka.api.SaslPlainPlaintextConsumerTest > testCoordinatorFailover PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testSimpleConsumption STARTED
ERROR: Could not install GRADLE_3_4_RC_2_HOME
java.lang.NullPointerException
at hudson.plugins.toolenv.ToolEnvBuildWrapper$1.buildEnvVars(ToolEnvBuildWrapper.java:46)
at hudson.model.AbstractBuild.getEnvironment(AbstractBuild.java:887)
at hudson.plugins.git.GitSCM.getParamExpandedRepos(GitSCM.java:421)
at hudson.plugins.git.GitSCM.compareRemoteRevisionWithImpl(GitSCM.java:629)
at hudson.plugins.git.GitSCM.compareRemoteRevisionWith(GitSCM.java:594)
at hudson.scm.SCM.compareRemoteRevisionWith(SCM.java:391)
at hudson.scm.SCM.poll(SCM.java:408)
at hudson.model.AbstractProject._poll(AbstractProject.java:1394)
at hudson.model.AbstractProject.poll(AbstractProject.java:1297)
at hudson.triggers.SCMTrigger$Runner.runPolling(SCMTrigger.java:594)
at hudson.triggers.SCMTrigger$Runner.run(SCMTrigger.java:640)
at hudson.util.SequentialExecutionQueue$QueueEntry.run(SequentialExecutionQueue.java:119)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask

Jenkins build is back to normal : kafka-trunk-jdk9 #218

2017-11-23 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-1.0-jdk7 #93

2017-11-23 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-11-23 Thread Becket Qin
Hi Ismael,

Yes, you are right. The metadata may not help with multiple fetcher threads or
the consumer case. A session-based approach is probably better in this case.

The optimization of only returning data at the offset index entry boundary
may still be worth considering. It also helps improve the index lookup in
general.
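To illustrate the idea (this is a sketch over an assumed sparse offset index, not the broker's implementation): if every response is truncated at the largest index entry at or below the end of the read, the follower's next fetch offset is always an exact index key, so the lookup degenerates to a pure binary search with no forward scan through the segment.

```java
import java.util.Arrays;

// Sketch of a sparse offset index: a fetch offset normally resolves to the
// nearest *preceding* index entry and the segment is then scanned forward.
// Truncating each response on an index-entry boundary makes the next fetch
// offset an exact index key. Arrays and names are illustrative only.
class SparseOffsetIndex {
    private final long[] offsets;    // sorted base offsets of indexed entries
    private final long[] positions;  // file position for each indexed offset

    SparseOffsetIndex(long[] offsets, long[] positions) {
        this.offsets = offsets;
        this.positions = positions;
    }

    // File position of the largest indexed offset <= target (standard lookup).
    long floorPosition(long target) {
        int i = Arrays.binarySearch(offsets, target);
        if (i < 0) i = -i - 2;  // insertion point minus one
        return positions[Math.max(i, 0)];
    }

    // Offset to truncate a response at: the largest indexed offset <= endOffset,
    // so the follower's next fetch starts exactly on an index entry.
    long floorBoundary(long endOffset) {
        int i = Arrays.binarySearch(offsets, endOffset);
        if (i < 0) i = -i - 2;
        return offsets[Math.max(i, 0)];
    }
}
```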

@Jun,
Good point about log-compacted topics. Perhaps we can ensure the read always
operates on the original segment file even if a compacted log segment is
swapped in. Combining this with the above solution, which always returns data
at an index boundary when possible, it seems we can safely avoid the
additional lookup.

Thanks,

Jiangjie (Becket) Qin


On Thu, Nov 23, 2017 at 9:31 AM, Jun Rao  wrote:

> Yes, caching the log segment position after the index lookup may work. One
> subtle issue is that for a compacted topic, the underlying log segment may
> have changed between two consecutive fetch requests, and we need to think
> through the impact of that.
>
> Thanks,
>
> Jun
>
> On Wed, Nov 22, 2017 at 7:54 PM, Colin McCabe  wrote:
>
> > Oh, I see the issue now.  The broker uses sendfile() and sends some
> > message data without knowing what the ending offset is.  To learn that,
> we
> > would need another index access.
> >
> > However, when we do that index->offset lookup, we know that the next
> > offset->index lookup (done in the following fetch request) will be for
> the
> > same offset.  So we should be able to cache the result (the index).
> Also:
> > Does the operating system’s page cache help us here?
> >
> > Best,
> > Colin
> >
> > On Wed, Nov 22, 2017, at 16:53, Jun Rao wrote:
> > > Hi, Colin,
> > >
> > > After step 3a, do we need to update the cached offset in the leader to
> be
> > > the last offset in the data returned in the fetch response? If so, we
> > > need
> > > another offset index lookup since the leader only knows that it gives
> out
> > > X
> > > bytes in the fetch response, but not the last offset in those X bytes.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe 
> > wrote:
> > >
> > > > On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote:
> > > > > Hi, Colin,
> > > > >
> > > > > When fetching data for a partition, the leader needs to translate
> the
> > > > > fetch offset to a position in a log segment with an index lookup.
> If
> > the
> > > > fetch
> > > > > request now also needs to cache the offset for the next fetch
> > request,
> > > > > there will be an extra offset index lookup.
> > > >
> > > > Hmm.  So the way I was thinking about it was, with an incremental
> fetch
> > > > request, for each partition:
> > > >
> > > > 1a. the leader consults its cache to find the offset it needs to use
> > for
> > > > the fetch request
> > > > 2a. the leader performs a lookup to translate the offset to a file
> > index
> > > > 3a. the leader reads the data from the file
> > > >
> > > > In contrast, with a full fetch request, for each partition:
> > > >
> > > > 1b. the leader looks at the FetchRequest to find the offset it needs
> to
> > > > use for the fetch request
> > > > 2b. the leader performs a lookup to translate the offset to a file
> > index
> > > > 3b. the leader reads the data from the file
> > > >
> > > > It seems like there is only one offset index lookup in both cases?
> The
> > > > key point is that the cache in step #1a is not stored on disk.  Or
> > maybe
> > > > I'm missing something here.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > > The offset index lookup can
> > > > > potentially be expensive since it could require disk I/Os. One way
> to
> > > > > optimize this a bit is to further cache the log segment position
> for
> > the
> > > > > next offset. The tricky issue is that for a compacted topic, the
> > > > > underlying
> > > > > log segment could have changed between two consecutive fetch
> > requests. We
> > > > > could potentially make that case work, but the logic will be more
> > > > > complicated.
> > > > >
> > > > > Another thing is that it seems that the proposal only saves the
> > metadata
> > > > > overhead if there are low volume topics. If we use Jay's suggestion
> > of
> > > > > including 0 partitions in subsequent fetch requests, it seems that
> we
> > > > > could
> > > > > get the metadata saving even if all topics have continuous traffic.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Wed, Nov 22, 2017 at 1:14 PM, Colin McCabe 
> > > > wrote:
> > > > >
> > > > > > On Tue, Nov 21, 2017, at 22:11, Jun Rao wrote:
> > > > > > > Hi, Jay,
> > > > > > >
> > > > > > > I guess in your proposal the leader has to cache the last
> offset
> > > > given
> > > > > > > back for each partition so that it knows from which offset to
> > serve
> > > > the
> > > > > > next
> > > > > > > fetch request.
> > > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > Just to clarify, the leader has to cache the last offset for each
> > > > > > follower / UUID in

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-11-23 Thread Dong Lin
Hey Colin,

Thanks for the KIP! This is definitely useful when there are many idle
partitions in the clusters.

In case it is useful, I will provide some numbers here. We observe that for a
cluster with around 2.5k partitions per broker, the ProduceRequestTotalTime
average value is around 25 ms. For a cluster with 2.5k partitions per broker
whose AllTopicsBytesInRate is only around 6 MB/s, the ProduceRequestTotalTime
average value is around 180 ms, most of which is spent on
ProduceRequestRemoteTime. The increased ProduceRequestTotalTime significantly
reduces the throughput of producers with acks=all. I think this KIP can help
address this problem.

Here are some of my ideas on the current KIP:

- The KIP says that the follower will include a partition in
the IncrementalFetchRequest if the LEO of the partition has been updated. It
seems that doing so may prevent the leader from knowing information (e.g.
LogStartOffset) about the follower that would otherwise be included in the
FetchRequest. Maybe we should have a paragraph that explicitly defines the
full criteria for when the fetcher should include a partition in the
FetchRequest, and probably include logStartOffset as part of those criteria?

- It seems that every time the set of partitions in the
ReplicaFetcherThread is changed, or the follower restarts, a new UUID will
be generated in the leader, and the leader will add a new entry to the
in-memory map that maps the UUID to a list of partitions (and other metadata
such as the fetch offset). This map will grow over time depending on the
frequency of events such as partition movement or broker restart. As you
mentioned, we probably need to time out entries in this map. But there is also
a tradeoff in this timeout -- a large timeout increases memory usage whereas a
smaller timeout increases the frequency of full FetchRequests. Could you
specify the default value of this timeout and probably also explain how it
affects the performance of this KIP? Also, do you think we can avoid having
duplicate entries from the same ReplicaFetcher (in case of a partition set
change) by using brokerId+fetcherThreadIndex as the UUID?

I agree with the previous comments that 1) ideally we want to evolve the
existing FetchRequest instead of adding a new request type; and 2) the KIP can
hopefully also apply to replication services such as MirrorMaker. In addition,
ideally we probably want to implement the new logic in a separate class
without having to modify the existing classes (e.g. Log, LogManager) so that
the implementation and design can be simpler going forward. Motivated by these
ideas, I am wondering if the following alternative design may be worth
considering.

Here are the details of a potentially feasible alternative approach.

*Protocol change: *

- We add a fetcherId of string type to the FetchRequest. This fetcherId is
similar to the UUID and helps the leader correlate the fetcher (i.e.
ReplicaFetcherThread or MM consumer) with the state of the fetcher. The
fetcherId is determined by the fetcher. For most consumers this fetcherId is
null. For a ReplicaFetcherThread, fetcherId = brokerId + threadIndex.
For MM this is groupId+someIndex.

*Proposed change in leader broker:*

- A new class FetcherHandler will be used in the leader to map the
fetcherId to the state of the fetcher. The state of the fetcher is a list of
FETCH_REQUEST_PARTITION_V0 entries for the selected partitions.

- After the leader receives a FetchRequest, it first transforms the request
by doing request = FetcherHandler.addPartition(request) before handing the
request to KafkaApis.handle(request). If the fetcherId in this request is
null, this method does not make any change. Otherwise, it takes the list of
FETCH_REQUEST_PARTITION_V0 associated with this fetcherId and appends it to
the given request. The state of a new non-null fetcherId is an empty list.

- The KafkaApis.handle(request) will process the request and generate a
response. All existing logic in ReplicaManager, LogManager and so on does
not need to be changed.

- The leader calls response = FetcherHandler.removePartition(response)
before sending the response back to the fetcher.
FetcherHandler.removePartition(response)
enumerates all partitions in the response. If a partition is "empty" (e.g.
no records to be sent), this partition and its FETCH_REQUEST_PARTITION_V0
from the original FetchRequest are added to the state of this fetcherId, and
the partition is removed from the response. If the partition is not
"empty", the partition is removed from the state of this fetcherId.
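
The addPartition/removePartition transformations described above can be
sketched as follows. This is only an illustration of the idea, not broker
code: a request is simplified to a map of partition -> fetch offset and a
response to a map of partition -> record count, rather than the real
FETCH_REQUEST_PARTITION_V0 structures:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the proposed FetcherHandler. Per-fetcherId state holds the
// partition entries stripped from earlier responses because they were empty.
public class FetcherHandler {
    private final Map<String, Map<String, Long>> state = new ConcurrentHashMap<>();

    // Merge the cached (previously empty) partitions back into the incoming
    // request; a null fetcherId (plain consumer) passes through unchanged.
    public Map<String, Long> addPartition(String fetcherId, Map<String, Long> request) {
        if (fetcherId == null) return request;
        Map<String, Long> merged =
            new HashMap<>(state.computeIfAbsent(fetcherId, id -> new HashMap<>()));
        merged.putAll(request);  // explicitly requested entries win over cached ones
        return merged;
    }

    // Strip empty partitions from the response, remembering their request
    // entries so the leader can re-add them to the next fetch server-side.
    public Map<String, Integer> removePartition(String fetcherId,
                                                Map<String, Long> request,
                                                Map<String, Integer> response) {
        if (fetcherId == null) return response;
        Map<String, Long> cached = state.computeIfAbsent(fetcherId, id -> new HashMap<>());
        Map<String, Integer> trimmed = new HashMap<>();
        for (Map.Entry<String, Integer> e : response.entrySet()) {
            String partition = e.getKey();
            if (e.getValue() == 0) {
                cached.put(partition, request.get(partition)); // park empty partition
            } else {
                cached.remove(partition);  // fetcher will request it explicitly again
                trimmed.put(partition, e.getValue());
            }
        }
        return trimmed;
    }
}
```

The key property is that an empty partition disappears from the wire in both
directions until it becomes non-empty again, at which point the leader's
cached entry is dropped and the fetcher resumes requesting it explicitly.
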

*Proposed change in the ReplicaFetcherThread:*

- In addition to the set of assigned partitions, the ReplicaFetcherThread
also keeps track of the subset of assigned partitions which were non-empty
in the last FetchResponse. This subset is initialized to be the full set of
assigned partitions and is then updated every time a FetchResponse is
received. The FetchRequest constructed by the ReplicaFetcherThread includes
exactly this subset of assigned partitions.
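
The fetcher-side bookkeeping described above could look roughly like this
(again a sketch under assumed simplifications: partitions as strings, a
response modeled as partition -> record count, and a hypothetical class name):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the fetcher-side tracking: the next FetchRequest asks only for
// the partitions that returned records in the previous response; the leader
// is expected to fill in the rest from its cached per-fetcherId state.
public class FetcherPartitionTracker {
    private final Set<String> assigned;
    private final Set<String> nextFetchSet;  // starts as all assigned partitions

    public FetcherPartitionTracker(Set<String> assignedPartitions) {
        this.assigned = new HashSet<>(assignedPartitions);
        this.nextFetchSet = new HashSet<>(assignedPartitions);
    }

    // response: partition -> number of records returned
    public void onFetchResponse(Map<String, Integer> response) {
        nextFetchSet.clear();
        for (Map.Entry<String, Integer> e : response.entrySet()) {
            if (assigned.contains(e.getKey()) && e.getValue() > 0) {
                nextFetchSet.add(e.getKey());
            }
        }
    }

    public Set<String> partitionsForNextRequest() {
        return new HashSet<>(nextFetchSet);
    }
}
```
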

Here is how

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-11-23 Thread Matthias J. Sax
From a DSL point of view, users only see the new value on a
KTable#filter anyway. So why should it be an issue that we use an
<oldValue, newValue> pair under the hood?

User sees newValue and gets corresponding RecordContext. I can't see any
issue here?

I cannot follow here:

>> Even when we have a statefull operation last. We move it to the very
>> first processor (KtableSource)
>> and therefore cant present a proper RecordContext.



With regard to `builder.table().filter()`:

I see your point that it would be good to be able to apply the filter()
first to reduce the state store size of the table. But how is this
related to KIP-159?

Btw: with KIP-182, I am wondering if this would not be possible, by
putting a custom dummy store into the table and materializing the filter
result afterwards? It's not a nice way to do it, but it seems to be possible.


-Matthias

On 11/23/17 4:56 AM, Jan Filipiak wrote:
> The comment is valid. It falls exactly into this topic, it has exactly
> todo with this!
> Even when we have a statefull operation last. We move it to the very
> first processor (KtableSource)
> and therefore cant present a proper RecordContext.
> 
> Regarding the other Jiras you are referring to. They harm the project
> more than they do good!
> There is no need for this kind of optimizer and meta representation and
> what not. I hope they
> never get implemented.
> 
> Best Jan
> 
> 
> On 22.11.2017 14:44, Damian Guy wrote:
>> Jan, i think you comment with respect to filtering is valid, though
>> not for
>> this KIP. We have separate JIRAs for topology optimization of which this
>> falls into.
>>
>> Thanks,
>> Damian
>>
>> On Wed, 22 Nov 2017 at 02:25 Guozhang Wang  wrote:
>>
>>> Jan,
>>>
>>> Not sure I understand your argument that "we still going to present
>>> change.oldValue to the filter even though the record context() is for
>>> change.newValue". Are you referring to `KTableFilter#process()`? If yes
>>> could you point to me which LOC are you concerning about?
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Mon, Nov 20, 2017 at 9:29 PM, Jan Filipiak 
>>> wrote:
>>>
 a remark of mine that got missed during migration:

 There is this problem that even though we have source.table.filter.join
 the state-fullness happens at the table step not a the join step. In a
 filter
 we still going to present change.oldValue to the filter even though the
 record context() is for change.newValue. I would go as far as applying
 the filter before the table processor. Not to just get KIP-159, but
>>> because
 I think its a side effect of a non ideal topology layout. If i can
 filter
 99% of my
 records. my state could be way smaller. Also widely escalates the
 context
 of the KIP

 I can only see upsides of executing the filter first.

 Best Jan



 On 20.11.2017 22:22, Matthias J. Sax wrote:

> I am moving this back to the DISCUSS thread... Last 10 emails were
> sent
> to VOTE thread.
>
> Copying Guozhang's last summary below. Thanks for this summary. Very
> comprehensive!
>
> It seems, we all agree, that the current implementation of the context
> at PAPI level is ok, but we should not leak it into DSL.
>
> Thus, we can go with (2) or (3), were (3) is an extension to (2)
> carrying the context to more operators than just sources. It also
> seems,
> that we all agree, that many-to-one operations void the context.
>
> I still think, that just going with plain (2) is too restrictive --
> but
> I am also fine if we don't go with the full proposal of (3).
>
> Also note, that the two operators filter() and filterNot() don't
> modify
> the record and thus for both, it would be absolutely valid to keep the
> context.
>
> I personally would keep the context for at least all one-to-one
> operators. One-to-many is debatable and I am fine to not carry the
> context further: at least the offset information is questionable for
> this case -- note thought, that semantically, the timestamp is
> inherited
> via one-to-many, and I also think this applies to "topic" and
> "partition". Thus, I think it's still valuable information we can
> carry
> downstreams.
>
>
> -Matthias
>
> Jan: which approach are you referring to as "the approach that is
> on the
>> table would be perfect"?
>>
>> Note that in today's PAPI layer we are already effectively
>> exposing the
>> record context which has the issues that we have been discussing
>> right
>> now,
>> and its semantics is always referring to the "processing record" at
>>> hand.
>> More specifically, we can think of processing a record a bit
>> different:
>>
>> 1) the record traversed the topology from source to sink, it may be
>> transformed into new object or even generate multiple new objects
>>> (think:
>> branch) along the travers