[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-02 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313577#comment-15313577
 ] 

Yuto Kawamura commented on KAFKA-3775:
--

Thanks for the feedback, [~mjsax].

> 1) a KStreams application should process the whole topic and not parts of it 
> – limiting the number of partitions is kinda artificial from my point of view

So the question is what a "KStreams application" consists of. I know that Kafka 
Streams is designed to work equally well standalone, but IIUC the main purpose of 
making it work standalone is easy development and testing. In practice, if we try 
to run it against production traffic that spans hundreds of partitions, it is 
practically impossible to assign all partitions to a single instance transparently. 
Indeed, restricting the maximum number of partitions per instance is an artificial 
control, but it should be available given that Kafka Streams is not an execution 
framework, as I said. Users have almost full control over how to construct the 
Kafka Streams app cluster; that is, it should be possible to bring up instances 
gradually, one by one, instead of starting the necessary number of instances at 
once, but that is impossible with the existing implementation for the reason I 
described.

> 2) even if we limit the number of partitions, it is quite random which would 
> get processed which not – I would assume that users would like to have a more 
> transparent assignment

I think Kafka Streams partition assignment already isn't transparent. Unless 
the sticky partition assignment strategy is enabled, StreamPartitionAssignor 
chooses which task (partition) is assigned to which instance in round robin with 
some randomness introduced. That is, by nature we have no control over which 
partition is assigned to which instance.
At least you can ensure that all partitions are assigned if you start more 
instances than {{partitions / max.assigned.tasks}}, and it also remains 
possible to opt out by leaving the configuration at its default 
value (Integer.MAX_VALUE), which guarantees that a single instance still accepts 
all assigned tasks (partitions).
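
For illustration, a rough sketch of how such a throttle could be configured; the 
property key "max.assigned.tasks" is the name used in this discussion, and the 
exact key in the patch may differ:

{code}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

// Hypothetical sketch: "max.assigned.tasks" is the name used in this discussion,
// not an existing StreamsConfig constant.
public class ThrottledStreamsConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        // The default Integer.MAX_VALUE keeps today's behavior (no throttling);
        // a lower value caps the tasks (partitions) a single instance accepts.
        props.put("max.assigned.tasks", 8);
        return props;
    }
}
{code}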

> 3) last but not least, under the hood we are using the standard Java 
> KafkaConsumer: looking at your patch (just briefly), it seems you changed the 
> task assignment – however, this is independent from the partitions assignment 
> of the used consumer – thus, the consumer would still poll all partitions but 
> would not be able to assign records for some partitions as the corresponding 
> tasks are missing.

Hmm, I'm not sure I'm understanding your explanation correctly, but this sounds 
different from what I know.
First, Kafka Streams provides a custom PartitionAssignor, 
StreamPartitionAssignor, which takes full control of which partition is assigned 
to which consumer thread of which instance.
Second, a consumer polls only the partitions it gets assigned by the group 
coordinator, which relies on the PartitionAssignor to decide the actual assignment. 
So an instance will never receive a record from a partition which isn't 
assigned to it; therefore the situation you're concerned about will never happen, IIUC.
Am I misunderstanding something?
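
A minimal sketch demonstrating the second point: after a poll, consumer.assignment() 
contains exactly the partitions this group member can ever receive records from 
(topic name, group id and bootstrap servers here are placeholders):

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AssignmentCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("group.id", "assignment-check");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            // Only partitions in this set can ever appear in `records`.
            System.out.println("Assigned partitions: " + consumer.assignment());
            records.forEach(r -> System.out.println(r.topic() + "-" + r.partition()));
        }
    }
}
{code}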


> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of a single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and 
> heavy message traffic, it is a problem that we don't have a way of throttling the 
> maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with 
> more than 10MB/sec of traffic per partition, we saw all partitions 
> assigned to the first instance, and soon the app died from OOM.
> I know there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, so 
> some users might want the option to start a single instance first and 
> check that it works as expected with a smaller number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works. but 
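
As an illustration of that second workaround, a sketch of the relevant settings; 
the values are arbitrary examples, and whether the consumer-level keys are picked 
up when passed through the streams properties depends on the version:

{code}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Sketch of the heap-pressure workaround; the values shown are arbitrary.
public class LowHeapStreamsConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        // Fewer records buffered per partition inside the stream tasks.
        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
        // Smaller fetches and smaller poll batches for the underlying consumer.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 262144);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return props;
    }
}
{code}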

[jira] [Resolved] (KAFKA-2305) Cluster Monitoring and Management UI

2016-06-02 Thread Soumyajit Sahu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Soumyajit Sahu resolved KAFKA-2305.
---
Resolution: Invalid

> Cluster Monitoring and Management UI
> 
>
> Key: KAFKA-2305
> URL: https://issues.apache.org/jira/browse/KAFKA-2305
> Project: Kafka
>  Issue Type: Wish
>  Components: admin, replication
>Reporter: Soumyajit Sahu
>Assignee: Soumyajit Sahu
>Priority: Minor
>
> At present, we don't have an Admin and Monitoring UI for the Kafka cluster.
> We need a view from the perspective of the machines in the cluster.
> Following issues need to be addressed using a UI:
> 1) Resource usage and Kafka statistics by machine.
> 2) View of the partition and replication layout by machine.
> 3) View of spindle usage (or different log directories usage) pattern by 
> machine.
> 4) Ability to move replicas among brokers using the UI and by leveraging the 
> Reassign Partitions Tool.
> More details in the doc in the External Issue Url field.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2305) Cluster Monitoring and Management UI

2016-06-02 Thread Soumyajit Sahu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Soumyajit Sahu reassigned KAFKA-2305:
-

Assignee: Soumyajit Sahu  (was: Neha Narkhede)

> Cluster Monitoring and Management UI
> 
>
> Key: KAFKA-2305
> URL: https://issues.apache.org/jira/browse/KAFKA-2305
> Project: Kafka
>  Issue Type: Wish
>  Components: admin, replication
>Reporter: Soumyajit Sahu
>Assignee: Soumyajit Sahu
>Priority: Minor
>
> At present, we don't have an Admin and Monitoring UI for the Kafka cluster.
> We need a view from the perspective of the machines in the cluster.
> Following issues need to be addressed using a UI:
> 1) Resource usage and Kafka statistics by machine.
> 2) View of the partition and replication layout by machine.
> 3) View of spindle usage (or different log directories usage) pattern by 
> machine.
> 4) Ability to move replicas among brokers using the UI and by leveraging the 
> Reassign Partitions Tool.
> More details in the doc in the External Issue Url field.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3786) Avoid unused property from parent configs causing WARN entries

2016-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313547#comment-15313547
 ] 

ASF GitHub Bot commented on KAFKA-3786:
---

GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/1465

KAFKA-3786: Let ConfigDef filter property key value pairs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka K3786-config-parsing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1465.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1465


commit 1f5b46c78406b669c1b3e367bc16a257d5ea970c
Author: Guozhang Wang 
Date:   2016-06-03T04:22:20Z

let config def filter property key value pairs




> Avoid unused property from parent configs causing WARN entries
> --
>
> Key: KAFKA-3786
> URL: https://issues.apache.org/jira/browse/KAFKA-3786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>
> Currently the {{AbstractConfig}}'s constructor accepts the passed property 
> map as well as the {{ConfigDef}}, and maintains the original map as well as 
> the parsed values together. Because of this, with hierarchical config passing 
> like {{StreamsConfig}}, the underlying configs will take all the key-value 
> pairs when constructed and hence cause WARN log output.
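
For illustration, a rough sketch of the filtering idea in the PR title; 
{{ConfigDef.names()}} is an existing method, but the helper itself is hypothetical 
and not the actual patch:

{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;

// Hypothetical helper: keep only the keys a given ConfigDef actually defines,
// so a child config built from a parent's originals does not log WARN for
// every unrelated key. Not the actual patch.
public final class ConfigFilter {
    private ConfigFilter() {}

    public static Map<String, Object> onlyDefined(ConfigDef def, Map<String, ?> originals) {
        Map<String, Object> filtered = new HashMap<>();
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            if (def.names().contains(entry.getKey()))
                filtered.put(entry.getKey(), entry.getValue());
        }
        return filtered;
    }
}
{code}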



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1465: KAFKA-3786: Let ConfigDef filter property key valu...

2016-06-02 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/1465

KAFKA-3786: Let ConfigDef filter property key value pairs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka K3786-config-parsing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1465.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1465


commit 1f5b46c78406b669c1b3e367bc16a257d5ea970c
Author: Guozhang Wang 
Date:   2016-06-03T04:22:20Z

let config def filter property key value pairs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-3786) Avoid unused property from parent configs causing WARN entries

2016-06-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3786:
-
Summary: Avoid unused property from parent configs causing WARN entries  
(was: AbstractConfig to only retain defined ConfigDef when parsing property map)

> Avoid unused property from parent configs causing WARN entries
> --
>
> Key: KAFKA-3786
> URL: https://issues.apache.org/jira/browse/KAFKA-3786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>
> Currently the {{AbstractConfig}}'s constructor accepts the passed property 
> map as well as the {{ConfigDef}}, and maintains the original map as well as 
> the parsed values together. Because of this, with hierarchical config passing 
> like {{StreamsConfig}}, the underlying configs will take all the key-value 
> pairs when constructed and hence cause WARN log output.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3786) AbstractConfig to only retain defined ConfigDef when parsing property map

2016-06-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3786:
-
Description: Currently the {{AbstractConfig}}'s constructor accepts the 
passed property map as well as the {{ConfigDef}}, and maintains the original 
map as well as the parsed values together. Because of this, with hierarchical 
config passing like {{StreamsConfig}}, the underlying configs will take all 
the key-value pairs when constructed and hence cause WARN log output.  (was: 
Currently the {{AbstractConfig}}'s constructor accepts the passed property map 
as well as the {{ConfigDef}}, and maintains the original map as well as the 
parsed values together. Because of this, with hierarchical config passing like 
{{StreamsConfig}}, the underlying configs will take all the key-value pairs 
when constructed and hence cause WARN log output.

We can consider adding an overloaded constructor in {{AbstractConfig}} to only 
retain its defined configs as in {{ConfigDef}}.)

> AbstractConfig to only retain defined ConfigDef when parsing property map
> -
>
> Key: KAFKA-3786
> URL: https://issues.apache.org/jira/browse/KAFKA-3786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>
> Currently the {{AbstractConfig}}'s constructor accepts the passed property 
> map as well as the {{ConfigDef}}, and maintains the original map as well as 
> the parsed values together. Because of this, with hierarchical config passing 
> like {{StreamsConfig}}, the underlying configs will take all the key-value 
> pairs when constructed and hence cause WARN log output.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3786) AbstractConfig to only retain defined ConfigDef when parsing property map

2016-06-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3786:
-
Description: 
Currently the {{AbstractConfig}}'s constructor accepts the passed property map 
as well as the {{ConfigDef}}, and maintains the original map as well as the 
parsed values together. Because of this, with hierarchical config passing like 
{{StreamsConfig}}, the underlying configs will take all the key-value pairs 
when constructed and hence cause WARN log output.

We can consider adding an overloaded constructor in {{AbstractConfig}} to only 
retain its defined configs as in {{ConfigDef}}.

  was:
Currently the {{AbstractConfig}}'s constructor accepts the passed property map 
as well as the {{ConfigDef}}, and maintains the original map as well as the 
parsed values together. Because of this, with hierarchical config passing like 
{{StreamsConfig}}, the underlying configs will take all the key-value pairs 
when constructed and hence cause WARN log output.

We can consider adding another static constructor in {{AbstractConfig}} to only 
retain its defined configs as in {{ConfigDef}}.


> AbstractConfig to only retain defined ConfigDef when parsing property map
> -
>
> Key: KAFKA-3786
> URL: https://issues.apache.org/jira/browse/KAFKA-3786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>
> Currently the {{AbstractConfig}}'s constructor accepts the passed property 
> map as well as the {{ConfigDef}}, and maintains the original map as well as 
> the parsed values together. Because of this, with hierarchical config passing 
> like {{StreamsConfig}}, the underlying configs will take all the key-value 
> pairs when constructed and hence cause WARN log output.
> We can consider adding an overloaded constructor in {{AbstractConfig}} to 
> only retain its defined configs as in {{ConfigDef}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3786) AbstractConfig to only retain defined ConfigDef when parsing property map

2016-06-02 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-3786:


 Summary: AbstractConfig to only retain defined ConfigDef when 
parsing property map
 Key: KAFKA-3786
 URL: https://issues.apache.org/jira/browse/KAFKA-3786
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang


Currently the {{AbstractConfig}}'s constructor accepts the passed property map 
as well as the {{ConfigDef}}, and maintains the original map as well as the 
parsed values together. Because of this, with hierarchical config passing like 
{{StreamsConfig}}, the underlying configs will take all the key-value pairs 
when constructed and hence cause WARN log output.

We can consider adding another static constructor in {{AbstractConfig}} to only 
retain its defined configs as in {{ConfigDef}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread

2016-06-02 Thread Guozhang Wang
Hi Ewen,

I think you are right, the rebalance process could potentially involve all
the delayed compute / IO. More specifically, this is how I think of the
rebalance process:

1. Coordinator decides to rebalance; starts ticking based on the rebalance
timeout.
2. Consumer realizes a rebalance is needed when calling poll(); triggers
onPartitionsRevoked().
3. Consumer sends JoinGroupRequest.
4. Coordinator sends JoinGroupResponse; starts ticking on the leader.
5. Leader computes and sends SyncGroupRequest.
6. Coordinator sends SyncGroupResponse; starts ticking on session timeout.
7. Consumer gets the new assignment; triggers onPartitionsAssigned().

In the above process, delayed compute / IO is usually done at step 2,
workload initialization is usually done at step 7, and some admin work
(like in Kafka Streams) is likely to be done at step 5. As in the current
KIP proposal, the rebalance timeout on the coordinator starts ticking at
step 1 for everyone in the group and stops ticking at step 3; it starts
ticking again on the leader at step 4 and stops at step 5. In this case the
delayed compute / IO contained in step 2 is covered by the rebalance timeout.

That being said, I think for the "worst case", the time to process a single
record would still be similar to the rebalance time, since both could
involve completing all the delayed compute / IO accumulated so far. And since the
"process timeout" is used to cover the worst case, it should still be OK?


Guozhang




On Thu, Jun 2, 2016 at 5:55 PM, Ewen Cheslack-Postava 
wrote:

> Jason,
>
> I've been thinking about this more in terms of something like Connect. I
> think the rebalance timeout may be a bit different from the process
> timeout, and even the process timeout is a bit of a misnomer.
>
> We sort of talk about the process timeout as if it can be an indicator of
> maximum processing time for a record/batch. This makes sense for a case of
> a data-dependent load (i.e. you can only load some data from slow storage
> after seeing some data) where that load might be very large compared to
> normal processing time. It also makes sense if you have auto commit enabled
> because you need to be completely finished processing the data before
> calling poll() again, so that time before you call another consumer API
> actually reflects processing time.
>
> It might make less sense in cases like streams (or any other app) that
> batch writes to disk, or connectors that "process" a message by enqueuing
> the data, but won't commit offsets until data is flushed, possibly during
> some other, much later iteration of processing. In this case I think
> processing time and rebalance time could potentially differ significantly.
> During normal processing, you can potentially pipeline quite a bit,
> buffering up changes, flushing as needed, but then only committing once
> flushing is complete. But rebalancing is different then -- you *must*
> finish flushing all the data or manually choose to discard the data
> (presumably by doing something like watching for the process timeout you
> set and bailing early, only committing the offsets for data you've
> flushed). If you have lots of data built up, the cost for rebalancing could
> be a *lot* higher than the maximum time you would otherwise see between
> calls to consumer APIs to indicate processing progress.
>
> The thing that makes these cases different is that processing isn't
> actually tied to calls to the consumer API. You can queue up / pipeline /
> defer some of the work. (By the way, this is currently a limitation of sink
> connectors that I'm not thrilled about -- offset commit requires a full
> flush, whereas some coordination with the sink connector to not require a
> full flush except on rebalances would be much nicer, albeit more difficult
> for sink connectors to implement.)
>
> -Ewen
>
>
>
> On Thu, Jun 2, 2016 at 5:14 PM, Jason Gustafson 
> wrote:
>
> > Hey Guozhang,
> >
> > I'm actually not too concerned about the time spent in the rebalance
> > callback specifically. Both it and regular processing time in the poll
> loop
> > will delay the rebalance and keep joined consumers idle. However, if we
> > expose the rebalance timeout, then it would give users the option to
> > effectively disable the process timeout while still keeping a maximum bound
> > on the rebalance time. If the consumer cannot complete its processing
> fast
> > enough and rejoin, then it would be evicted. This provides something like
> > (2) since the other consumers in the group would be able to complete the
> > rebalance and resume work while the evicted consumer would have to
> rollback
> > progress. This is not too different from rebalancing in the background
> > which also typically would cause commit failure and rollback (though at
> > least the consumer stays in the group).
> >
> > Now that I'm thinking about it more, I'm not sure this would be a great
> > facility to depend on in practice. It might be OK if just one or two of
> the
> > 

Jenkins build is back to normal : kafka-trunk-jdk7 #1335

2016-06-02 Thread Apache Jenkins Server
See 



[jira] [Commented] (KAFKA-3785) Fetcher spending unnecessary time during metrics recording

2016-06-02 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313461#comment-15313461
 ] 

Greg Fodor commented on KAFKA-3785:
---

https://github.com/apache/kafka/pull/1464

> Fetcher spending unnecessary time during metrics recording
> --
>
> Key: KAFKA-3785
> URL: https://issues.apache.org/jira/browse/KAFKA-3785
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Greg Fodor
>
> Profiling a Kafka Streams job revealed some hotspots in the Fetcher during 
> metrics flushing. Previous discussion here:
> https://issues.apache.org/jira/browse/KAFKA-3769



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1464: KAFKA-3785; Fetcher spending unnecessary time duri...

2016-06-02 Thread gfodor
GitHub user gfodor opened a pull request:

https://github.com/apache/kafka/pull/1464

KAFKA-3785; Fetcher spending unnecessary time during metrics recording



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AltspaceVR/kafka gfodor/kafka-3785

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1464.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1464


commit 7744c23fac633b81de3544614d1ad14f977425bf
Author: Greg Fodor 
Date:   2016-06-03T01:47:52Z

KAFKA-3785; Fetcher spending unnecessary time during metrics recording




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3785) Fetcher spending unnecessary time during metrics recording

2016-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313460#comment-15313460
 ] 

ASF GitHub Bot commented on KAFKA-3785:
---

GitHub user gfodor opened a pull request:

https://github.com/apache/kafka/pull/1464

KAFKA-3785; Fetcher spending unnecessary time during metrics recording



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AltspaceVR/kafka gfodor/kafka-3785

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1464.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1464


commit 7744c23fac633b81de3544614d1ad14f977425bf
Author: Greg Fodor 
Date:   2016-06-03T01:47:52Z

KAFKA-3785; Fetcher spending unnecessary time during metrics recording




> Fetcher spending unnecessary time during metrics recording
> --
>
> Key: KAFKA-3785
> URL: https://issues.apache.org/jira/browse/KAFKA-3785
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Greg Fodor
>
> Profiling a Kafka Streams job revealed some hotspots in the Fetcher during 
> metrics flushing. Previous discussion here:
> https://issues.apache.org/jira/browse/KAFKA-3769



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3785) Fetcher spending unnecessary time during metrics recording

2016-06-02 Thread Greg Fodor (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Fodor updated KAFKA-3785:
--
Component/s: consumer

> Fetcher spending unnecessary time during metrics recording
> --
>
> Key: KAFKA-3785
> URL: https://issues.apache.org/jira/browse/KAFKA-3785
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Greg Fodor
>
> Profiling a Kafka Streams job revealed some hotspots in the Fetcher during 
> metrics flushing. Previous discussion here:
> https://issues.apache.org/jira/browse/KAFKA-3769



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3785) Fetcher spending unnecessary time during metrics recording

2016-06-02 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-3785:
-

 Summary: Fetcher spending unnecessary time during metrics recording
 Key: KAFKA-3785
 URL: https://issues.apache.org/jira/browse/KAFKA-3785
 Project: Kafka
  Issue Type: Improvement
Reporter: Greg Fodor


Profiling a Kafka Streams job revealed some hotspots in the Fetcher during 
metrics flushing. Previous discussion here:

https://issues.apache.org/jira/browse/KAFKA-3769



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up

2016-06-02 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313441#comment-15313441
 ] 

Jun Rao commented on KAFKA-3693:


[~maysamyabandeh], yes, I agree it would be better if the broker could be more 
defensive. However, the current logic is a bit subtle. Not only do we require 
the first LeaderAndIsrRequest to contain all the partitions on this broker, but 
also that, before the first LeaderAndIsrRequest, the controller send an 
UpdateMetadataRequest to the newly started broker. This is needed because, if 
security is enabled, the newly started broker needs to know all the secure 
ports, and that information is currently only propagated through 
UpdateMetadataRequest.

Now, for the situation you described, when did broker 16 join the cluster? 
Currently, there is synchronization on the controller lock between handling new 
brokers and doing controlled shutdown. So, the controller shouldn't be able to see 
broker 16 after the logic of handling the new broker has been completed (after 
which point the newly started broker would have received a LeaderAndIsrRequest 
with all partitions).

> Race condition between highwatermark-checkpoint thread and 
> handleLeaderAndIsrRequest at broker start-up
> ---
>
> Key: KAFKA-3693
> URL: https://issues.apache.org/jira/browse/KAFKA-3693
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between the highwatermark-checkpoint thread writing 
> the replication-offset-checkpoint file and the handleLeaderAndIsrRequest thread 
> reading from it causes the highwatermark for some partitions to be reset to 
> 0. In the good case, this results in the replica truncating its entire log to 0 
> and hence initiating a fetch of terabytes of data from the leader broker, which 
> sometimes leads to hours of downtime. We observed bad cases where the 
> reset offset propagated to the recovery-point-offset-checkpoint file, making a 
> leader broker truncate the file. This seems to have the potential to lead to 
> data loss if the truncation happens at both follower and leader brokers.
> This is the particular faulty scenario manifested in our tests:
> # The broker restarts and receives LeaderAndIsr from the controller
> # LeaderAndIsr message however does not contain all the partitions (probably 
> because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates the 
> allPartitions with the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
> var partition = allPartitions.get((topic, partitionId))
> if (partition == null) {
>   allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, 
> partitionId, time, this))
> {code}
> # replication-offset-checkpoint jumps in taking a snapshot of (the partial) 
> allReplicas' high watermark into replication-offset-checkpoint file {code}  
> def checkpointHighWatermarks() {
> val replicas = 
> allPartitions.values.map(_.getReplica(config.brokerId)).collect{case 
> Some(replica) => replica}{code} hence rewriting the previous highwatermarks.
> # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers which read 
> the (now partial) file through Partition::getOrCreateReplica {code}
>   val checkpoint = 
> replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>   val offsetMap = checkpoint.read
>   if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
> info("No checkpointed highwatermark is found for partition 
> [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including a 
> subset of partitions is critical in making this race condition manifest or 
> not. But it is an important detail since it clarifies that a solution based 
> on not letting the highwatermark-checkpoint thread jumping in the middle of 
> processing a LeaderAndIsr message would not suffice.
> The solution we are thinking of is to force-initialize allPartitions with the 
> partitions listed in the replication-offset-checkpoint file (and perhaps the 
> recovery-point-offset-checkpoint file too) when a server starts.
> Thoughts?
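
For illustration only, a rough Java sketch of the proposed seeding step; the real 
code path is Scala inside ReplicaManager, and the checkpoint layout assumed here 
(a version line, an entry-count line, then "topic partition offset" lines) should 
be verified against the actual OffsetCheckpoint format:

{code}
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: read a replication-offset-checkpoint file and build the
// (topic-partition -> high watermark) entries that could seed allPartitions
// before any LeaderAndIsrRequest is processed.
public final class CheckpointSeed {
    public static Map<String, Long> readHighWatermarks(Path checkpointFile) throws IOException {
        Map<String, Long> highWatermarks = new HashMap<>();
        try (BufferedReader reader = Files.newBufferedReader(checkpointFile)) {
            reader.readLine();                                      // version line
            int expected = Integer.parseInt(reader.readLine().trim());  // entry count
            for (int i = 0; i < expected; i++) {
                String[] fields = reader.readLine().trim().split("\\s+");
                highWatermarks.put(fields[0] + "-" + fields[1], Long.parseLong(fields[2]));
            }
        }
        return highWatermarks;
    }
}
{code}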



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-0.10.0-jdk7 #112

2016-06-02 Thread Apache Jenkins Server
See 

Changes:

[ismael] MINOR: Fix setting of ACLs and ZK shutdown in test harnesses

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H10 (docker Ubuntu ubuntu yahoo-not-h2) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url 
 > https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
 > git --version # timeout=10
 > git -c core.askpass=true fetch --tags --progress 
 > https://git-wip-us.apache.org/repos/asf/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/0.10.0^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/0.10.0^{commit} # timeout=10
Checking out Revision de23c6376bead90249e1e4344d8b7a5ed148fba3 
(refs/remotes/origin/0.10.0)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f de23c6376bead90249e1e4344d8b7a5ed148fba3
 > git rev-list 946ae60a4c8b694bfad65f20348d8081f103830b # timeout=10
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
[kafka-0.10.0-jdk7] $ /bin/bash -xe /tmp/hudson4847495676898713649.sh
+ 
/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2/bin/gradle
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
http://gradle.org/docs/2.4-rc-2/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:downloadWrapper

BUILD SUCCESSFUL

Total time: 35.834 secs
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
[kafka-0.10.0-jdk7] $ /bin/bash -xe /tmp/hudson1839548550475707797.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew -Dorg.gradle.project.maxParallelForks=1 clean jarAll testAll
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
https://docs.gradle.org/2.13/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
Build file ': 
line 231
useAnt has been deprecated and is scheduled to be removed in Gradle 3.0. The 
Ant-Based Scala compiler is deprecated, please see 
https://docs.gradle.org/current/userguide/scala_plugin.html.
:clean UP-TO-DATE
:clients:clean UP-TO-DATE
:connect:clean UP-TO-DATE
:core:clean
:examples:clean UP-TO-DATE
:log4j-appender:clean UP-TO-DATE
:streams:clean UP-TO-DATE
:tools:clean UP-TO-DATE
:connect:api:clean UP-TO-DATE
:connect:file:clean UP-TO-DATE
:connect:json:clean UP-TO-DATE
:connect:runtime:clean UP-TO-DATE
:streams:examples:clean UP-TO-DATE
:jar_core_2_10
Building project 'core' with Scala version 2.10.6
:kafka-0.10.0-jdk7:clients:compileJava
:jar_core_2_10 FAILED

FAILURE: Build failed with an exception.

* What went wrong:
org.gradle.api.internal.changedetection.state.FileCollectionSnapshotImpl cannot 
be cast to 
org.gradle.api.internal.changedetection.state.OutputFilesCollectionSnapshotter$OutputFilesSnapshot

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug 
option to get more log output.

BUILD FAILED

Total time: 31.285 secs
Build step 'Execute shell' marked build as failure
Recording test results
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51


[jira] [Commented] (KAFKA-3770) KStream job should be able to specify linger.ms

2016-06-02 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313434#comment-15313434
 ] 

Greg Fodor commented on KAFKA-3770:
---

Cut a new PR since I screwed up the rebase on the previous one. This one adds a 
test, etc.

https://github.com/apache/kafka/pull/1463

> KStream job should be able to specify linger.ms
> ---
>
> Key: KAFKA-3770
> URL: https://issues.apache.org/jira/browse/KAFKA-3770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> The default linger.ms of 100ms hardcoded into the StreamsConfig class is 
> problematic for jobs that have lots of tasks, since this latency can accrue. 
> It seems useful to be able to override linger.ms in the StreamsConfig. 
> Attached is a PR which allows this.
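
For illustration, a sketch of what the override could look like once the linked 
PR allows it; whether the producer-level key is forwarded by StreamsConfig depends 
on that change:

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Sketch (assuming the linked PR makes this override effective): pass a producer
// linger.ms through the streams properties instead of the hardcoded 100ms default.
public class LingerOverride {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "low-latency-job");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5"); // lower per-task send latency
        return props;
    }
}
{code}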



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3770) KStream job should be able to specify linger.ms

2016-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313432#comment-15313432
 ] 

ASF GitHub Bot commented on KAFKA-3770:
---

GitHub user gfodor opened a pull request:

https://github.com/apache/kafka/pull/1463

KAFKA-3770: KStream job should be able to specify linger.ms



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AltspaceVR/kafka gfodor/kafka-3770-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1463.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1463


commit 6e4c50a873efb6c3123c46598c42b6b5d625e150
Author: Greg Fodor 
Date:   2016-06-03T01:14:28Z

KAFKA-3770; Added ability for streams job to override linger.ms and 
enable.auto.commit




> KStream job should be able to specify linger.ms
> ---
>
> Key: KAFKA-3770
> URL: https://issues.apache.org/jira/browse/KAFKA-3770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> The default linger.ms of 100ms hardcoded into the StreamsConfig class is 
> problematic for jobs that have lots of tasks, since this latency can accrue. 
> It seems useful to be able to override linger.ms in the StreamsConfig. 
> Attached is a PR which allows this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1463: KAFKA-3770: KStream job should be able to specify ...

2016-06-02 Thread gfodor
GitHub user gfodor opened a pull request:

https://github.com/apache/kafka/pull/1463

KAFKA-3770: KStream job should be able to specify linger.ms



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AltspaceVR/kafka gfodor/kafka-3770-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1463.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1463


commit 6e4c50a873efb6c3123c46598c42b6b5d625e150
Author: Greg Fodor 
Date:   2016-06-03T01:14:28Z

KAFKA-3770; Added ability for streams job to override linger.ms and 
enable.auto.commit




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Build failed in Jenkins: kafka-trunk-jdk8 #671

2016-06-02 Thread Apache Jenkins Server
See 

Changes:

[ismael] MINOR: Fix setting of ACLs and ZK shutdown in test harnesses

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H11 (docker Ubuntu ubuntu yahoo-not-h2) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url 
 > https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
 > git --version # timeout=10
 > git -c core.askpass=true fetch --tags --progress 
 > https://git-wip-us.apache.org/repos/asf/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision 1029030466f01937d416e11f93562bcaaecce253 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 1029030466f01937d416e11f93562bcaaecce253
 > git rev-list 245fa2bd8c08bb17c0b2b7aad4aba145d3c49783 # timeout=10
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK1_8_0_66_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_66
[kafka-trunk-jdk8] $ /bin/bash -xe /tmp/hudson6425358562789309357.sh
+ 
/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2/bin/gradle
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
http://gradle.org/docs/2.4-rc-2/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:downloadWrapper

BUILD SUCCESSFUL

Total time: 10.541 secs
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK1_8_0_66_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_66
[kafka-trunk-jdk8] $ /bin/bash -xe /tmp/hudson5738409078862331328.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew -Dorg.gradle.project.maxParallelForks=1 clean jarAll testAll
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
https://docs.gradle.org/2.13/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
Build file ': 
line 231
useAnt has been deprecated and is scheduled to be removed in Gradle 3.0. The 
Ant-Based Scala compiler is deprecated, please see 
https://docs.gradle.org/current/userguide/scala_plugin.html.
:clean UP-TO-DATE
:clients:clean
:connect:clean UP-TO-DATE
:core:clean
:examples:clean UP-TO-DATE
:log4j-appender:clean UP-TO-DATE
:streams:clean UP-TO-DATE
:tools:clean UP-TO-DATE
:connect:api:clean UP-TO-DATE
:connect:file:clean UP-TO-DATE
:connect:json:clean UP-TO-DATE
:connect:runtime:clean UP-TO-DATE
:streams:examples:clean UP-TO-DATE
:jar_core_2_10
Building project 'core' with Scala version 2.10.6
:kafka-trunk-jdk8:clients:compileJavawarning: [options] bootstrap class path 
not set in conjunction with -source 1.7
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
1 warning

:kafka-trunk-jdk8:clients:processResources UP-TO-DATE
:kafka-trunk-jdk8:clients:classes
:kafka-trunk-jdk8:clients:determineCommitId UP-TO-DATE
:kafka-trunk-jdk8:clients:createVersionFile
:kafka-trunk-jdk8:clients:jar
:kafka-trunk-jdk8:core:compileJava UP-TO-DATE
:kafka-trunk-jdk8:core:compileScalaJava HotSpot(TM) 64-Bit Server VM warning: 
ignoring option MaxPermSize=512m; support was removed in 8.0

:79:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^
:36:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 commitTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,

  ^
:37:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 

[jira] [Commented] (KAFKA-3770) KStream job should be able to specify linger.ms

2016-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313424#comment-15313424
 ] 

ASF GitHub Bot commented on KAFKA-3770:
---

Github user gfodor closed the pull request at:

https://github.com/apache/kafka/pull/1448


> KStream job should be able to specify linger.ms
> ---
>
> Key: KAFKA-3770
> URL: https://issues.apache.org/jira/browse/KAFKA-3770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> The default linger.ms of 100ms hardcoded into the StreamsConfig class is 
> problematic for jobs that have lots of tasks, since this latency can accrue. 
> It seems useful to be able to override linger.ms in the StreamsConfig. 
> Attached is a PR which allows this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1448: KAFKA-3770: KStream job should be able to specify ...

2016-06-02 Thread gfodor
Github user gfodor closed the pull request at:

https://github.com/apache/kafka/pull/1448


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread

2016-06-02 Thread Guozhang Wang
Okay, now I understand that a third rebalance timeout is useful only when
users do not care about the process timeout (i.e. they just make it
infinite), but still want to let the rebalance finish in a reasonable
amount of time even if some consumers are delayed in processing before their
next "poll".

And I also agree with your previous comment that for this case, users still
need to have some knowledge about the worst-case processing latency of a
batch; and since they need to learn that anyway, why not just set the
process timeout accordingly. So this extra config may not be that useful in
terms of "incremental benefit", and I am still in favor of keeping the
current KIP as is.


Grant, Onur: do you have any other thoughts? If not, I would suggest Jason
start the voting thread toward the end of this week.


Guozhang



On Thu, Jun 2, 2016 at 5:14 PM, Jason Gustafson  wrote:

> Hey Guozhang,
>
> I'm actually not too concerned about the time spent in the rebalance
> callback specifically. Both it and regular processing time in the poll loop
> will delay the rebalance and keep joined consumers idle. However, if we
> expose the rebalance timeout, then it would give users the option to
> effectively disable the process timeout while still keeping a maximum bound
> on the rebalance time. If the consumer cannot complete its processing fast
> enough and rejoin, then it would be evicted. This provides something like
> (2) since the other consumers in the group would be able to complete the
> rebalance and resume work while the evicted consumer would have to rollback
> progress. This is not too different from rebalancing in the background
> which also typically would cause commit failure and rollback (though at
> least the consumer stays in the group).
>
> Now that I'm thinking about it more, I'm not sure this would be a great
> facility to depend on in practice. It might be OK if just one or two of the
> consumers fall out of the group during the rebalance, but if half the group
> is regularly getting evicted, it would be a problem. So even if we expose
> the rebalance timeout, the user is still going to have to set it with some
> idea in mind about how long processing should take.
>
> Thanks,
> Jason
>
> On Thu, Jun 2, 2016 at 2:46 PM, Guozhang Wang  wrote:
>
> > Hi Jason,
> >
> > With the current usage pattern of:
> >
> > while(..) {
> >
> >   consumer.poll(/* where rebalance happens */)
> >
> >   // process messages
> > }
> >
> > --
> >
> > And since the rebalance is still on the caller thread, not the background
> > thread, if the coordinator decides to rebalance while the user thread is still
> > processing messages, there is no option but to go with 1),
> > right? I think your / Onur's point here, which I agree with, is that by
> > reusing the process timeout as the rebalance timeout, if the rebalance callback
> > could take longer than processing a batch, users need to set the
> > timeout value to the higher of the two, i.e. the callback latency, which
> > will make detection of processing stalls less effective, right?
> >
> > As I mentioned in my previous email, I feel that this case of "callback
> > function time taking longer than processing a batch" would not be frequent
> > in practice, and the processing timeout would usually be a good upper
> > bound on the callback function latency. If that is true, I'd suggest we
> > keep the current proposal and not add a third timeout config for covering
> > this case.
> >
> >
> > Guozhang
> >
> >
> > On Thu, Jun 2, 2016 at 10:40 AM, Jason Gustafson 
> > wrote:
> >
> > > Hey Guozhang,
> > >
> > > I think the problem is that users may not want to sacrifice rebalance
> > > latency because of uncertainty around processing time. As soon as a
> > > rebalance begins, there are basically two choices:
> > >
> > > 1. Block the rebalance until all consumers have finished their current
> > > processing.
> > > 2. Let all consumers rebalance and "rollback" any processing that could
> > not
> > > be committed before the rebalance completes.
> > >
> > > If you choose option (1), then you have an incentive to keep a
> relatively
> > > tight bound on process.timeout.ms in order to reduce the worst-case
> idle
> > > time during a rebalance. But if you fail to set it high enough, then
> > you'll
> > > get spurious rebalances during normal processing. I think Onur is
> saying
> > > that this still sort of sucks for users. On the other hand, if (2) is
> > > acceptable, then users will have more freedom to err on the high side
> > when
> > > setting process.timeout.ms, or even disable it entirely. They will
> have
> > to
> > > deal with rolling back any progress which cannot be committed after the
> > > rebalance completes, but maybe this is less of a problem for some
> users?
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > > On Wed, Jun 1, 2016 at 10:23 PM, Guozhang Wang 
> 

Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread

2016-06-02 Thread Ewen Cheslack-Postava
Jason,

I've been thinking about this more in terms of something like Connect. I
think the rebalance timeout may be a bit different from the process
timeout, and even the process timeout is a bit of a misnomer.

We sort of talk about the process timeout as if it can be an indicator of
maximum processing time for a record/batch. This makes sense for a case of
a data-dependent load (i.e. you can only load some data from slow storage
after seeing some data) where that load might be very large compared to
normal processing time. It also makes sense if you have auto commit enabled
because you need to be completely finished processing the data before
calling poll() again, so that time before you call another consumer API
actually reflects processing time.

It might make less sense in cases like streams (or any other app) that
batch writes to disk, or connectors that "process" a message by enqueuing
the data, but won't commit offsets until data is flushed, possibly during
some other, much later iteration of processing. In this case I think
processing time and rebalance time could potentially differ significantly.
During normal processing, you can potentially pipeline quite a bit,
buffering up changes, flushing as needed, but then only committing once
flushing is complete. But rebalancing is different then -- you *must*
finish flushing all the data or manually choose to discard the data
(presumably by doing something like watching for the process timeout you
set and bailing early, only committing the offsets for data you've
flushed). If you have lots of data built up, the cost for rebalancing could
be a *lot* higher than the maximum time you would otherwise see between
calls to consumer APIs to indicate processing progress.

The thing that makes these cases different is that processing isn't
actually tied to calls to the consumer API. You can queue up / pipeline /
defer some of the work. (By the way, this is currently a limitation of sink
connectors that I'm not thrilled about -- offset commit requires a full
flush, whereas some coordination with the sink connector to not require a
full flush except on rebalances would be much nicer, albeit more difficult
for sink connectors to implement.)

-Ewen



On Thu, Jun 2, 2016 at 5:14 PM, Jason Gustafson  wrote:

> Hey Guozhang,
>
> I'm actually not too concerned about the time spent in the rebalance
> callback specifically. Both it and regular processing time in the poll loop
> will delay the rebalance and keep joined consumers idle. However, if we
> expose the rebalance timeout, then it would give users the option to
> effectively disable the process timeout while still keeping a maximum bound
> on the rebalance time. If the consumer cannot complete its processing fast
> enough and rejoin, then it would be evicted. This provides something like
> (2) since the other consumers in the group would be able to complete the
> rebalance and resume work while the evicted consumer would have to rollback
> progress. This is not too different from rebalancing in the background
> which also typically would cause commit failure and rollback (though at
> least the consumer stays in the group).
>
> Now that I'm thinking about it more, I'm not sure this would be a great
> facility to depend on in practice. It might be OK if just one or two of the
> consumers fall out of the group during the rebalance, but if half the group
> is regularly getting evicted, it would be a problem. So even if we expose
> the rebalance timeout, the user is still going to have to set it with some
> idea in mind about how long processing should take.
>
> Thanks,
> Jason
>
> On Thu, Jun 2, 2016 at 2:46 PM, Guozhang Wang  wrote:
>
> > Hi Jason,
> >
> > With the current usage pattern of:
> >
> > while(..) {
> >
> >   consumer.poll(/* where rebalance happens */)
> >
> >   // process messages
> > }
> >
> > --
> >
> > And since the rebalance is still on the caller thread, not the background
> > thread, if the coordinator decides to rebalance while the user thread is still
> > processing messages, there is no option but to go with 1),
> > right? I think your / Onur's point here, which I agree with, is that by
> > reusing the process timeout as the rebalance timeout, if the rebalance callback
> > could take longer than processing a batch, users need to set the
> > timeout value to the higher of the two, i.e. the callback latency, which
> > will make detection of processing stalls less effective, right?
> >
> > As I mentioned in my previous email, I feel that this case of "callback
> > function time taking longer than processing a batch" would not be frequent
> > in practice, and the processing timeout would usually be a good upper
> > bound on the callback function latency. If that is true, I'd suggest we
> > keep the current proposal and not add a third timeout config for covering
> > this case.
> >
> >
> > Guozhang
> >
> >
> > On Thu, Jun 2, 2016 at 10:40 AM, Jason 

[jira] [Commented] (KAFKA-3752) Provide a way for KStreams to recover from unclean shutdown

2016-06-02 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313385#comment-15313385
 ] 

Guozhang Wang commented on KAFKA-3752:
--

[~theduderog] I looked through the file locking issue. After the application 
crashed, all resources including the file locks should be released by the 
OS automatically. But there is a race condition during rebalance which could 
cause one thread that re-joins the group much earlier than the others to try to 
grab the locks of its migrated tasks while the other threads have not yet released 
those tasks, and hence the locks. But to make sure the issue you encountered is the 
same as the one I described (KAFKA-3758), I have a couple of questions for you:

1. In your case, how many KafkaStreams instances are you running, and how many 
threads per instance?
2. When you have time, could you re-run your application and upload the full 
log to this ticket upon seeing this issue again? I tried a simple dummy 
application with SIGKILL but could not reproduce the issue.

> Provide a way for KStreams to recover from unclean shutdown
> ---
>
> Key: KAFKA-3752
> URL: https://issues.apache.org/jira/browse/KAFKA-3752
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Roger Hoover
>Assignee: Guozhang Wang
>  Labels: architecture
>
> If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM 
> Killer), it may leave behind lock files and fail to recover.
> It would be useful to have an options (say --force) to tell KStreams to 
> proceed even if it finds old LOCK files.
> {noformat}
> [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in 
> thread [StreamThread-1]:  
> (org.apache.kafka.streams.processor.internals.StreamThread:583)
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating 
> the state manager
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> 

[jira] [Commented] (KAFKA-3777) Extract the LRU cache out of RocksDBStore

2016-06-02 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313382#comment-15313382
 ] 

Jay Kreps commented on KAFKA-3777:
--

Probably everyone already knows this, but I just want to capture a couple of the 
original goals of that cache:
1. Avoid serialization and deserialization on both the read AND the write 
to/from RocksDB
2. Avoid the writes to Kafka and RocksDB for duplicate updates
3. Allow larger batch writes to RocksDB, which seemed to significantly cut 
down on overhead

A long time ago in Samza we did some benchmarking of each of these. At that 
time (3) had a big across-the-board impact, while (1) and (2) really depend on 
the cachability of the read and write stream.

I also think this was done originally with LevelDB and then an early version of 
RocksDB, and many things may have changed since then. In particular, (a) I am 
not sure whether the batch-write effect still matters, and (b) I think they 
added a way to disable logging entirely, which should be a big win as long as we 
handle recovery from Kafka in cases of unclean shutdown.
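
As a rough illustration of goals (2) and (3) above, and not of the actual 
Streams or RocksDB code, here is a toy write-back LRU sketch in which repeated 
updates to a key collapse into one entry and the surviving entries are handed 
downstream as a single batch; the BatchWriter interface is an assumption 
standing in for a RocksDB write batch or a changelog send.

{code}
import java.util.LinkedHashMap;
import java.util.Map;

public class DedupingLruCache<K, V> {
    public interface BatchWriter<K, V> {
        void writeBatch(Map<K, V> entries);  // e.g. one RocksDB write batch or one round of sends
    }

    private final int maxDirtyEntries;
    private final BatchWriter<K, V> downstream;
    // access-order LinkedHashMap keeps entries in LRU order
    private final LinkedHashMap<K, V> dirty = new LinkedHashMap<>(16, 0.75f, true);

    public DedupingLruCache(int maxDirtyEntries, BatchWriter<K, V> downstream) {
        this.maxDirtyEntries = maxDirtyEntries;
        this.downstream = downstream;
    }

    // Repeated puts for the same key overwrite in place, so only the latest
    // value reaches the store / changelog (goal 2).
    public void put(K key, V value) {
        dirty.put(key, value);
        if (dirty.size() >= maxDirtyEntries) {
            flush();                         // one larger batch write (goal 3)
        }
    }

    // A hit here also skips a deserializing lookup in the store (goal 1).
    public V get(K key) {
        return dirty.get(key);
    }

    public void flush() {
        if (!dirty.isEmpty()) {
            downstream.writeBatch(new LinkedHashMap<>(dirty));
            dirty.clear();
        }
    }
}
{code}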

> Extract the LRU cache out of RocksDBStore
> -
>
> Key: KAFKA-3777
> URL: https://issues.apache.org/jira/browse/KAFKA-3777
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
> Fix For: 0.10.1.0
>
>
> The LRU cache that is currently inside the RocksDbStore class. As part of 
> KAFKA-3776 it needs to come outside of RocksDbStore and be a separate 
> component used in:
> 1. KGroupedStream.aggregate() / reduce(), 
> 2. KStream.aggregateByKey() / reduceByKey(),
> 3. KTable.to() (this will be done in KAFKA-3779).
> As all of the above operators can have a cache on top to deduplicate the 
> materialized state store in RocksDB.
> The scope of this JIRA is to extract out the cache of RocksDBStore, and keep 
> them as item 1) and 2) above; and it should be done together / after 
> KAFKA-3780.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1455: MINOR: Fix setting of ACLs and ZK shutdown in test...

2016-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1455


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up

2016-06-02 Thread Maysam Yabandeh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313379#comment-15313379
 ] 

Maysam Yabandeh commented on KAFKA-3693:


Thanks for looking into this [~junrao]

I agree that the process for shutting down broker 20 makes sense. However, what 
I understand from the logs is that broker 16 was trying to join in the middle 
of this process, and that is why the first message broker 16 received from the 
controller did not contain all the topics (unless there is logic in the code 
that would prevent these messages from being sent to new brokers and I have 
missed it?).

Whether or not this is the actual erroneous scenario, one suggestion was to 
make the broker defensive against any current or future scenario that would 
break the assumption that a broker receives the complete list of topics in the 
first LeaderAndIsr message. If that assumption is violated (by either the 
current code base or possibly a future patch), the broker truncates its log to 
0, which could potentially lead to data loss.

Since reproducing this scenario by mass-restarting the cluster is very 
troublesome (many hours of downtime), I tried to reproduce it with a smaller 
toy setup of 3 nodes, but as the buggy scenario explained above suggests, it 
requires specific timing that is only likely to occur when the cluster has 
>=10 nodes with lots of topics. From this point on, if we agree that the 
suggested scenario sounds plausible, I think attempting to put together a unit 
test would be a more practical way to reproduce the bug.

> Race condition between highwatermark-checkpoint thread and 
> handleLeaderAndIsrRequest at broker start-up
> ---
>
> Key: KAFKA-3693
> URL: https://issues.apache.org/jira/browse/KAFKA-3693
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between highwatermark-checkpoint thread to write 
> replication-offset-checkpoint file and handleLeaderAndIsrRequest thread 
> reading from it causes the highwatermark for some partitions to be reset to 
> 0. In the good case, this results the replica to truncate its entire log to 0 
> and hence initiates fetching of terabytes of data from the lead broker, which 
> sometimes leads to hours of downtime. We observed the bad cases that the 
> reset offset can propagate to recovery-point-offset-checkpoint file, making a 
> lead broker to truncate the file. This seems to have the potential to lead to 
> data loss if the truncation happens at both follower and leader brokers.
> This is the particular faulty scenario manifested in our tests:
> # The broker restarts and receive LeaderAndIsr from the controller
> # LeaderAndIsr message however does not contain all the partitions (probably 
> because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates the 
> allPartitions with the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
> var partition = allPartitions.get((topic, partitionId))
> if (partition == null) {
>   allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, 
> partitionId, time, this))
> {code}
> # replication-offset-checkpoint jumps in taking a snapshot of (the partial) 
> allReplicas' high watermark into replication-offset-checkpoint file {code}  
> def checkpointHighWatermarks() {
> val replicas = 
> allPartitions.values.map(_.getReplica(config.brokerId)).collect{case 
> Some(replica) => replica}{code} hence rewriting the previous highwatermarks.
> # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers which read 
> the (now partial) file through Partition::getOrCreateReplica {code}
>   val checkpoint = 
> replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>   val offsetMap = checkpoint.read
>   if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
> info("No checkpointed highwatermark is found for partition 
> [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including a 
> subset of partitions is critical in making this race condition manifest or 
> not. But it is an important detail since it clarifies that a solution based 
> on not letting the highwatermark-checkpoint thread jumping in the middle of 
> processing a LeaderAndIsr message would not suffice.
> The solution we are thinking of is to force initializing allPartitions by the 
> partitions listed in the replication-offset-checkpoint (and perhaps 
> recovery-point-offset-checkpoint file 

[jira] [Commented] (KAFKA-3784) TimeWindows#windowsFor misidentifies some windows if TimeWindows#advanceBy is used

2016-06-02 Thread Tom Rybak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313374#comment-15313374
 ] 

Tom Rybak commented on KAFKA-3784:
--

I made a pull request with the fix, plus a unit test that can be used to verify 
that the problem used to exist. 

https://github.com/apache/kafka/pull/1462

> TimeWindows#windowsFor misidentifies some windows if TimeWindows#advanceBy is 
> used
> --
>
> Key: KAFKA-3784
> URL: https://issues.apache.org/jira/browse/KAFKA-3784
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Tom Rybak
>Priority: Minor
>
> Using a time window of size 6 minutes with a 5 minute advanceBy results in 
> some of the timestamped data being inserted into the previous overlapping 
> window even though the event's timestamp > that window's end time. 
> The fault lies in TimeWindows#windowsFor which does not check that all the 
> windows it's adding have an endTime > event's timestamp. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3784) TimeWindows#windowsFor misidentifies some windows if TimeWindows#advanceBy is used

2016-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313368#comment-15313368
 ] 

ASF GitHub Bot commented on KAFKA-3784:
---

GitHub user trybak opened a pull request:

https://github.com/apache/kafka/pull/1462

KAFKA-3784 TimeWindows#windowsFor misidentifies some windows if 
TimeWindows#advanceBy is used

- Fixed the logic calculating the windows that are affected by a new event 
in the case of hopping windows and a small overlap.
- Added a unit test that tests for the issue

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/trybak/kafka 
bugfix/KAFKA-3784-TimeWindows#windowsFor-false-positives

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1462.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1462


commit 6705e3167b548e6e4457b5c99e4fe271f6726501
Author: Tom Rybak 
Date:   2016-06-03T00:10:21Z

- Fixed the logic calculating the windows that are affected by a new event 
in the case of hopping windows and a small overlap.
- Added a unit test that tests for the issue




> TimeWindows#windowsFor misidentifies some windows if TimeWindows#advanceBy is 
> used
> --
>
> Key: KAFKA-3784
> URL: https://issues.apache.org/jira/browse/KAFKA-3784
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Tom Rybak
>Priority: Minor
>
> Using a time window of size 6 minutes with a 5 minute advanceBy results in 
> some of the timestamped data being inserted into the previous overlapping 
> window even though the event's timestamp > that window's end time. 
> The fault lies in TimeWindows#windowsFor which does not check that all the 
> windows it's adding have an endTime > event's timestamp. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313369#comment-15313369
 ] 

ASF GitHub Bot commented on KAFKA-3769:
---

Github user gfodor closed the pull request at:

https://github.com/apache/kafka/pull/1447


> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
>
> I've been profiling a complex streams job, and found two major hotspots when 
> writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
> is attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1447: KAFKA-3769 - KStream job spending 60% of time writ...

2016-06-02 Thread gfodor
Github user gfodor closed the pull request at:

https://github.com/apache/kafka/pull/1447


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1462: KAFKA-3784 TimeWindows#windowsFor misidentifies so...

2016-06-02 Thread trybak
GitHub user trybak opened a pull request:

https://github.com/apache/kafka/pull/1462

KAFKA-3784 TimeWindows#windowsFor misidentifies some windows if 
TimeWindows#advanceBy is used

- Fixed the logic calculating the windows that are affected by a new 
event in the case of hopping windows and a small overlap.
- Added a unit test that tests for the issue

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/trybak/kafka 
bugfix/KAFKA-3784-TimeWindows#windowsFor-false-positives

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1462.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1462


commit 6705e3167b548e6e4457b5c99e4fe271f6726501
Author: Tom Rybak 
Date:   2016-06-03T00:10:21Z

- Fixed the logic calculating the windows that are affected by a new event 
in the case of hopping windows and a small overlap.
- Added a unit test that tests for the issue




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Build failed in Jenkins: kafka-trunk-jdk7 #1334

2016-06-02 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] HOTFIX: updated JavaDoc example for 0.9 tech-prev to 0.10

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H11 (docker Ubuntu ubuntu yahoo-not-h2) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url 
 > https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
 > git --version # timeout=10
 > git -c core.askpass=true fetch --tags --progress 
 > https://git-wip-us.apache.org/repos/asf/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision 245fa2bd8c08bb17c0b2b7aad4aba145d3c49783 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 245fa2bd8c08bb17c0b2b7aad4aba145d3c49783
 > git rev-list e6ca328f80a8f7504d740c9f77dfc8b04db20324 # timeout=10
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
[kafka-trunk-jdk7] $ /bin/bash -xe /tmp/hudson4903174583676124995.sh
+ 
/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2/bin/gradle
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
http://gradle.org/docs/2.4-rc-2/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:downloadWrapper

BUILD SUCCESSFUL

Total time: 18.739 secs
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
[kafka-trunk-jdk7] $ /bin/bash -xe /tmp/hudson187376687123854714.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew -Dorg.gradle.project.maxParallelForks=1 clean jarAll testAll
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
https://docs.gradle.org/2.13/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
Build file ': 
line 231
useAnt has been deprecated and is scheduled to be removed in Gradle 3.0. The 
Ant-Based Scala compiler is deprecated, please see 
https://docs.gradle.org/current/userguide/scala_plugin.html.
:clean UP-TO-DATE
:clients:clean UP-TO-DATE
:connect:clean UP-TO-DATE
:core:clean
:examples:clean UP-TO-DATE
:log4j-appender:clean UP-TO-DATE
:streams:clean UP-TO-DATE
:tools:clean UP-TO-DATE
:connect:api:clean UP-TO-DATE
:connect:file:clean UP-TO-DATE
:connect:json:clean UP-TO-DATE
:connect:runtime:clean UP-TO-DATE
:streams:examples:clean UP-TO-DATE
:jar_core_2_10
Building project 'core' with Scala version 2.10.6
:kafka-trunk-jdk7:clients:compileJava
:jar_core_2_10 FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Failed to capture snapshot of input files for task 'compileJava' during 
up-to-date check.
> Could not add entry 
> '/home/jenkins/.gradle/caches/modules-2/files-2.1/net.jpountz.lz4/lz4/1.3.0/c708bb2590c0652a642236ef45d9f99ff842a2ce/lz4-1.3.0.jar'
>  to cache fileHashes.bin 
> (

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug 
option to get more log output.

BUILD FAILED

Total time: 21.852 secs
Build step 'Execute shell' marked build as failure
Recording test results
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51


[jira] [Created] (KAFKA-3784) TimeWindows#windowsFor misidentifies some windows if TimeWindows#advanceBy is used

2016-06-02 Thread Tom Rybak (JIRA)
Tom Rybak created KAFKA-3784:


 Summary: TimeWindows#windowsFor misidentifies some windows if 
TimeWindows#advanceBy is used
 Key: KAFKA-3784
 URL: https://issues.apache.org/jira/browse/KAFKA-3784
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.0.0
Reporter: Tom Rybak
Priority: Minor


Using a time window of size 6 minutes with a 5 minute advanceBy results in some 
of the timestamped data being inserted into the previous overlapping window 
even though the event's timestamp > that window's end time. 

The fault lies in TimeWindows#windowsFor which does not check that all the 
windows it's adding have an endTime > event's timestamp. 
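
For concreteness, a small self-contained sketch of the containment rule 
described above: with size = 6 minutes and advance = 5 minutes, an event must 
only land in hopping windows whose end is strictly greater than its timestamp. 
This is illustrative arithmetic, not the patched Kafka code.

{code}
import java.util.ArrayList;
import java.util.List;

public class HoppingWindowsExample {
    // Return the start times of all hopping windows [start, start + sizeMs)
    // that actually contain the given timestamp.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long windowStart = timestampMs - (timestampMs % advanceMs);  // latest window starting at or before ts
        while (windowStart > timestampMs - sizeMs) {                 // i.e. windowStart + sizeMs > ts
            if (windowStart >= 0) {
                starts.add(windowStart);
            }
            windowStart -= advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 6 * 60_000L, advance = 5 * 60_000L;
        // An event at t = 6 min belongs only to the window starting at 5 min (5..11);
        // the window 0..6 is excluded because its end time is not greater than t.
        System.out.println(windowStartsFor(6 * 60_000L, size, advance));  // [300000]
    }
}
{code}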




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-06-02 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313295#comment-15313295
 ] 

Guozhang Wang commented on KAFKA-3769:
--

Hello Greg, I will continue the discussion in your PR.

> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
>
> I've been profiling a complex streams job, and found two major hotspots when 
> writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
> is attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk8 #670

2016-06-02 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] HOTFIX: updated JavaDoc example for 0.9 tech-prev to 0.10

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H11 (docker Ubuntu ubuntu yahoo-not-h2) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url 
 > https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
 > git --version # timeout=10
 > git -c core.askpass=true fetch --tags --progress 
 > https://git-wip-us.apache.org/repos/asf/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision 245fa2bd8c08bb17c0b2b7aad4aba145d3c49783 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 245fa2bd8c08bb17c0b2b7aad4aba145d3c49783
 > git rev-list e6ca328f80a8f7504d740c9f77dfc8b04db20324 # timeout=10
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK1_8_0_66_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_66
[kafka-trunk-jdk8] $ /bin/bash -xe /tmp/hudson1019104175516430113.sh
+ 
/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2/bin/gradle
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
http://gradle.org/docs/2.4-rc-2/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:downloadWrapper

BUILD SUCCESSFUL

Total time: 20.435 secs
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK1_8_0_66_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_66
[kafka-trunk-jdk8] $ /bin/bash -xe /tmp/hudson4505135543473674597.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew -Dorg.gradle.project.maxParallelForks=1 clean jarAll testAll
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
https://docs.gradle.org/2.13/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
Build file ': 
line 231
useAnt has been deprecated and is scheduled to be removed in Gradle 3.0. The 
Ant-Based Scala compiler is deprecated, please see 
https://docs.gradle.org/current/userguide/scala_plugin.html.
:clean UP-TO-DATE
:clients:clean
:connect:clean UP-TO-DATE
:core:clean
:examples:clean UP-TO-DATE
:log4j-appender:clean UP-TO-DATE
:streams:clean UP-TO-DATE
:tools:clean UP-TO-DATE
:connect:api:clean UP-TO-DATE
:connect:file:clean UP-TO-DATE
:connect:json:clean UP-TO-DATE
:connect:runtime:clean UP-TO-DATE
:streams:examples:clean UP-TO-DATE
:jar_core_2_10
Building project 'core' with Scala version 2.10.6
:kafka-trunk-jdk8:clients:compileJavawarning: [options] bootstrap class path 
not set in conjunction with -source 1.7
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
1 warning

:kafka-trunk-jdk8:clients:processResources UP-TO-DATE
:kafka-trunk-jdk8:clients:classes
:kafka-trunk-jdk8:clients:determineCommitId UP-TO-DATE
:kafka-trunk-jdk8:clients:createVersionFile
:kafka-trunk-jdk8:clients:jar
:kafka-trunk-jdk8:core:compileJava UP-TO-DATE
:kafka-trunk-jdk8:core:compileScalaJava HotSpot(TM) 64-Bit Server VM warning: 
ignoring option MaxPermSize=512m; support was removed in 8.0

:79:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^
:36:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 commitTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,

  ^
:37:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 

[jira] [Commented] (KAFKA-3335) Kafka Connect hangs in shutdown hook

2016-06-02 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313267#comment-15313267
 ] 

Ben Kirwin commented on KAFKA-3335:
---

Excellent -- thanks for following up on this!

> Kafka Connect hangs in shutdown hook
> 
>
> Key: KAFKA-3335
> URL: https://issues.apache.org/jira/browse/KAFKA-3335
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Ben Kirwin
> Fix For: 0.10.0.0
>
>
> The `Connect` class can run into issues during start, such as:
> {noformat}
> Exception in thread "main" org.apache.kafka.connect.errors.ConnectException: 
> Could not look up partition metadata for offset backing store topic in 
> allotted period. This could indicate a connectivity issue, unavailable topic 
> partitions, or if this is your first use of the topic it may have taken too 
> long to create.
> at 
> org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:130)
> at 
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:85)
> at org.apache.kafka.connect.runtime.Worker.start(Worker.java:108)
> at org.apache.kafka.connect.runtime.Connect.start(Connect.java:56)
> at 
> org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:62)
> {noformat}
> This exception halts the startup process. It also triggers the shutdown 
> hook... which blocks waiting for the service to start up before calling stop. 
> This causes the process to hang forever.
> There's a few things that could be done here, but it would be nice to bound 
> the amount of time the process spends trying to exit gracefully.
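
One way to bound that wait, sketched here with made-up class and timeout names 
rather than the actual Connect runtime code, is to have the shutdown hook wait 
on a startup latch with a deadline instead of waiting indefinitely.

{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedShutdownHookExample {
    private final CountDownLatch startupComplete = new CountDownLatch(1);

    void installShutdownHook(long maxWaitSeconds) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                // Wait for startup to finish, but only up to a bound, so a failed
                // startup cannot leave the hook (and the process) blocked forever.
                if (!startupComplete.await(maxWaitSeconds, TimeUnit.SECONDS)) {
                    System.err.println("Startup did not finish within " + maxWaitSeconds + "s; exiting anyway");
                    return;
                }
                stop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
    }

    void markStartupComplete() {
        startupComplete.countDown();   // called once startup has fully succeeded
    }

    void stop() {
        // release resources, stop workers, etc.
    }
}
{code}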



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2016-06-02 Thread Ben Kirwin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313258#comment-15313258
 ] 

Ben Kirwin commented on KAFKA-2260:
---

Hi! I haven't had time to push on this at all in the last few months, but I'm 
still interested as well.

My understanding was that the core team was very focused on streams / the 
other features in the 0.10 release, but would be interested in looking at 
coordination proposals after that (this, the idempotent producer, etc.). Does 
anyone think working this up into a GitHub PR would be useful, or is there a 
better next step? The existing KIP is already fairly well fleshed out, I think: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish

> Allow specifying expected offset on produce
> ---
>
> Key: KAFKA-2260
> URL: https://issues.apache.org/jira/browse/KAFKA-2260
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ben Kirwin
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
> Attachments: KAFKA-2260.patch, expected-offsets.patch
>
>
> I'd like to propose a change that adds a simple CAS-like mechanism to the 
> Kafka producer. This update has a small footprint, but enables a bunch of 
> interesting uses in stream processing or as a commit log for process state.
> h4. Proposed Change
> In short:
> - Allow the user to attach a specific offset to each message produced.
> - The server assigns offsets to messages in the usual way. However, if the 
> expected offset doesn't match the actual offset, the server should fail the 
> produce request instead of completing the write.
> This is a form of optimistic concurrency control, like the ubiquitous 
> check-and-set -- but instead of checking the current value of some state, it 
> checks the current offset of the log.
> h4. Motivation
> Much like check-and-set, this feature is only useful when there's very low 
> contention. Happily, when Kafka is used as a commit log or as a 
> stream-processing transport, it's common to have just one producer (or a 
> small number) for a given partition -- and in many of these cases, predicting 
> offsets turns out to be quite useful.
> - We get the same benefits as the 'idempotent producer' proposal: a producer 
> can retry a write indefinitely and be sure that at most one of those attempts 
> will succeed; and if two producers accidentally write to the end of the 
> partition at once, we can be certain that at least one of them will fail.
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
> messages consecutively to a partition, even if the list is much larger than 
> the buffer size or the producer has to be restarted.
> - If a process is using Kafka as a commit log -- reading from a partition to 
> bootstrap, then writing any updates to that same partition -- it can be sure 
> that it's seen all of the messages in that partition at the moment it does 
> its first (successful) write.
> There's a bunch of other similar use-cases here, but they all have roughly 
> the same flavour.
> h4. Implementation
> The major advantage of this proposal over other suggested transaction / 
> idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
> currently-unused field, adds no new APIs, and requires very little new code 
> or additional work from the server.
> - Produced messages already carry an offset field, which is currently ignored 
> by the server. This field could be used for the 'expected offset', with a 
> sigil value for the current behaviour. (-1 is a natural choice, since it's 
> already used to mean 'next available offset'.)
> - We'd need a new error and error code for a 'CAS failure'.
> - The server assigns offsets to produced messages in 
> {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
> changed, this method would assign offsets in the same way -- but if they 
> don't match the offset in the message, we'd return an error instead of 
> completing the write.
> - To avoid breaking existing clients, this behaviour would need to live 
> behind some config flag. (Possibly global, but probably more useful 
> per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer 
> questions, and if this seems interesting, I'd be glad to flesh this out into 
> a full KIP or patch. (And apologies if this is the wrong venue for this sort 
> of thing!)
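
The server-side check being proposed is small; here is a hedged sketch of the 
idea only, not actual broker code, with -1 standing for "no expectation" as in 
the description above.

{code}
public class ExpectedOffsetCheckExample {
    static final long ANY_OFFSET = -1L;   // sigil: the producer did not specify an expected offset

    static class CasFailureException extends RuntimeException {
        CasFailureException(String message) { super(message); }
    }

    // nextOffset is the offset the log would assign to the first message of this
    // produce request; expectedOffset is what the producer attached to it.
    static long validate(long expectedOffset, long nextOffset) {
        if (expectedOffset != ANY_OFFSET && expectedOffset != nextOffset) {
            // Fail the produce request instead of completing the write.
            throw new CasFailureException(
                    "expected offset " + expectedOffset + " but log end offset is " + nextOffset);
        }
        return nextOffset;   // proceed with the normal append
    }
}
{code}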



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3777) Extract the LRU cache out of RocksDBStore

2016-06-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3777:
-
Description: 
The LRU cache that is currently inside the RocksDbStore class. As part of 
KAFKA-3776 it needs to come outside of RocksDbStore and be a separate component 
used in:

1. KGroupedStream.aggregate() / reduce(), 
2. KStream.aggregateByKey() / reduceByKey(),
3. KTable.to() (this will be done in KAFKA-3779).

As all of the above operators can have a cache on top to deduplicate the 
materialized state store in RocksDB.

The scope of this JIRA is to extract out the cache of RocksDBStore, and keep 
them as item 1) and 2) above; and it should be done together / after KAFKA-3780.

  was:
The LRU cache that is currently inside the RocksDbStore class. As part of 
KAFKA-3776 it needs to come outside of RocksDbStore and be a separate component 
used in:

1. KGroupedStream.aggregate() / reduce(), 
2. KStream.aggregateByKey() / reduceByKey(),
3. KTable.to() (this will be done in KAFKA-3779).

As all of the above operators can have a cache on top to deduplicate the 
materialized state store in RocksDB.

The scope of this JIRA is to extract out the cache of RocksDBStore, and keep 
them as item 1) and 2) above.


> Extract the LRU cache out of RocksDBStore
> -
>
> Key: KAFKA-3777
> URL: https://issues.apache.org/jira/browse/KAFKA-3777
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
> Fix For: 0.10.1.0
>
>
> The LRU cache that is currently inside the RocksDbStore class. As part of 
> KAFKA-3776 it needs to come outside of RocksDbStore and be a separate 
> component used in:
> 1. KGroupedStream.aggregate() / reduce(), 
> 2. KStream.aggregateByKey() / reduceByKey(),
> 3. KTable.to() (this will be done in KAFKA-3779).
> As all of the above operators can have a cache on top to deduplicate the 
> materialized state store in RocksDB.
> The scope of this JIRA is to extract out the cache of RocksDBStore, and keep 
> them as item 1) and 2) above; and it should be done together / after 
> KAFKA-3780.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3777) Extract the LRU cache out of RocksDBStore

2016-06-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3777:
-
Description: 
The LRU cache that is currently inside the RocksDbStore class. As part of 
KAFKA-3776 it needs to come outside of RocksDbStore and be a separate component 
used in:

1. KGroupedStream.aggregate() / reduce(), 
2. KStream.aggregateByKey() / reduceByKey(),
3. KTable.to() (this will be done in KAFKA-3779).

As all of the above operators can have a cache on top to deduplicate the 
materialized state store in RocksDB.

The scope of this JIRA is to extract out the cache of RocksDBStore, and keep 
them as item 1) and 2) above.

  was:
The LRU cache that is currently inside the RocksDbStore class. As part of 
KAFKA-3776 it needs to come outside of RocksDbStore and be a separate component 
used in:

1. KGroupedStream.aggregate() / reduce(), 
2. KStream.aggregateByKey() / reduceByKey(),
3. KTable.to()

As all of the above operators can have a cache on top to deduplicate the 
materialized state store in RocksDB.


> Extract the LRU cache out of RocksDBStore
> -
>
> Key: KAFKA-3777
> URL: https://issues.apache.org/jira/browse/KAFKA-3777
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
> Fix For: 0.10.1.0
>
>
> The LRU cache that is currently inside the RocksDbStore class. As part of 
> KAFKA-3776 it needs to come outside of RocksDbStore and be a separate 
> component used in:
> 1. KGroupedStream.aggregate() / reduce(), 
> 2. KStream.aggregateByKey() / reduceByKey(),
> 3. KTable.to() (this will be done in KAFKA-3779).
> As all of the above operators can have a cache on top to deduplicate the 
> materialized state store in RocksDB.
> The scope of this JIRA is to extract out the cache of RocksDBStore, and keep 
> them as item 1) and 2) above.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3779) Add the LRU cache for KTable.to() operator

2016-06-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3779:
-
Description: The KTable.to operator currently does not use a cache. We can 
add a cache to this operator to deduplicate and reduce data traffic as well. 
This is to be done after KAFKA-3777.  (was: The KTable.to operator currently 
does not use a cache. Add a cache to this operator.)

> Add the LRU cache for KTable.to() operator
> --
>
> Key: KAFKA-3779
> URL: https://issues.apache.org/jira/browse/KAFKA-3779
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
> Fix For: 0.10.1.0
>
>
> The KTable.to operator currently does not use a cache. We can add a cache to 
> this operator to deduplicate and reduce data traffic as well. This is to be 
> done after KAFKA-3777.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3778) Avoiding using range queries of RocksDBWindowStore on KStream windowed aggregations

2016-06-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3778:
-
Summary: Avoiding using range queries of RocksDBWindowStore on KStream 
windowed aggregations  (was: Change the RocksDB implementation for KStream 
windowed aggregations, to not use “range queries” but multiple gets)

> Avoiding using range queries of RocksDBWindowStore on KStream windowed 
> aggregations
> --
>
> Key: KAFKA-3778
> URL: https://issues.apache.org/jira/browse/KAFKA-3778
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
> Fix For: 0.10.1.0
>
>
> RocksDbWindowStore currently does not use caches, but its window segments 
> implemented as RocksDbStore does. However, its range query {{fetch(key, 
> fromTime, toTime)}} will cause all its touched segments' cache to be flushed. 
> After KAFKA-3777, we should change its implementation for 
> KStreamWindowAggregation / KStreamWindowReduce to not use {{fetch}}, but just 
> as multiple {{get}} calls on the underlying segments, one for each affected 
> window range.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3778) Change the RocksDB implementation for KStream windowed aggregations, to not use “range queries” but multiple gets

2016-06-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3778:
-
Description: 
RocksDbWindowStore currently does not use caches, but its window segments 
implemented as RocksDbStore does. However, its range query {{fetch(key, 
fromTime, toTime)}} will cause all its touched segments' cache to be flushed. 

After KAFKA-3777, we should change its implementation for 
KStreamWindowAggregation / KStreamWindowReduce to not use {{fetch}}, but just 
as multiple {{get}} calls on the underlying segments, one for each affected 
window range.

  was:RocksDbWindowStore currently does not use caches (while RocksDbStore 
does). Enable caching for RocksDbWindowStore. This might require changing the 
API from using range queries to using multiple gets(), one for each sub-range.


> Change the RocksDB implementation for KStream windowed aggregations, to not 
> use “range queries” but multiple gets
> -
>
> Key: KAFKA-3778
> URL: https://issues.apache.org/jira/browse/KAFKA-3778
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
> Fix For: 0.10.1.0
>
>
> RocksDbWindowStore currently does not use caches, but its window segments 
> implemented as RocksDbStore does. However, its range query {{fetch(key, 
> fromTime, toTime)}} will cause all its touched segments' cache to be flushed. 
> After KAFKA-3777, we should change its implementation for 
> KStreamWindowAggregation / KStreamWindowReduce to not use {{fetch}}, but just 
> as multiple {{get}} calls on the underlying segments, one for each affected 
> window range.
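
A sketch of the access-pattern change described above, under an assumed 
WindowSegments interface that stands in for the segmented window store rather 
than the real RocksDbWindowStore API: instead of one range fetch that touches 
every segment's cache, issue one point get per affected window start.

{code}
import java.util.ArrayList;
import java.util.List;

public class WindowedAggregateLookupExample {
    // Stand-in for the segmented window store: a point get addressed by (key, windowStartMs).
    public interface WindowSegments<K, V> {
        V get(K key, long windowStartMs);
    }

    // Fetch the existing aggregate for every hopping window that contains the
    // record's timestamp, using one get per window instead of a range fetch.
    static <K, V> List<V> aggregatesFor(WindowSegments<K, V> store, K key,
                                        long timestampMs, long sizeMs, long advanceMs) {
        List<V> results = new ArrayList<>();
        long windowStart = timestampMs - (timestampMs % advanceMs);
        while (windowStart > timestampMs - sizeMs) {        // window [start, start + size) contains ts
            if (windowStart >= 0) {
                V existing = store.get(key, windowStart);   // point get on the owning segment only
                if (existing != null) {
                    results.add(existing);
                }
            }
            windowStart -= advanceMs;
        }
        return results;
    }
}
{code}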



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1450: HOTFIX: updated JavaDoc example for 0.9 tech-prev ...

2016-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1450


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-3689) ERROR Processor got uncaught exception. (kafka.network.Processor)

2016-06-02 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-3689:
---
Attachment: kafka-3689-instrumentation.patch

[~buvana.rama...@nokia.com], our current logging doesn't tell you the client's 
port when it hits that exception. I added a bit of instrumentation to the code 
and attached the change as a patch. Could you apply the patch to the 0.10.0 
branch and run your test to reproduce the issue again? The patched version will 
tell us whether the exception is hit repeatedly by the same client 
(i.e. ip+port).
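
Roughly the kind of instrumentation being described, with made-up method names; 
the actual change is the attached kafka-3689-instrumentation.patch, and the 
real code lives in the Scala SocketServer/Processor classes.

{code}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class ConnectionQuotaLoggingExample {
    // Log the client's IP *and* ephemeral port when the decrement fails, so the
    // logs show whether a single client keeps triggering the exception.
    static void logQuotaUnderflow(SocketChannel channel, IllegalArgumentException cause) {
        String client = "unknown";
        try {
            InetSocketAddress remote = (InetSocketAddress) channel.getRemoteAddress();
            client = remote.getAddress().getHostAddress() + ":" + remote.getPort();
        } catch (IOException e) {
            // channel already closed; fall back to "unknown"
        }
        System.err.println("Attempted to decrease connection count with no connections, client=" + client);
        cause.printStackTrace();
    }
}
{code}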

> ERROR Processor got uncaught exception. (kafka.network.Processor)
> -
>
> Key: KAFKA-3689
> URL: https://issues.apache.org/jira/browse/KAFKA-3689
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.9.0.1
> Environment: ubuntu 14.04,
> java version "1.7.0_95"
> OpenJDK Runtime Environment (IcedTea 2.6.4) (7u95-2.6.4-0ubuntu0.14.04.2)
> OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode)
> 3 broker cluster (all 3 servers identical -  Intel Xeon E5-2670 @2.6GHz, 
> 8cores, 16 threads 64 GB RAM & 1 TB Disk)
> Kafka Cluster is managed by 3 server ZK cluster (these servers are different 
> from Kafka broker servers). All 6 servers are connected via 10G switch. 
> Producers run from external servers.
>Reporter: Buvaneswari Ramanan
>Assignee: Jun Rao
>Priority: Minor
> Fix For: 0.10.1.0, 0.10.0.1
>
> Attachments: kafka-3689-instrumentation.patch
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> As per Ismael Juma's suggestion in email thread to us...@kafka.apache.org 
> with the same subject, I am creating this bug report.
> The following error occurs in one of the brokers in our 3 broker cluster, 
> which serves about 8000 topics. These topics are single partitioned with a 
> replication factor = 3. Each topic gets data at a low rate  – 200 bytes per 
> sec.  Leaders are balanced across the topics.
> Producers run from external servers (4 Ubuntu servers with same config as the 
> brokers), each producing to 2000 topics utilizing kafka-python library.
> This error message occurs repeatedly in one of the servers. Between the hours 
> of 10:30am and 1:30pm on 5/9/16, there were about 10 Million such 
> occurrences. This was right after a cluster restart.
> This is not the first time we got this error in this broker. In those 
> instances, error occurred hours / days after cluster restart.
> =
> [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.IllegalArgumentException: Attempted to decrease connection count 
> for address with no connections, address: /X.Y.Z.144 (actual network address 
> masked)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
> at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.network.Processor.run(SocketServer.scala:445)
> at java.lang.Thread.run(Thread.java:745)
> [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.IllegalArgumentException: Attempted to decrease connection count 
> for address with no connections, address: /X.Y.Z.144
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
> at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at 

Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread

2016-06-02 Thread Guozhang Wang
Hi Jason,

With the current usage pattern of:

while(..) {

  consumer.poll(/* where rebalance happens */)

  // process messages
}

--

And since the rebalance still happens on the caller thread, not the background
thread, if the coordinator decides to rebalance while the user thread is still
processing messages, there is no option but to go with 1), right? I think
your / Onur's point here, which I agree with, is that by reusing the process
timeout as the rebalance timeout, if the rebalance callback could take longer
than processing a batch, users need to set the timeout value to the higher of
the two, i.e. the callback latency, which will make detection of processing
stalls less effective, right?

As I mentioned in my previous email, I feel that this case of the "callback
function taking longer than processing a batch" would not be frequent in
practice, and the processing timeout would usually be a good upper bound on
the callback function latency. If that is true, I'd suggest we keep the
current proposal and not add a third timeout config for covering this case.
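
For concreteness, a minimal sketch of the poll-loop usage pattern at the top of
this message with the 0.10-era consumer API (the comment about the processing
bound refers to the proposed process timeout, not an existing consumer config):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            while (true) {
                // Rebalances (and their callbacks) only complete inside poll(), so the
                // time spent in the processing loop below is exactly what a rebalance
                // has to wait for under option (1) discussed in this thread.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    process(record);   // must finish promptly, or rebalances are delayed
                }
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.key() + " -> " + record.value());
    }
}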


Guozhang


On Thu, Jun 2, 2016 at 10:40 AM, Jason Gustafson  wrote:

> Hey Guozhang,
>
> I think the problem is that users may not want to sacrifice rebalance
> latency because of uncertainty around processing time. As soon as a
> rebalance begins, there are basically two choices:
>
> 1. Block the rebalance until all consumers have finished their current
> processing.
> 2. Let all consumers rebalance and "rollback" any processing that could not
> be committed before the rebalance completes.
>
> If you choose option (1), then you have an incentive to keep a relatively
> tight bound on process.timeout.ms in order to reduce the worst-case idle
> time during a rebalance. But if you fail to set it high enough, then you'll
> get spurious rebalances during normal processing. I think Onur is saying
> that this still sort of sucks for users. On the other hand, if (2) is
> acceptable, then users will have more freedom to err on the high side when
> setting process.timeout.ms, or even disable it entirely. They will have to
> deal with rolling back any progress which cannot be committed after the
> rebalance completes, but maybe this is less of a problem for some users?
>
> Thanks,
> Jason
>
>
>
> On Wed, Jun 1, 2016 at 10:23 PM, Guozhang Wang  wrote:
>
> > Hi Onur, Jason:
> >
> > Here are some thoughts about reusing process timeout as server-side
> > rebalance timeout: First of all, my understanding is that
> >
> > 1) session timeout is for detecting consumer crashes / hard failures (in this
> > case the heartbeat thread will be dead as well, so the coordinator notices
> > within the session timeout).
> >
> > 2) process timeout is for checking liveness of the user thread that calls
> > the consumer as well as does the processing: when no consumer calls are
> > made within the process timeout, the heartbeat thread stops working and
> > hence the stall will be detected by the coordinator.
> >
> > 3) a potential server-side rebalance timeout would be used to detect
> > consumer liveness during the rebalance period, in which the user thread is
> > tied up in the "poll" call and also the callback function, to prevent a
> > slow / stalled consumer in its rebalance callback from causing the
> > rebalance to take forever.
> >
> > I think we generally have two cases in practice regarding 3) above: users
> > either do almost nothing and hence should never be stalled (unless there
> > is a long GC), or they do various external IOs, for example to maintain
> > their own state, which could take long or even cause the thread to stall.
> > We do not need to worry too much about the former case, and for the latter
> > case the process timeout value should usually be a good upper bound on the
> > rebalance latency.
> >
> > That being said, if we observe that there is indeed a common usage where 2)
> > and 3) require very different timeout values, outweighing the added
> > complexity of three timeout configs, we can consider adding a third one
> > then: it is easier to add more configs later.
> >
> >
> > What do you think?
> >
> > Guozhang
> >
> >
> > On Tue, May 31, 2016 at 2:35 PM, Jason Gustafson 
> > wrote:
> >
> > > Hey Onur,
> > >
> > > Thanks for the detailed response. I think the problem of controlling
> > > rebalance times is the main (known) gap in the proposal as it stands.
> > >
> > > This burden goes away if you loosen the liveness property by having a
> > > > required rebalance time and optional processing time where rebalance
> > > > happens in the background thread as stated in the KIP.
> > >
> > >
> > > Just to clarify, the current KIP only allows rebalances to complete in
> > the
> > > foreground. When I suggested above in reply to Grant that we could
> > add
> > > a separate rebalance timeout setting, the behavior I had in mind was to
> > let
> > > the consumer fall 

[GitHub] kafka pull request #1461: KAFKA-3783: Catch proper exception on path delete

2016-06-02 Thread slaunay
GitHub user slaunay opened a pull request:

https://github.com/apache/kafka/pull/1461

KAFKA-3783: Catch proper exception on path delete

- ZkClient is used for conditional path deletion and wraps 
`KeeperException.BadVersionException` into `ZkBadVersionException`
- add unit test to `SimpleAclAuthorizerTest` to reproduce the issue and 
catch potential future regression

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/slaunay/kafka 
bugfix/KAFKA-3783-zk-conditional-delete-path

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1461.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1461


commit d4668ceed50f634fae764a8bb09983a0f8d363d5
Author: Sebastien Launay 
Date:   2016-06-02T20:32:44Z

KAFKA-3783: Catch proper exception on path delete

ZkClient is used for conditional path deletion and wraps
KeeperException.BadVersionException into ZkBadVersionException




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3783) Race condition on last ACL removal for a resource fails with a ZkBadVersionException

2016-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313063#comment-15313063
 ] 

ASF GitHub Bot commented on KAFKA-3783:
---

GitHub user slaunay opened a pull request:

https://github.com/apache/kafka/pull/1461

KAFKA-3783: Catch proper exception on path delete

- ZkClient is used for conditional path deletion and wraps 
`KeeperException.BadVersionException` into `ZkBadVersionException`
- add unit test to `SimpleAclAuthorizerTest` to reproduce the issue and 
catch potential future regression

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/slaunay/kafka 
bugfix/KAFKA-3783-zk-conditional-delete-path

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1461.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1461


commit d4668ceed50f634fae764a8bb09983a0f8d363d5
Author: Sebastien Launay 
Date:   2016-06-02T20:32:44Z

KAFKA-3783: Catch proper exception on path delete

ZkClient is used for conditional path deletion and wraps
KeeperException.BadVersionException into ZkBadVersionException




> Race condition on last ACL removal for a resource fails with a 
> ZkBadVersionException
> 
>
> Key: KAFKA-3783
> URL: https://issues.apache.org/jira/browse/KAFKA-3783
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Sébastien Launay
>Priority: Minor
>
> When removing the last ACL for a given resource, the znode storing the ACLs 
> will get removed.
> The version number of the znode is used for optimistic locking in a loop to 
> provide atomic changes across brokers.
> Unfortunately the exception thrown when the operation fails because of a 
> different version number is the wrong one 
> ({{KeeperException.BadVersionException}} instead of ZkClient 
> {{ZkBadVersionException}})  and does not get caught resulting in the 
> following stack trace:
> {noformat}
> org.I0Itec.zkclient.exception.ZkBadVersionException: 
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
> BadVersion for /kafka-acl/Topic/e6df8028-f268-408c-814e-d418e943b2fa
>   at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:51)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1000)
>   at org.I0Itec.zkclient.ZkClient.delete(ZkClient.java:1047)
>   at kafka.utils.ZkUtils.conditionalDeletePath(ZkUtils.scala:522)
>   at 
> kafka.security.auth.SimpleAclAuthorizer.kafka$security$auth$SimpleAclAuthorizer$$updateResourceAcls(SimpleAclAuthorizer.scala:282)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$removeAcls$1.apply$mcZ$sp(SimpleAclAuthorizer.scala:187)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$removeAcls$1.apply(SimpleAclAuthorizer.scala:187)
>   at 
> kafka.security.auth.SimpleAclAuthorizer$$anonfun$removeAcls$1.apply(SimpleAclAuthorizer.scala:187)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:239)
>   at 
> kafka.security.auth.SimpleAclAuthorizer.removeAcls(SimpleAclAuthorizer.scala:186)
>   ...
> Caused by: org.apache.zookeeper.KeeperException$BadVersionException: 
> KeeperErrorCode = BadVersion for 
> /kafka-acl/Topic/e6df8028-f268-408c-814e-d418e943b2fa
>   at org.apache.zookeeper.KeeperException.create(KeeperException.java:115)
>   at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>   at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
>   at org.I0Itec.zkclient.ZkConnection.delete(ZkConnection.java:109)
>   at org.I0Itec.zkclient.ZkClient$11.call(ZkClient.java:1051)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:990)
>   ... 18 more
> {noformat}
> I noticed this behaviour while working on another fix when running the 
> {{SimpleAclAuthorizerTest}} unit tests, but it can also happen in rare cases 
> when the {{kafka-acls.sh}} command is run simultaneously on different 
> brokers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3783) Race condition on last ACL removal for a resource fails with a ZkBadVersionException

2016-06-02 Thread JIRA
Sébastien Launay created KAFKA-3783:
---

 Summary: Race condition on last ACL removal for a resource fails 
with a ZkBadVersionException
 Key: KAFKA-3783
 URL: https://issues.apache.org/jira/browse/KAFKA-3783
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.0, 0.9.0.1
Reporter: Sébastien Launay
Priority: Minor


When removing the last ACL for a given resource, the znode storing the ACLs 
will get removed.
The version number of the znode is used for optimistic locking in a loop to 
provide atomic changes across brokers.

Unfortunately, the exception expected when the operation fails because of a 
different version number is the wrong one 
({{KeeperException.BadVersionException}} instead of ZkClient's 
{{ZkBadVersionException}}), so the actual exception does not get caught, 
resulting in the following stack trace:
{noformat}
org.I0Itec.zkclient.exception.ZkBadVersionException: 
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
BadVersion for /kafka-acl/Topic/e6df8028-f268-408c-814e-d418e943b2fa
at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:51)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1000)
at org.I0Itec.zkclient.ZkClient.delete(ZkClient.java:1047)
at kafka.utils.ZkUtils.conditionalDeletePath(ZkUtils.scala:522)
at 
kafka.security.auth.SimpleAclAuthorizer.kafka$security$auth$SimpleAclAuthorizer$$updateResourceAcls(SimpleAclAuthorizer.scala:282)
at 
kafka.security.auth.SimpleAclAuthorizer$$anonfun$removeAcls$1.apply$mcZ$sp(SimpleAclAuthorizer.scala:187)
at 
kafka.security.auth.SimpleAclAuthorizer$$anonfun$removeAcls$1.apply(SimpleAclAuthorizer.scala:187)
at 
kafka.security.auth.SimpleAclAuthorizer$$anonfun$removeAcls$1.apply(SimpleAclAuthorizer.scala:187)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:239)
at 
kafka.security.auth.SimpleAclAuthorizer.removeAcls(SimpleAclAuthorizer.scala:186)
...
Caused by: org.apache.zookeeper.KeeperException$BadVersionException: 
KeeperErrorCode = BadVersion for 
/kafka-acl/Topic/e6df8028-f268-408c-814e-d418e943b2fa
at org.apache.zookeeper.KeeperException.create(KeeperException.java:115)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
at org.I0Itec.zkclient.ZkConnection.delete(ZkConnection.java:109)
at org.I0Itec.zkclient.ZkClient$11.call(ZkClient.java:1051)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:990)
... 18 more
{noformat}

I noticed this behaviour while working on another fix when running the 
{{SimpleAclAuthorizerTest}} unit tests, but it can also happen in rare cases 
when the {{kafka-acls.sh}} command is run simultaneously on different brokers.
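
For illustration, a minimal sketch of the kind of change the pull request above 
describes (an assumption, not the actual diff), relying on ZkClient's 
version-aware delete that the stack trace goes through:
{noformat}
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkBadVersionException

object ConditionalDeleteSketch {
  // Delete the znode only if it still has the expected version; return false
  // when another broker changed it in the meantime so the caller can retry.
  def conditionalDeletePath(zkClient: ZkClient, path: String, expectedVersion: Int): Boolean =
    try {
      zkClient.delete(path, expectedVersion)
      true
    } catch {
      // ZkClient wraps KeeperException.BadVersionException into its own
      // ZkBadVersionException, so the wrapper type is what must be caught;
      // catching the raw KeeperException here is what lets the error escape.
      case _: ZkBadVersionException => false
    }
}
{noformat}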



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3755) tightening the offset check in ReplicaFetcherThread

2016-06-02 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313028#comment-15313028
 ] 

Jun Rao commented on KAFKA-3755:


[~imandhan], we can just add a unit test that calls log.append() with 
assignOffsets=false and a message with an offset smaller than the log end 
offset. The call should fail.
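
For illustration, a self-contained sketch of the invariant such a test would 
exercise (a hypothetical helper, not kafka.log.Log or the actual test code): 
when offsets are pre-assigned by the caller, the first offset of the incoming 
message set must not be smaller than the current log end offset.
{noformat}
object OffsetCheckSketch {
  final case class Batch(firstOffset: Long, lastOffset: Long)

  // Mimics an append with assignOffsets=false: reject batches whose first
  // offset lies behind the log end offset, otherwise return the new end offset.
  def appendPreAssigned(logEndOffset: Long, batch: Batch): Long = {
    if (batch.firstOffset < logEndOffset)
      throw new IllegalArgumentException(
        s"Out of order append: first offset ${batch.firstOffset} is smaller " +
          s"than log end offset $logEndOffset")
    batch.lastOffset + 1
  }

  def main(args: Array[String]): Unit = {
    println(appendPreAssigned(logEndOffset = 5L, Batch(5L, 9L))) // prints 10
    try appendPreAssigned(logEndOffset = 5L, Batch(3L, 9L))      // the failing case
    catch { case e: IllegalArgumentException => println("rejected: " + e.getMessage) }
  }
}
{noformat}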

> tightening the offset check in ReplicaFetcherThread
> ---
>
> Key: KAFKA-3755
> URL: https://issues.apache.org/jira/browse/KAFKA-3755
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Ishita Mandhan
>
> Currently, in ReplicaFetcherThread.processPartitionData(), we have the 
> following code to make sure that the fetchOffset matches the log end offset.
>   if (fetchOffset != replica.logEndOffset.messageOffset)
> throw new RuntimeException("Offset mismatch for partition %s: fetched 
> offset = %d, log end offset = %d.".format(topicAndPartition, fetchOffset, 
> replica.logEndOffset.messageOffset))
> It would be useful to further assert that the first offset in the messageSet 
> to be appended to the log is >= the log end offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Kafka takes too long to update the client with metadata when a broker is gone

2016-06-02 Thread safique ahemad
Hi All,

We are using a Kafka broker cluster in our data center.
Recently, we realized that when a Kafka broker goes down, the client tries
to refresh the metadata but gets stale metadata for up to nearly 30 seconds.

Only after roughly 30-35 seconds does the client obtain updated metadata.
That is a very long time, during which the client continuously gets send
failures.

Kindly reply if there is any configuration, or anything else, that may help
here.


-- 

Regards,
Safique Ahemad


[jira] [Commented] (KAFKA-3689) ERROR Processor got uncaught exception. (kafka.network.Processor)

2016-06-02 Thread Buvaneswari Ramanan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312948#comment-15312948
 ] 

Buvaneswari Ramanan commented on KAFKA-3689:


One more scenario under which this occurs (observed once, but not sure if it 
can be recreated):
1) 8000 producers & 16000 consumers, all utilizing the kafka-python library
2) all producers go through an abnormal shutdown - an easy way to create this 
scenario:
 a) kill -9
3) start a subset (~1000) of the producers
4) the network.Processor error message appears in at least one of the brokers 
within a few hours



> ERROR Processor got uncaught exception. (kafka.network.Processor)
> -
>
> Key: KAFKA-3689
> URL: https://issues.apache.org/jira/browse/KAFKA-3689
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.9.0.1
> Environment: ubuntu 14.04,
> java version "1.7.0_95"
> OpenJDK Runtime Environment (IcedTea 2.6.4) (7u95-2.6.4-0ubuntu0.14.04.2)
> OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode)
> 3 broker cluster (all 3 servers identical -  Intel Xeon E5-2670 @2.6GHz, 
> 8cores, 16 threads 64 GB RAM & 1 TB Disk)
> Kafka Cluster is managed by 3 server ZK cluster (these servers are different 
> from Kafka broker servers). All 6 servers are connected via 10G switch. 
> Producers run from external servers.
>Reporter: Buvaneswari Ramanan
>Assignee: Jun Rao
>Priority: Minor
> Fix For: 0.10.1.0, 0.10.0.1
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> As per Ismael Juma's suggestion in email thread to us...@kafka.apache.org 
> with the same subject, I am creating this bug report.
> The following error occurs in one of the brokers in our 3 broker cluster, 
> which serves about 8000 topics. These topics are single partitioned with a 
> replication factor = 3. Each topic gets data at a low rate  – 200 bytes per 
> sec.  Leaders are balanced across the topics.
> Producers run from external servers (4 Ubuntu servers with same config as the 
> brokers), each producing to 2000 topics utilizing kafka-python library.
> This error message occurs repeatedly in one of the servers. Between the hours 
> of 10:30am and 1:30pm on 5/9/16, there were about 10 Million such 
> occurrences. This was right after a cluster restart.
> This is not the first time we got this error in this broker. In those 
> instances, error occurred hours / days after cluster restart.
> =
> [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.IllegalArgumentException: Attempted to decrease connection count 
> for address with no connections, address: /X.Y.Z.144 (actual network address 
> masked)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
> at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.network.Processor.run(SocketServer.scala:445)
> at java.lang.Thread.run(Thread.java:745)
> [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.IllegalArgumentException: Attempted to decrease connection count 
> for address with no connections, address: /X.Y.Z.144
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
> at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at 

[jira] [Comment Edited] (KAFKA-3689) ERROR Processor got uncaught exception. (kafka.network.Processor)

2016-06-02 Thread Buvaneswari Ramanan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312945#comment-15312945
 ] 

Buvaneswari Ramanan edited comment on KAFKA-3689 at 6/2/16 7:53 PM:


All connections are shown to be in the ESTABLISHED state.

Here is the scenario under which this arises:
1) 8000 producers & 16000 consumers, all utilizing the kafka-python library
2) brokers go through an abnormal shutdown - an easy way to create this scenario:
 a) cleanly shut down all the zookeepers while the brokers are running
 b) wait for the following message in the broker log: INFO [Kafka Server ], 
shutting down (kafka.server.KafkaServer). 
As you are probably aware, even though the brokers start the shutdown, 
the process takes a while, and they stay up until the zookeepers are back.
 c) now restart the zookeepers so that the brokers will eventually shut down
3) now start the brokers
4) the network.Processor error message appears in at least one of the brokers 
within a few hours
netstat shows all connections to be ESTABLISHED at the broker end.



was (Author: buvana.rama...@nokia.com):
All connections are shown to be in ESTABLISHED state.

Here is the scenario under which this arises:
* 8000 producers & 16000 consumers, all utilizing kafka-python library
* brokers go thru abnormal shutdown - easy way to create this scenario:
 * clean shutdown all the zookeepers while brokers are running
 * await for the following message in broker log: INFO [Kafka Server ], 
shutting down (kafka.server.KafkaServer). 
As you are probably aware, even though they start the shutdown, 
the process takes a while.
&  brokers will continue to be on until zks are back.
 * now restart zks so that brokers will shutdown eventually
* now start brokers
* network.Processor error message appears in atleast one of the brokers within 
a few hours
netstat shows  all connections to be ESTABLISHED at the broker end.


> ERROR Processor got uncaught exception. (kafka.network.Processor)
> -
>
> Key: KAFKA-3689
> URL: https://issues.apache.org/jira/browse/KAFKA-3689
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.9.0.1
> Environment: ubuntu 14.04,
> java version "1.7.0_95"
> OpenJDK Runtime Environment (IcedTea 2.6.4) (7u95-2.6.4-0ubuntu0.14.04.2)
> OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode)
> 3 broker cluster (all 3 servers identical -  Intel Xeon E5-2670 @2.6GHz, 
> 8cores, 16 threads 64 GB RAM & 1 TB Disk)
> Kafka Cluster is managed by 3 server ZK cluster (these servers are different 
> from Kafka broker servers). All 6 servers are connected via 10G switch. 
> Producers run from external servers.
>Reporter: Buvaneswari Ramanan
>Assignee: Jun Rao
>Priority: Minor
> Fix For: 0.10.1.0, 0.10.0.1
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> As per Ismael Juma's suggestion in email thread to us...@kafka.apache.org 
> with the same subject, I am creating this bug report.
> The following error occurs in one of the brokers in our 3 broker cluster, 
> which serves about 8000 topics. These topics are single partitioned with a 
> replication factor = 3. Each topic gets data at a low rate  – 200 bytes per 
> sec.  Leaders are balanced across the topics.
> Producers run from external servers (4 Ubuntu servers with same config as the 
> brokers), each producing to 2000 topics utilizing kafka-python library.
> This error message occurs repeatedly in one of the servers. Between the hours 
> of 10:30am and 1:30pm on 5/9/16, there were about 10 Million such 
> occurrences. This was right after a cluster restart.
> This is not the first time we got this error in this broker. In those 
> instances, error occurred hours / days after cluster restart.
> =
> [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.IllegalArgumentException: Attempted to decrease connection count 
> for address with no connections, address: /X.Y.Z.144 (actual network address 
> masked)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
> at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
> at 

[jira] [Commented] (KAFKA-3689) ERROR Processor got uncaught exception. (kafka.network.Processor)

2016-06-02 Thread Buvaneswari Ramanan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312945#comment-15312945
 ] 

Buvaneswari Ramanan commented on KAFKA-3689:


All connections are shown to be in the ESTABLISHED state.

Here is the scenario under which this arises:
* 8000 producers & 16000 consumers, all utilizing the kafka-python library
* brokers go through an abnormal shutdown - an easy way to create this scenario:
 * cleanly shut down all the zookeepers while the brokers are running
 * wait for the following message in the broker log: INFO [Kafka Server ], 
shutting down (kafka.server.KafkaServer). 
As you are probably aware, even though the brokers start the shutdown, 
the process takes a while, and they stay up until the zookeepers are back.
 * now restart the zookeepers so that the brokers will eventually shut down
* now start the brokers
* the network.Processor error message appears in at least one of the brokers 
within a few hours
netstat shows all connections to be ESTABLISHED at the broker end.


> ERROR Processor got uncaught exception. (kafka.network.Processor)
> -
>
> Key: KAFKA-3689
> URL: https://issues.apache.org/jira/browse/KAFKA-3689
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.9.0.1
> Environment: ubuntu 14.04,
> java version "1.7.0_95"
> OpenJDK Runtime Environment (IcedTea 2.6.4) (7u95-2.6.4-0ubuntu0.14.04.2)
> OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode)
> 3 broker cluster (all 3 servers identical -  Intel Xeon E5-2670 @2.6GHz, 
> 8cores, 16 threads 64 GB RAM & 1 TB Disk)
> Kafka Cluster is managed by 3 server ZK cluster (these servers are different 
> from Kafka broker servers). All 6 servers are connected via 10G switch. 
> Producers run from external servers.
>Reporter: Buvaneswari Ramanan
>Assignee: Jun Rao
>Priority: Minor
> Fix For: 0.10.1.0, 0.10.0.1
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> As per Ismael Juma's suggestion in email thread to us...@kafka.apache.org 
> with the same subject, I am creating this bug report.
> The following error occurs in one of the brokers in our 3 broker cluster, 
> which serves about 8000 topics. These topics are single partitioned with a 
> replication factor = 3. Each topic gets data at a low rate  – 200 bytes per 
> sec.  Leaders are balanced across the topics.
> Producers run from external servers (4 Ubuntu servers with same config as the 
> brokers), each producing to 2000 topics utilizing kafka-python library.
> This error message occurs repeatedly in one of the servers. Between the hours 
> of 10:30am and 1:30pm on 5/9/16, there were about 10 Million such 
> occurrences. This was right after a cluster restart.
> This is not the first time we got this error in this broker. In those 
> instances, error occurred hours / days after cluster restart.
> =
> [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.IllegalArgumentException: Attempted to decrease connection count 
> for address with no connections, address: /X.Y.Z.144 (actual network address 
> masked)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
> at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.network.Processor.run(SocketServer.scala:445)
> at java.lang.Thread.run(Thread.java:745)
> [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.IllegalArgumentException: Attempted to decrease connection count 
> for address with no connections, address: /X.Y.Z.144
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
> at 

[jira] [Updated] (KAFKA-3782) Transient failure with kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=True

2016-06-02 Thread Liquan Pei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liquan Pei updated KAFKA-3782:
--
Description: 
For commit 946ae60

max() arg is an empty sequence
Traceback (most recent call last):
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
 line 106, in run_all_tests
data = self.run_single_test()
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
 line 162, in run_single_test
return self.current_test_context.function(self.current_test)
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/mark/_mark.py",
 line 331, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
 line 321, in test_bounce
sink_seqno_max = max(sink_seqnos)
ValueError: max() arg is an empty sequence

  was:
max() arg is an empty sequence
Traceback (most recent call last):
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
 line 106, in run_all_tests
data = self.run_single_test()
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
 line 162, in run_single_test
return self.current_test_context.function(self.current_test)
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/mark/_mark.py",
 line 331, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
 line 321, in test_bounce
sink_seqno_max = max(sink_seqnos)
ValueError: max() arg is an empty sequence


> Transient failure with 
> kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=True
> -
>
> Key: KAFKA-3782
> URL: https://issues.apache.org/jira/browse/KAFKA-3782
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Liquan Pei
>Assignee: Ewen Cheslack-Postava
>
> For commit 946ae60
> max() arg is an empty sequence
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 106, in run_all_tests
> data = self.run_single_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 162, in run_single_test
> return self.current_test_context.function(self.current_test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/mark/_mark.py",
>  line 331, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
>  line 321, in test_bounce
> sink_seqno_max = max(sink_seqnos)
> ValueError: max() arg is an empty sequence



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3764) Error processing append operation on partition

2016-06-02 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-3764:
--

Assignee: Grant Henke

> Error processing append operation on partition
> --
>
> Key: KAFKA-3764
> URL: https://issues.apache.org/jira/browse/KAFKA-3764
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Martin Nowak
>Assignee: Grant Henke
>
> After updating Kafka from 0.9.0.1 to 0.10.0.0 I'm getting plenty of `Error 
> processing append operation on partition` errors. This happens with 
> ruby-kafka as producer and enabled snappy compression.
> {noformat}
> [2016-05-27 20:00:11,074] ERROR [Replica Manager on Broker 2]: Error 
> processing append operation on partition m2m-0 (kafka.server.ReplicaManager)
> kafka.common.KafkaException: 
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:159)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:85)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNextOuter(ByteBufferMessageSet.scala:357)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:369)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:324)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> at 
> kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:427)
> at kafka.log.Log.liftedTree1$1(Log.scala:339)
> at kafka.log.Log.append(Log.scala:338)
> at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:443)
> at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:429)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
> at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:406)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:392)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:392)
> at 
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:328)
> at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:405)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:76)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: failed to read chunk
> at 
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:433)
> at 
> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
> at java.io.DataInputStream.readFully(DataInputStream.java:195)
> at java.io.DataInputStream.readLong(DataInputStream.java:416)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.readMessageFromStream(ByteBufferMessageSet.scala:118)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:153)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3764) Error processing append operation on partition

2016-06-02 Thread Grant Henke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312916#comment-15312916
 ] 

Grant Henke commented on KAFKA-3764:


It looks like this is likely caused by 
https://github.com/xerial/snappy-java/issues/142 and is fixed in [snappy-java 
1.1.2.6|http://search.maven.org/#artifactdetails%7Corg.xerial.snappy%7Csnappy-java%7C1.1.2.6%7Cbundle].
 This has also been identified as the cause of 
https://github.com/edenhill/librdkafka/issues/645

I can upgrade to snappy-java 1.1.2.6, test with librdkafka and send a PR.

> Error processing append operation on partition
> --
>
> Key: KAFKA-3764
> URL: https://issues.apache.org/jira/browse/KAFKA-3764
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Martin Nowak
>
> After updating Kafka from 0.9.0.1 to 0.10.0.0 I'm getting plenty of `Error 
> processing append operation on partition` errors. This happens with 
> ruby-kafka as producer and enabled snappy compression.
> {noformat}
> [2016-05-27 20:00:11,074] ERROR [Replica Manager on Broker 2]: Error 
> processing append operation on partition m2m-0 (kafka.server.ReplicaManager)
> kafka.common.KafkaException: 
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:159)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:85)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNextOuter(ByteBufferMessageSet.scala:357)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:369)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:324)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> at 
> kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:427)
> at kafka.log.Log.liftedTree1$1(Log.scala:339)
> at kafka.log.Log.append(Log.scala:338)
> at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:443)
> at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:429)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
> at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:406)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:392)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:392)
> at 
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:328)
> at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:405)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:76)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: failed to read chunk
> at 
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:433)
> at 
> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
> at java.io.DataInputStream.readFully(DataInputStream.java:195)
> at java.io.DataInputStream.readLong(DataInputStream.java:416)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.readMessageFromStream(ByteBufferMessageSet.scala:118)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:153)
> {noformat}



--
This message was 

[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-06-02 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312913#comment-15312913
 ] 

Greg Fodor commented on KAFKA-3758:
---

Also, if we did not run with an elevated number of threads, we hit that issue 
because the timeout fired before all tasks had initialized.

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> 

[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-06-02 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312912#comment-15312912
 ] 

Greg Fodor commented on KAFKA-3758:
---

Hey, we're running 16 threads. For this job we have 25 topics and roughly 350 
topic-partitions involved, but there isn't much I/O against most of them. 
Basically we take in a very small percentage of the incoming data at the top 
of the job, process it, and discard most of it early.
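
For context, a minimal sketch of how a per-instance thread count like that is 
configured (assuming the 0.10.0 Streams API; broker address and topic names 
here are made up):
{noformat}
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.KStreamBuilder

object StreamsThreadsSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-streams-app")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    // num.stream.threads: how many stream threads this one instance runs
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "16")

    val builder = new KStreamBuilder()
    // trivial pass-through topology; the default byte-array serdes apply
    builder.stream[Array[Byte], Array[Byte]]("input-topic").to("output-topic")

    new KafkaStreams(builder, props).start()
  }
}
{noformat}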

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> 

[jira] [Commented] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2016-06-02 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312888#comment-15312888
 ] 

Guozhang Wang commented on KAFKA-3758:
--

Hi Greg, I have a few more questions that will help me investigate this 
issue. It would be appreciated if you can recall the answers:

1. How many threads (i.e. the {{num.stream.threads}} config) did you use per 
KafkaStreams instance?
2. From your logs it seems you have at least two topic groups, with at 
least 82 partitions for the largest topic. How many topic-partitions do you 
have in total?

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> 

[jira] [Created] (KAFKA-3782) Transient failure with kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=True

2016-06-02 Thread Liquan Pei (JIRA)
Liquan Pei created KAFKA-3782:
-

 Summary: Transient failure with 
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=True
 Key: KAFKA-3782
 URL: https://issues.apache.org/jira/browse/KAFKA-3782
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 0.10.0.0
Reporter: Liquan Pei
Assignee: Ewen Cheslack-Postava


max() arg is an empty sequence
Traceback (most recent call last):
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
 line 106, in run_all_tests
data = self.run_single_test()
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
 line 162, in run_single_test
return self.current_test_context.function(self.current_test)
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/mark/_mark.py",
 line 331, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
 line 321, in test_bounce
sink_seqno_max = max(sink_seqnos)
ValueError: max() arg is an empty sequence



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3764) Error processing append operation on partition

2016-06-02 Thread Taro L. Saito (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312782#comment-15312782
 ] 

Taro L. Saito commented on KAFKA-3764:
--

Released snappy-java-1.1.2.6, which would resolve this issue. 
https://github.com/xerial/snappy-java/issues/142

> Error processing append operation on partition
> --
>
> Key: KAFKA-3764
> URL: https://issues.apache.org/jira/browse/KAFKA-3764
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Martin Nowak
>
> After updating Kafka from 0.9.0.1 to 0.10.0.0 I'm getting plenty of `Error 
> processing append operation on partition` errors. This happens with 
> ruby-kafka as producer and enabled snappy compression.
> {noformat}
> [2016-05-27 20:00:11,074] ERROR [Replica Manager on Broker 2]: Error 
> processing append operation on partition m2m-0 (kafka.server.ReplicaManager)
> kafka.common.KafkaException: 
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:159)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:85)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNextOuter(ByteBufferMessageSet.scala:357)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:369)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:324)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> at 
> kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:427)
> at kafka.log.Log.liftedTree1$1(Log.scala:339)
> at kafka.log.Log.append(Log.scala:338)
> at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:443)
> at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:429)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
> at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:406)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:392)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:392)
> at 
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:328)
> at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:405)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:76)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: failed to read chunk
> at 
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:433)
> at 
> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
> at java.io.DataInputStream.readFully(DataInputStream.java:195)
> at java.io.DataInputStream.readLong(DataInputStream.java:416)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.readMessageFromStream(ByteBufferMessageSet.scala:118)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:153)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread

2016-06-02 Thread Jason Gustafson
Hey Guozhang,

I think the problem is that users may not want to sacrifice rebalance
latency because of uncertainty around processing time. As soon as a
rebalance begins, there are basically two choices:

1. Block the rebalance until all consumers have finished their current
processing.
2. Let all consumers rebalance and "rollback" any processing that could not
be committed before the rebalance completes.

If you choose option (1), then you have an incentive to keep a relatively
tight bound on process.timeout.ms in order to reduce the worst-case idle
time during a rebalance. But if you fail to set it high enough, then you'll
get spurious rebalances during normal processing. I think Onur is saying
that this still sort of sucks for users. On the other hand, if (2) is
acceptable, then users will have more freedom to err on the high side when
setting process.timeout.ms, or even disable it entirely. They will have to
deal with rolling back any progress which cannot be committed after the
rebalance completes, but maybe this is less of a problem for some users?
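
To make option (2) concrete, here is a rough, hypothetical sketch (made-up
broker, group, and topic names; the plain Java consumer API used from Scala)
that commits only fully processed offsets from onPartitionsRevoked, so any
in-flight work is simply redone after the rebalance:

import java.util.{Collection => JCollection, Properties}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
import scala.collection.mutable

object RollbackOnRebalanceSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "sketch-group")
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    // offsets of records that have been fully processed, and nothing beyond
    val committable = mutable.Map[TopicPartition, OffsetAndMetadata]()

    val listener = new ConsumerRebalanceListener {
      override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = {
        // commit only completed work; anything still in flight is effectively
        // rolled back and will be re-fetched after the rebalance
        if (committable.nonEmpty) consumer.commitSync(committable.asJava)
        committable.clear()
      }
      override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
    }

    consumer.subscribe(java.util.Arrays.asList("input-topic"), listener)
    while (true) {
      val records = consumer.poll(100)
      for (record <- records.asScala) {
        // process(record) would go here
        committable(new TopicPartition(record.topic, record.partition)) =
          new OffsetAndMetadata(record.offset + 1)
      }
    }
  }
}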

Thanks,
Jason



On Wed, Jun 1, 2016 at 10:23 PM, Guozhang Wang  wrote:

> Hi Onur, Jason:
>
> Here are some thoughts about reusing process timeout as server-side
> rebalance timeout: First of all, my understanding is that
>
> 1) session timeout is for detecting consumer crash / hard failures (in this
> case the heartbeat thread will be dead as well, hence the coordinator notices
> within the session timeout value).
>
> 2) process timeout is for checking liveness of the user thread that calls
> the consumer as well as does the processing: when no consumer calls are
> made within the process timeout, the heartbeat thread stops working and hence
> it will be detected by the coordinator.
>
> 3) a potential server-side rebalance timeout would be used to detect
> consumer liveness during the rebalance period, in which the user thread is
> tied with the "poll" call and also the callback function, to prevent a slow
> / stalled consumer in their rebalance callback from causing the rebalance
> to take forever.
>
> I think we generally have two cases in practice regarding 3) above: user
> either does almost nothing and hence should never be stalled (unless there
> is a long GC), or they do various external IOs for maintaining their own
> states, for example, which could be taking long or even cause the thread to
> stall. We do not need to worry too much about the former case, and in the
> latter case the process timeout value should usually be a good upper
> bound on the rebalance latency.
>
> That being said, if we observe that there is indeed a common usage where 2)
> and 3) require very different timeout values, and the benefit outweighs the
> complexity of having three timeout values, we can consider adding a third one
> then: it is easier to add more configs later.
>
>
> What do you think?
>
> Guozhang
>
>
> On Tue, May 31, 2016 at 2:35 PM, Jason Gustafson 
> wrote:
>
> > Hey Onur,
> >
> > Thanks for the detailed response. I think the problem of controlling
> > rebalance times is the main (known) gap in the proposal as it stands.
> >
> > This burden goes away if you loosen the liveness property by having a
> > > required rebalance time and optional processing time where rebalance
> > > happens in the background thread as stated in the KIP.
> >
> >
> > Just to clarify, the current KIP only allows rebalances to complete in
> the
> > foreground. What I suggested above in reply to Grant was that we could
> add
> > a separate rebalance timeout setting, the behavior I had in mind was to
> let
> > the consumer fall out of the group if the timeout is reached while the
> > consumer is still processing. I was specifically trying to avoid moving
> the
> > rebalance to the background thread since this significantly increases the
> > complexity of the implementation. We'd also have to think about
> > compatibility a bit more. For example, what are the implications of
> having
> > the rebalance listener execute in a separate thread?
> >
> > Putting that issue aside, I think we need to convince ourselves that a
> > separate rebalance timeout is really necessary since every new timeout
> adds
> > some conceptual noise which all users will see. My thought in this KIP
> was
> > that users who didn't want the burden of tuning the process timeout could
> > use a relatively large value without a major impact because group
> > rebalances themselves will typically be infrequent. The main concern is
> for
> > users who have highly variant processing times and want to ensure a tight
> > bound on rebalance times (even if it means having to discard some
> > processing that cannot be completed before the rebalance finishes). These
> > users will be left trying to tune process.timeout.ms and
> max.poll.records,
> > which is basically the same position they are currently in. The problem
> is
> > I don't know how common this case is, so I'm not sure how it weighs
> against
> > 

Changing default logger to RollingFileAppender (KAFKA-2394)

2016-06-02 Thread Dustin Cote
Hi all,

I'm looking at changing the Kafka default logging setup to use the
RollingFileAppender instead of the DailyRollingFileAppender in an effort to
accomplish two goals:
1) Avoid filling up users' disks if the log files grow unexpectedly
2) Move off the admittedly unreliable DailyRollingFileAppender

I wanted to know if the community has any feedback around this before
moving forward.  The main drawback with going to the RollingFileAppender is
that the log file names will no longer have timestamps, but instead be of
the form server.log, server.log.1, etc.  Which users are depending on the
file name convention and would need to roll back the log4j configuration
should the default change in a later version?  What sort of feedback can
those users provide to help us document this the right way?

Thanks,

-- 
Dustin Cote
confluent.io


[jira] [Assigned] (KAFKA-2394) Use RollingFileAppender by default in log4j.properties

2016-06-02 Thread Dustin Cote (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dustin Cote reassigned KAFKA-2394:
--

Assignee: Dustin Cote  (was: jin xing)

> Use RollingFileAppender by default in log4j.properties
> --
>
> Key: KAFKA-2394
> URL: https://issues.apache.org/jira/browse/KAFKA-2394
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Dustin Cote
>Priority: Minor
>  Labels: newbie
> Attachments: log4j.properties.patch
>
>
> The default log4j.properties bundled with Kafka uses ConsoleAppender and 
> DailyRollingFileAppender, which offer no protection to users from spammy 
> logging. In extreme cases (such as when issues like KAFKA-1461 are 
> encountered), the logs can exhaust the local disk space. This could be a 
> problem for Kafka adoption since new users are less likely to adjust the 
> logging properties themselves, and are more likely to have configuration 
> problems which result in log spam. 
> To fix this, we can use RollingFileAppender, which offers two settings for 
> controlling the maximum space that log files will use.
> maxBackupIndex: how many backup files to retain
> maxFileSize: the max size of each log file
> One question is whether this change is a compatibility concern? The backup 
> strategy and filenames used by RollingFileAppender are different from those 
> used by DailyRollingFileAppender, so any tools which depend on the old format 
> will break. If we think this is a serious problem, one solution would be to 
> provide two versions of log4j.properties and add a flag to enable the new 
> one. Another solution would be to include the RollingFileAppender 
> configuration in the default log4j.properties, but commented out.
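
For concreteness, a minimal log4j 1.x sketch of a RollingFileAppender using the
two settings mentioned above; the appender name, file path, and size/count
values here are illustrative only, not the defaults being proposed:

{noformat}
# Illustrative RollingFileAppender configuration (log4j 1.x); names and
# values are examples only, not the proposed Kafka defaults.
log4j.rootLogger=INFO, kafkaAppender

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
# maxFileSize: roll the file once it reaches this size
log4j.appender.kafkaAppender.MaxFileSize=100MB
# maxBackupIndex: keep at most this many rolled files (server.log.1 ... .10)
log4j.appender.kafkaAppender.MaxBackupIndex=10
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
{noformat}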



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2394) Use RollingFileAppender by default in log4j.properties

2016-06-02 Thread Dustin Cote (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312409#comment-15312409
 ] 

Dustin Cote commented on KAFKA-2394:


Thanks [~ewencp].  I went ahead and sent this out to users@ and dev@, and hopefully 
the impacted users will speak up :)

> Use RollingFileAppender by default in log4j.properties
> --
>
> Key: KAFKA-2394
> URL: https://issues.apache.org/jira/browse/KAFKA-2394
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: jin xing
>Priority: Minor
>  Labels: newbie
> Attachments: log4j.properties.patch
>
>
> The default log4j.properties bundled with Kafka uses ConsoleAppender and 
> DailyRollingFileAppender, which offer no protection to users from spammy 
> logging. In extreme cases (such as when issues like KAFKA-1461 are 
> encountered), the logs can exhaust the local disk space. This could be a 
> problem for Kafka adoption since new users are less likely to adjust the 
> logging properties themselves, and are more likely to have configuration 
> problems which result in log spam. 
> To fix this, we can use RollingFileAppender, which offers two settings for 
> controlling the maximum space that log files will use.
> maxBackupIndex: how many backup files to retain
> maxFileSize: the max size of each log file
> One question is whether this change is a compatibility concern? The backup 
> strategy and filenames used by RollingFileAppender are different from those 
> used by DailyRollingFileAppender, so any tools which depend on the old format 
> will break. If we think this is a serious problem, one solution would be to 
> provide two versions of log4j.properties and add a flag to enable the new 
> one. Another solution would be to include the RollingFileAppender 
> configuration in the default log4j.properties, but commented out.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3764) Error processing append operation on partition

2016-06-02 Thread Martin Nowak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312402#comment-15312402
 ] 

Martin Nowak edited comment on KAFKA-3764 at 6/2/16 2:34 PM:
-

Good guess, there is a bug report for snappy-java w/ a similar Kafka stacktrace.
https://github.com/xerial/snappy-java/issues/142


was (Author: dawg):
Good guess, there is a bug report for snappy-java w/ a similar Kafka stacktrace.
[bad handling of the MAGIC HEADER · Issue #142 · 
xerial/snappy-java](https://github.com/xerial/snappy-java/issues/142)

> Error processing append operation on partition
> --
>
> Key: KAFKA-3764
> URL: https://issues.apache.org/jira/browse/KAFKA-3764
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Martin Nowak
>
> After updating Kafka from 0.9.0.1 to 0.10.0.0 I'm getting plenty of `Error 
> processing append operation on partition` errors. This happens with 
> ruby-kafka as producer and enabled snappy compression.
> {noformat}
> [2016-05-27 20:00:11,074] ERROR [Replica Manager on Broker 2]: Error 
> processing append operation on partition m2m-0 (kafka.server.ReplicaManager)
> kafka.common.KafkaException: 
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:159)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:85)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNextOuter(ByteBufferMessageSet.scala:357)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:369)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:324)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> at 
> kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:427)
> at kafka.log.Log.liftedTree1$1(Log.scala:339)
> at kafka.log.Log.append(Log.scala:338)
> at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:443)
> at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:429)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
> at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:406)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:392)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:392)
> at 
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:328)
> at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:405)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:76)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: failed to read chunk
> at 
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:433)
> at 
> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
> at java.io.DataInputStream.readFully(DataInputStream.java:195)
> at java.io.DataInputStream.readLong(DataInputStream.java:416)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.readMessageFromStream(ByteBufferMessageSet.scala:118)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:153)
> {noformat}



--
This message 

[jira] [Commented] (KAFKA-3764) Error processing append operation on partition

2016-06-02 Thread Martin Nowak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312402#comment-15312402
 ] 

Martin Nowak commented on KAFKA-3764:
-

Good guess, there is a bug report for snappy-java w/ a similar Kafka stacktrace.
[bad handling of the MAGIC HEADER · Issue #142 · 
xerial/snappy-java](https://github.com/xerial/snappy-java/issues/142)

> Error processing append operation on partition
> --
>
> Key: KAFKA-3764
> URL: https://issues.apache.org/jira/browse/KAFKA-3764
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Martin Nowak
>
> After updating Kafka from 0.9.0.1 to 0.10.0.0 I'm getting plenty of `Error 
> processing append operation on partition` errors. This happens with 
> ruby-kafka as producer and enabled snappy compression.
> {noformat}
> [2016-05-27 20:00:11,074] ERROR [Replica Manager on Broker 2]: Error 
> processing append operation on partition m2m-0 (kafka.server.ReplicaManager)
> kafka.common.KafkaException: 
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:159)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:85)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNextOuter(ByteBufferMessageSet.scala:357)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:369)
> at 
> kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:324)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> at 
> kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:427)
> at kafka.log.Log.liftedTree1$1(Log.scala:339)
> at kafka.log.Log.append(Log.scala:338)
> at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:443)
> at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:429)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
> at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:406)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:392)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:392)
> at 
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:328)
> at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:405)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:76)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: failed to read chunk
> at 
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:433)
> at 
> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
> at java.io.DataInputStream.readFully(DataInputStream.java:195)
> at java.io.DataInputStream.readLong(DataInputStream.java:416)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.readMessageFromStream(ByteBufferMessageSet.scala:118)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:153)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3725) Update documentation with regards to XFS

2016-06-02 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312288#comment-15312288
 ] 

Todd Palino commented on KAFKA-3725:


I'll take a look at this and put together a PR with some updates.

> Update documentation with regards to XFS
> 
>
> Key: KAFKA-3725
> URL: https://issues.apache.org/jira/browse/KAFKA-3725
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Todd Palino
>
> Our documentation currently states that only Ext4 has been tried (by 
> LinkedIn):
> "Ext4 may or may not be the best filesystem for Kafka. Filesystems like XFS 
> supposedly handle locking during fsync better. We have only tried Ext4, 
> though."
> http://kafka.apache.org/documentation.html#ext4
> I think this is no longer true, so we should update the documentation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3725) Update documentation with regards to XFS

2016-06-02 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino reassigned KAFKA-3725:
--

Assignee: Todd Palino

> Update documentation with regards to XFS
> 
>
> Key: KAFKA-3725
> URL: https://issues.apache.org/jira/browse/KAFKA-3725
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Todd Palino
>
> Our documentation currently states that only Ext4 has been tried (by 
> LinkedIn):
> "Ext4 may or may not be the best filesystem for Kafka. Filesystems like XFS 
> supposedly handle locking during fsync better. We have only tried Ext4, 
> though."
> http://kafka.apache.org/documentation.html#ext4
> I think this is no longer true, so we should update the documentation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KStreams Rewind Offset

2016-06-02 Thread Matthias J. Sax
Hi Mike,

currently, this is not possible. We are already discussing some changes
with regard to reprocessing. However, I doubt that going back to a specific
offset of a specific partition will be supported, as it would be too
difficult to reset the internal data structures and intermediate results
correctly (also with regard to committing).

What is your exact use case? What kind of feature are you looking for?
We are always interested in getting feedback/ideas from users.


-Matthias

On 06/01/2016 08:21 PM, Michael D. Coon wrote:
> All,
>   I think it's great that the ProcessorContext offers the partition and 
> offset of the current record being processed; however, it offers no way for 
> me to actually use the information. I would like to be able to rewind to a 
> particular offset on a partition if I needed to. The consumer is also not 
> exposed to me so I couldn't access things directly that way either. Is this 
> in the works or would it interfere with rebalancing/auto-commits?
> Mike
> 
> 





[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-02 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312250#comment-15312250
 ] 

Matthias J. Sax commented on KAFKA-3775:


I have some concerns about this:
 1) a KStreams application should process the whole topic and not parts of it 
-- limiting the number of partitions is kinda artificial from my point of view
 2) even if we limit the number of partitions, it is quite random which would 
get processed and which not -- I would assume that users would like to have a more 
transparent assignment
 3) last but not least, under the hood we are using the standard Java 
KafkaConsumer: looking at your patch (just briefly), it seems you changed the 
task assignment -- however, this is independent from the partitions assignment 
of the used consumer -- thus, the consumer would still poll all partitions but 
would not be able to assign records for some partitions as the corresponding 
tasks are missing.

Having elaborated on 3), this raises a more complex problem: Right now, KafkaStreams 
relies on Kafka's internal partition assignment. If you want to assign only 
some partitions, we cannot use the standard high-level Java KafkaConsumer and would 
need to implement our own assignment strategy to allow for partial assignment 
within a consumer group (i.e., allow a consumer group that does not assign all 
partitions).
 

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of a single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and 
> heavy message traffic, it is a problem that we don't have a way of throttling 
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with more 
> than 10MB/sec of traffic per partition, we saw that all partitions were 
> assigned to the first instance and the app soon died from OOM.
> I know that there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, so 
> some users might want the option to start a first single instance and check 
> that it works as expected with a lesser number of partitions (I want :p).
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works, but this still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as 
> it accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that 
> users don't have a way to control the maximum number of "partitions" assigned 
> to a single shard (an instance of KafkaStreams here). Users should be allowed 
> to specify the maximum number of partitions that can reasonably be processed 
> by a single instance (or host).
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of 
> partition) assigned to the processId (which is the notion of a single 
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate incomplete assignment. That is, Kafka Streams should continue working 
> for the assigned subset of partitions even when some partitions are left 
> unassigned, in order to satisfy this: "users may want the option to start a 
> first single instance and check that it works as expected with a lesser 
> number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL and if it makes sense I will 
> continue refining it.
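
As a toy illustration of the kind of cap being proposed (this is a hypothetical
helper, not the actual StreamPartitionAssignor change in the linked patch):

{noformat}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch: round-robin task assignment with a per-client cap, leaving the
// remainder unassigned. Names and structure are illustrative only.
public class CappedRoundRobinAssignor {

    /** Assigns taskIds to clients round-robin, giving each client at most maxTasksAssigned. */
    public static Map<String, List<Integer>> assign(List<String> clients,
                                                    List<Integer> taskIds,
                                                    int maxTasksAssigned) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        for (String client : clients) {
            assignment.put(client, new ArrayList<>());
        }
        int clientIndex = 0;
        for (Integer task : taskIds) {
            int attempts = 0;
            // find the next client that still has capacity
            while (attempts < clients.size()
                    && assignment.get(clients.get(clientIndex)).size() >= maxTasksAssigned) {
                clientIndex = (clientIndex + 1) % clients.size();
                attempts++;
            }
            if (attempts == clients.size()) {
                break; // every client is at its cap; remaining tasks stay unassigned
            }
            assignment.get(clients.get(clientIndex)).add(task);
            clientIndex = (clientIndex + 1) % clients.size();
        }
        return assignment;
    }
}
{noformat}

A sticky or load-aware policy would likely be preferable in practice; the point
is only that an assignor can stop handing out tasks once a client reaches its
cap and leave the rest unassigned.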



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3560) Kafka is not working on Solaris

2016-06-02 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-3560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312230#comment-15312230
 ] 

Marcus Gründler commented on KAFKA-3560:


With Kafka 0.10.0.0 everything works fine on Solaris with the quick start 
tutorial!

> Kafka is not working on Solaris
> ---
>
> Key: KAFKA-3560
> URL: https://issues.apache.org/jira/browse/KAFKA-3560
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.9.0.1
> Environment: Solaris 5.10, i386
>Reporter: Marcus Gründler
>Priority: Blocker
> Attachments: controller.log, log-cleaner.log, server.log, 
> state-change.log
>
>
> We are trying to run Kafka 0.9.0.x on Solaris 5.10 but Kafka fails to connect 
> controller to broker at startup. We have no problem running kafka 0.8.2.x on 
> the very same machine. Due to this bug kafka is completely unusable on 
> Solaris.
> We use 1 Broker with the default configuration just as described in the 
> quickstart guide.
> The problem can easily be reproduced by the following steps:
> 1. Download kafka_2.11-0.9.0.1.tgz and unpack.
> 2. Start zookeeper: 
> {noformat}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {noformat}
> 3. Start kafka: 
> {noformat}
> bin/kafka-server-start.sh config/server.properties
> {noformat}
> 4. Wait 30 seconds
> 5. See timeouts in logs/controller.log
> {noformat}
> [2016-04-14 17:01:42,752] WARN [Controller-0-to-broker-0-send-thread], 
> Controller 0's connection to broker Node(0, srvs010.ac.aixigo.de, 9092) was 
> unsuccessful (kafka.controller.RequestSendThread)
> java.net.SocketTimeoutException: Failed to connect within 3 ms
> at 
> kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:228)
> at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:172)
> at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:171)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> {noformat}
> We can create topics with:
> {noformat}
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 
> 1 --partitions 1 --topic test
> {noformat}
> And we can list topics with:
> {noformat}
> bin/kafka-topics.sh --list --zookeeper localhost:2181
> {noformat}
> But we can *not* write data into the topic:
> {noformat}
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> {noformat}
> Our environment is:
> {noformat}
> > uname -a
> SunOS serverXXX 5.10 Generic_147441-16 i86pc i386 i86pc Solaris
> > java -version
> java version "1.8.0_45"
> Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
> {noformat}
> Filesystems we tested are
> * Local file system UFS
> * Network filesystem NFS
> I will provide log files in a minute.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3101) Optimize Aggregation Outputs

2016-06-02 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-3101:

Assignee: (was: Eno Thereska)

> Optimize Aggregation Outputs
> 
>
> Key: KAFKA-3101
> URL: https://issues.apache.org/jira/browse/KAFKA-3101
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
> Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table / 
> Windowed Stream Aggregations. For example, say we have a sequence of 
> aggregate outputs computed from the input stream (assuming there is no agg 
> value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change<newValue, 
> oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
> where computing the intermediate results could cost a lot of CPU overhead. 
> Instead, if we let the underlying state store "remember" the last 
> emitted old value, we can reduce the number of emits based on some configs. 
> More specifically, we can add one more field in the KV store engine storing 
> the last emitted old value, which only gets updated when we emit to the 
> downstream processor. For example:
> At Beginning: 
> Store: key => empty (no agg values yet)
> V1 computed: 
> Update Both in Store: key => (V1, V1), Emit <V1, null>
> V2 computed: 
> Update NewValue in Store: key => (V2, V1), No Emit
> V3 computed: 
> Update NewValue in Store: key => (V3, V1), No Emit
> V4 computed: 
> Update Both in Store: key => (V4, V4), Emit <V4, V1>
> V5 computed: 
> Update NewValue in Store: key => (V5, V4), No Emit
> One more thing to consider is that we need a "closing" time control on the 
> not-yet-emitted keys; when some time has elapsed (or the window is to be 
> closed), we need to check for any key whose current materialized pair 
> has not been emitted (for example <V5, V4> in the above example). 
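
As a rough sketch of the bookkeeping described above, assuming a plain
in-memory map and a count-based emit policy (the real proposal would use the
state store itself and a config-driven flush policy):

{noformat}
import java.util.HashMap;
import java.util.Map;

// Sketch: per-key store of (currentValue, lastEmittedOldValue) that suppresses
// intermediate Change records and emits only under an illustrative count-based policy.
public class DedupAggregateBuffer<K, V> {

    private static class Entry<V> {
        V current;          // latest aggregate value
        V lastEmittedOld;   // old value as of the last downstream emit
        int pendingUpdates; // updates since the last emit
    }

    public static class Change<V> {
        public final V newValue;
        public final V oldValue;
        public Change(V newValue, V oldValue) { this.newValue = newValue; this.oldValue = oldValue; }
    }

    private final Map<K, Entry<V>> store = new HashMap<>();
    private final int emitEveryNUpdates; // illustrative flush policy

    public DedupAggregateBuffer(int emitEveryNUpdates) {
        this.emitEveryNUpdates = emitEveryNUpdates;
    }

    /** Record a new aggregate value; returns a Change to forward downstream, or null to suppress. */
    public Change<V> update(K key, V newValue) {
        Entry<V> e = store.computeIfAbsent(key, k -> new Entry<>());
        V oldForEmit = e.lastEmittedOld;
        e.current = newValue;
        e.pendingUpdates++;
        if (e.pendingUpdates >= emitEveryNUpdates || oldForEmit == null) {
            e.lastEmittedOld = newValue;  // "update both" on emit
            e.pendingUpdates = 0;
            return new Change<>(newValue, oldForEmit);
        }
        return null;  // "update new value only", no emit
    }
}
{noformat}

With emitEveryNUpdates = 3 this reproduces the example sequence above: <V1,
null> and <V4, V1> are emitted, while the updates for V2, V3, and V5 are
suppressed until a later flush.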



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs

2016-06-02 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312133#comment-15312133
 ] 

Eno Thereska commented on KAFKA-3101:
-

[~h...@pinterest.com], [~bbejeck], I just added a KIP around this since it will 
involve adding a new configuration parameter. Any comments on KIP-63 
appreciated (on the dev mailing list). Thanks.

> Optimize Aggregation Outputs
> 
>
> Key: KAFKA-3101
> URL: https://issues.apache.org/jira/browse/KAFKA-3101
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: architecture
> Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table / 
> Windowed Stream Aggregations. For example, say we have a sequence of 
> aggregate outputs computed from the input stream (assuming there is no agg 
> value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change<newValue, 
> oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
> where computing the intermediate results could cost a lot of CPU overhead. 
> Instead, if we let the underlying state store "remember" the last 
> emitted old value, we can reduce the number of emits based on some configs. 
> More specifically, we can add one more field in the KV store engine storing 
> the last emitted old value, which only gets updated when we emit to the 
> downstream processor. For example:
> At Beginning: 
> Store: key => empty (no agg values yet)
> V1 computed: 
> Update Both in Store: key => (V1, V1), Emit <V1, null>
> V2 computed: 
> Update NewValue in Store: key => (V2, V1), No Emit
> V3 computed: 
> Update NewValue in Store: key => (V3, V1), No Emit
> V4 computed: 
> Update Both in Store: key => (V4, V4), Emit <V4, V1>
> V5 computed: 
> Update NewValue in Store: key => (V5, V4), No Emit
> One more thing to consider is that we need a "closing" time control on the 
> not-yet-emitted keys; when some time has elapsed (or the window is to be 
> closed), we need to check for any key whose current materialized pair 
> has not been emitted (for example <V5, V4> in the above example). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3780) Add new config cache.max.bytes.buffering to the streams configuration

2016-06-02 Thread Eno Thereska (JIRA)
Eno Thereska created KAFKA-3780:
---

 Summary: Add new config cache.max.bytes.buffering to the streams 
configuration
 Key: KAFKA-3780
 URL: https://issues.apache.org/jira/browse/KAFKA-3780
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Eno Thereska
 Fix For: 0.10.1.0


Add a new configuration cache.max.bytes.buffering to the streams configuration 
options as described in KIP-63 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3779) Add the LRU cache for KTable.to() operator

2016-06-02 Thread Eno Thereska (JIRA)
Eno Thereska created KAFKA-3779:
---

 Summary: Add the LRU cache for KTable.to() operator
 Key: KAFKA-3779
 URL: https://issues.apache.org/jira/browse/KAFKA-3779
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Eno Thereska
 Fix For: 0.10.1.0


The KTable.to operator currently does not use a cache. Add a cache to this 
operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3778) Change the RocksDB implementation for KStream windowed aggregations, to not use “range queries” but multiple gets

2016-06-02 Thread Eno Thereska (JIRA)
Eno Thereska created KAFKA-3778:
---

 Summary: Change the RocksDB implementation for KStream windowed 
aggregations, to not use “range queries” but multiple gets
 Key: KAFKA-3778
 URL: https://issues.apache.org/jira/browse/KAFKA-3778
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Eno Thereska
 Fix For: 0.10.1.0


RocksDbWindowStore currently does not use caches (while RocksDbStore does). 
Enable caching for RocksDbWindowStore. This might require changing the API from 
using range queries to using multiple gets(), one for each sub-range.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3776) Unify store and downstream caching in streams

2016-06-02 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-3776:

Assignee: (was: Guozhang Wang)

> Unify store and downstream caching in streams
> -
>
> Key: KAFKA-3776
> URL: https://issues.apache.org/jira/browse/KAFKA-3776
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
> Fix For: 0.10.1.0
>
>
> This is an umbrella story for capturing changes to processor caching in 
> Streams as first described in KIP-63. 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3777) Extract the LRU cache out of RocksDBStore

2016-06-02 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-3777:

Assignee: (was: Guozhang Wang)

> Extract the LRU cache out of RocksDBStore
> -
>
> Key: KAFKA-3777
> URL: https://issues.apache.org/jira/browse/KAFKA-3777
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
> Fix For: 0.10.1.0
>
>
> The LRU cache that is currently inside the RocksDbStore needs to come outside 
> of RocksDbStore and be a separate component used in 
> KGroupedStream.aggregate() / reduce(), and KStream.aggregateByKey() / 
> reduceByKey().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3776) Unify store and downstream caching in streams

2016-06-02 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-3776:

Reviewer: Eno Thereska

> Unify store and downstream caching in streams
> -
>
> Key: KAFKA-3776
> URL: https://issues.apache.org/jira/browse/KAFKA-3776
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
> Fix For: 0.10.1.0
>
>
> This is an umbrella story for capturing changes to processor caching in 
> Streams as first described in KIP-63. 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3777) Extract the LRU cache out of RocksDBStore

2016-06-02 Thread Eno Thereska (JIRA)
Eno Thereska created KAFKA-3777:
---

 Summary: Extract the LRU cache out of RocksDBStore
 Key: KAFKA-3777
 URL: https://issues.apache.org/jira/browse/KAFKA-3777
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Eno Thereska
Assignee: Guozhang Wang
 Fix For: 0.10.1.0


The LRU cache that is currently inside the RocksDbStore needs to come outside 
of RocksDbStore and be a separate component used in KGroupedStream.aggregate() 
/ reduce(), and KStream.aggregateByKey() / reduceByKey().
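
For reference, the usual way to build a small standalone LRU component in Java
is a LinkedHashMap in access order; a minimal sketch, unrelated to the actual
RocksDbStore internals and shown only to illustrate the idea of a separate
cache component:

{noformat}
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal LRU cache built on LinkedHashMap's access-order mode; evicts the
// least recently used entry once maxEntries is exceeded. Illustrative only.
public class SimpleLruCache<K, V> extends LinkedHashMap<K, V> {

    private final int maxEntries;

    public SimpleLruCache(int maxEntries) {
        super(16, 0.75f, true);   // accessOrder = true gives LRU iteration order
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;  // evict the eldest entry once over capacity
    }
}

// Usage sketch:
//   SimpleLruCache<String, byte[]> cache = new SimpleLruCache<>(10_000);
//   cache.put("key", value);
//   byte[] hit = cache.get("key");
{noformat}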



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3776) Unify store and downstream caching in streams

2016-06-02 Thread Eno Thereska (JIRA)
Eno Thereska created KAFKA-3776:
---

 Summary: Unify store and downstream caching in streams
 Key: KAFKA-3776
 URL: https://issues.apache.org/jira/browse/KAFKA-3776
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Eno Thereska
Assignee: Guozhang Wang
 Fix For: 0.10.1.0


This is an umbrella story for capturing changes to processor caching in Streams 
as first described in KIP-63. 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-02 Thread Eno Thereska
Hi there,

I have created KIP-63: Unify store and downstream caching in streams
https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
 



Feedback is appreciated.

Thank you
Eno

[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-02 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311838#comment-15311838
 ] 

Yuto Kawamura commented on KAFKA-3775:
--

[~guozhang] What do you think?

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of a single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and 
> heavy message traffic, it is a problem that we don't have a way of throttling 
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with more 
> than 10MB/sec of traffic per partition, we saw that all partitions were 
> assigned to the first instance and the app soon died from OOM.
> I know that there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, so 
> some users might want the option to start a first single instance and check 
> that it works as expected with a lesser number of partitions (I want :p).
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works, but this still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as 
> it accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that 
> users don't have a way to control the maximum number of "partitions" assigned 
> to a single shard (an instance of KafkaStreams here). Users should be allowed 
> to specify the maximum number of partitions that can reasonably be processed 
> by a single instance (or host).
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of 
> partition) assigned to the processId (which is the notion of a single 
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate incomplete assignment. That is, Kafka Streams should continue working 
> for the assigned subset of partitions even when some partitions are left 
> unassigned, in order to satisfy this: "users may want the option to start a 
> first single instance and check that it works as expected with a lesser 
> number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL and if it makes sense I will 
> continue refining it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-02 Thread Yuto Kawamura (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuto Kawamura updated KAFKA-3775:
-
Status: Patch Available  (was: In Progress)

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of a single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and 
> heavy message traffic, it is a problem that we don't have a way of throttling 
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with more 
> than 10MB/sec of traffic per partition, we saw that all partitions were 
> assigned to the first instance and the app soon died from OOM.
> I know that there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, so 
> some users might want the option to start a first single instance and check 
> that it works as expected with a lesser number of partitions (I want :p).
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works, but this still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as 
> it accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that 
> users don't have a way to control the maximum number of "partitions" assigned 
> to a single shard (an instance of KafkaStreams here). Users should be allowed 
> to specify the maximum number of partitions that can reasonably be processed 
> by a single instance (or host).
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of 
> partition) assigned to the processId (which is the notion of a single 
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate incomplete assignment. That is, Kafka Streams should continue working 
> for the assigned subset of partitions even when some partitions are left 
> unassigned, in order to satisfy this: "users may want the option to start a 
> first single instance and check that it works as expected with a lesser 
> number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL and if it makes sense I will 
> continue refining it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311836#comment-15311836
 ] 

ASF GitHub Bot commented on KAFKA-3775:
---

GitHub user kawamuray opened a pull request:

https://github.com/apache/kafka/pull/1460

KAFKA-3775: Throttle maximum number of tasks assigned to a single 
KafkaStreams

Issue: https://issues.apache.org/jira/browse/KAFKA-3775

POC. Discussion in progress.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kawamuray/kafka KAFKA-3775-throttle-tasks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1460.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1460


commit fefe259b2c97bb1bbf14b572533ca74348651c0d
Author: Yuto Kawamura 
Date:   2016-06-02T03:46:51Z

MINOR: Add toString() to ClientState for debugging

commit c4f363d32d9a496c0f4b4e66ee846429a2a2eda5
Author: Yuto Kawamura 
Date:   2016-06-02T03:51:34Z

MINOR: Remove meanglessly repeated assertions in unit test

commit 3c173fa5d029277e5d1974c104d7e66939b5cd17
Author: Yuto Kawamura 
Date:   2016-06-02T03:55:10Z

KAFKA-3775: Intorduce new streams configuration max.tasks.assigned

This configuration limits the maximum number of tasks assigned to a single 
KafkaStreams instance.
As a task consists of single partition for more than 1 topic, setting this 
value to lower is useful
to prevent huge number of partitions are assigned to an instance which 
started first.




> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of a single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and 
> heavy message traffic, it is a problem that we don't have a way of throttling 
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with more 
> than 10MB/sec of traffic per partition, we saw that all partitions were 
> assigned to the first instance and the app soon died from OOM.
> I know that there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, so 
> some users might want the option to start a first single instance and check 
> that it works as expected with a lesser number of partitions (I want :p).
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works, but this still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as 
> it accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that 
> users don't have a way to control the maximum number of "partitions" assigned 
> to a single shard (an instance of KafkaStreams here). Users should be allowed 
> to specify the maximum number of partitions that can reasonably be processed 
> by a single instance (or host).
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of 
> partition) assigned to the processId (which is the notion of a single 
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate incomplete assignment. That is, Kafka Streams should continue working 
> for the assigned subset of partitions even when some partitions are left 
> unassigned, in order to satisfy this: "users may want the option to start a 
> first single instance and check that it works as expected with a lesser 
> number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL and if it makes sense I will 
> continue refining it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1460: KAFKA-3775: Throttle maximum number of tasks assig...

2016-06-02 Thread kawamuray
GitHub user kawamuray opened a pull request:

https://github.com/apache/kafka/pull/1460

KAFKA-3775: Throttle maximum number of tasks assigned to a single 
KafkaStreams

Issue: https://issues.apache.org/jira/browse/KAFKA-3775

POC. Discussion in progress.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kawamuray/kafka KAFKA-3775-throttle-tasks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1460.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1460


commit fefe259b2c97bb1bbf14b572533ca74348651c0d
Author: Yuto Kawamura 
Date:   2016-06-02T03:46:51Z

MINOR: Add toString() to ClientState for debugging

commit c4f363d32d9a496c0f4b4e66ee846429a2a2eda5
Author: Yuto Kawamura 
Date:   2016-06-02T03:51:34Z

MINOR: Remove meanglessly repeated assertions in unit test

commit 3c173fa5d029277e5d1974c104d7e66939b5cd17
Author: Yuto Kawamura 
Date:   2016-06-02T03:55:10Z

KAFKA-3775: Intorduce new streams configuration max.tasks.assigned

This configuration limits the maximum number of tasks assigned to a single 
KafkaStreams instance.
As a task consists of single partition for more than 1 topic, setting this 
value to lower is useful
to prevent huge number of partitions are assigned to an instance which 
started first.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Work started] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-02 Thread Yuto Kawamura (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3775 started by Yuto Kawamura.

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of a single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and 
> heavy message traffic, it is a problem that we don't have a way of throttling 
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with more 
> than 10MB/sec of traffic per partition, we saw that all partitions were 
> assigned to the first instance and the app soon died from OOM.
> I know that there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, so 
> some users might want the option to start a first single instance and check 
> that it works as expected with a lesser number of partitions (I want :p).
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works, but this still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as 
> it accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that 
> users don't have a way to control the maximum number of "partitions" assigned 
> to a single shard (an instance of KafkaStreams here). Users should be allowed 
> to specify the maximum number of partitions that can reasonably be processed 
> by a single instance (or host).
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of 
> partition) assigned to the processId (which is the notion of a single 
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate incomplete assignment. That is, Kafka Streams should continue working 
> for the assigned subset of partitions even when some partitions are left 
> unassigned, in order to satisfy this: "users may want the option to start a 
> first single instance and check that it works as expected with a lesser 
> number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL and if it makes sense I will 
> continue refining it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-02 Thread Yuto Kawamura (JIRA)
Yuto Kawamura created KAFKA-3775:


 Summary: Throttle maximum number of tasks assigned to a single 
KafkaStreams
 Key: KAFKA-3775
 URL: https://issues.apache.org/jira/browse/KAFKA-3775
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.0
Reporter: Yuto Kawamura
Assignee: Yuto Kawamura
 Fix For: 0.10.1.0


As of today, if I start a Kafka Streams app on a single machine which consists 
of a single KafkaStreams instance, that instance gets all partitions of the 
target topic assigned.
As we're using it to process topics which have a huge number of partitions and 
heavy message traffic, it is a problem that we don't have a way of throttling 
the maximum number of partitions assigned to a single instance.

In fact, when we started a Kafka Streams app which consumes a topic with more 
than 10MB/sec of traffic per partition, we saw that all partitions were 
assigned to the first instance and the app soon died from OOM.
I know that there are some workarounds conceivable here, for example:

- Start multiple instances at once so the partitions are distributed evenly.
  => Maybe works, but as Kafka Streams is a library and not an execution 
framework, there's no predefined procedure for starting Kafka Streams apps, so 
some users might want the option to start a first single instance and check 
that it works as expected with a lesser number of partitions (I want :p).
- Adjust config parameters such as {{buffered.records.per.partition}}, 
{{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
pressure.
  => Maybe works, but this still has two problems IMO:
  - It still leads to a traffic explosion with high-throughput processing, as 
it accepts all incoming messages from hundreds of partitions.
  - In the first place, by distributed-system principles, it's weird that 
users don't have a way to control the maximum number of "partitions" assigned 
to a single shard (an instance of KafkaStreams here). Users should be allowed 
to specify the maximum number of partitions that can reasonably be processed 
by a single instance (or host).

Here, I'd like to introduce a new configuration parameter 
{{max.tasks.assigned}}, which limits the number of tasks (a notion of partition) 
assigned to the processId (which is the notion of a single KafkaStreams 
instance).
At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
tolerate incomplete assignment. That is, Kafka Streams should continue working 
for the assigned subset of partitions even when some partitions are left 
unassigned, in order to satisfy this: "users may want the option to start a 
first single instance and check that it works as expected with a lesser number 
of partitions (I want :p)".

I've implemented a rough POC for this. PTAL and if it makes sense I will 
continue refining it.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk7 #1333

2016-06-02 Thread Apache Jenkins Server
See 

Changes:

[me] KAFKA-3774: Make 'time' an optional argument of GetOffsetShell

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H11 (docker Ubuntu ubuntu yahoo-not-h2) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url 
 > https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
 > git --version # timeout=10
 > git -c core.askpass=true fetch --tags --progress 
 > https://git-wip-us.apache.org/repos/asf/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision e6ca328f80a8f7504d740c9f77dfc8b04db20324 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f e6ca328f80a8f7504d740c9f77dfc8b04db20324
 > git rev-list 107205a7f17a4b1e235089668cb283bf16fbb88a # timeout=10
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
[kafka-trunk-jdk7] $ /bin/bash -xe /tmp/hudson934689125542867894.sh
+ 
/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2/bin/gradle
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
http://gradle.org/docs/2.4-rc-2/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:downloadWrapper

BUILD SUCCESSFUL

Total time: 14.694 secs
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
[kafka-trunk-jdk7] $ /bin/bash -xe /tmp/hudson3237632604655878499.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew -Dorg.gradle.project.maxParallelForks=1 clean jarAll testAll
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
https://docs.gradle.org/2.13/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
Build file ': 
line 231
useAnt has been deprecated and is scheduled to be removed in Gradle 3.0. The 
Ant-Based Scala compiler is deprecated, please see 
https://docs.gradle.org/current/userguide/scala_plugin.html.
:clean UP-TO-DATE
:clients:clean UP-TO-DATE
:connect:clean UP-TO-DATE
:core:clean
:examples:clean UP-TO-DATE
:log4j-appender:clean UP-TO-DATE
:streams:clean UP-TO-DATE
:tools:clean UP-TO-DATE
:connect:api:clean UP-TO-DATE
:connect:file:clean UP-TO-DATE
:connect:json:clean UP-TO-DATE
:connect:runtime:clean UP-TO-DATE
:streams:examples:clean UP-TO-DATE
:jar_core_2_10
Building project 'core' with Scala version 2.10.6
:kafka-trunk-jdk7:clients:compileJava
:jar_core_2_10 FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Failed to capture snapshot of input files for task 'compileJava' during 
up-to-date check.
> Could not add entry 
> '/home/jenkins/.gradle/caches/modules-2/files-2.1/net.jpountz.lz4/lz4/1.3.0/c708bb2590c0652a642236ef45d9f99ff842a2ce/lz4-1.3.0.jar'
>  to cache fileHashes.bin 
> (

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug 
option to get more log output.

BUILD FAILED

Total time: 17.287 secs
Build step 'Execute shell' marked build as failure
Recording test results
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51
ERROR: Step ‘Publish JUnit test result report’ failed: No test report files 
were found. Configuration error?
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK_1_7U51_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk-1.7u51


Build failed in Jenkins: kafka-trunk-jdk8 #669

2016-06-02 Thread Apache Jenkins Server
See 

Changes:

[me] KAFKA-3774: Make 'time' an optional argument of GetOffsetShell

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on ubuntu-us1 (Ubuntu ubuntu ubuntu-us golang-ppa) in 
workspace 
 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url 
 > https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
 > git --version # timeout=10
 > git -c core.askpass=true fetch --tags --progress 
 > https://git-wip-us.apache.org/repos/asf/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision e6ca328f80a8f7504d740c9f77dfc8b04db20324 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f e6ca328f80a8f7504d740c9f77dfc8b04db20324
 > git rev-list 107205a7f17a4b1e235089668cb283bf16fbb88a # timeout=10
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK1_8_0_66_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_66
[kafka-trunk-jdk8] $ /bin/bash -xe /tmp/hudson7056211262860277466.sh
+ 
/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2/bin/gradle
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
http://gradle.org/docs/2.4-rc-2/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:downloadWrapper

BUILD SUCCESSFUL

Total time: 11.38 secs
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK1_8_0_66_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_66
[kafka-trunk-jdk8] $ /bin/bash -xe /tmp/hudson2591719614740900203.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew -Dorg.gradle.project.maxParallelForks=1 clean jarAll testAll
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
https://docs.gradle.org/2.13/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
Build file ': 
line 231
useAnt has been deprecated and is scheduled to be removed in Gradle 3.0. The 
Ant-Based Scala compiler is deprecated, please see 
https://docs.gradle.org/current/userguide/scala_plugin.html.
:clean UP-TO-DATE
:clients:clean UP-TO-DATE
:connect:clean UP-TO-DATE
:core:clean
:examples:clean UP-TO-DATE
:log4j-appender:clean UP-TO-DATE
:streams:clean UP-TO-DATE
:tools:clean UP-TO-DATE
:connect:api:clean UP-TO-DATE
:connect:file:clean UP-TO-DATE
:connect:json:clean UP-TO-DATE
:connect:runtime:clean UP-TO-DATE
:streams:examples:clean UP-TO-DATE
:jar_core_2_10
Building project 'core' with Scala version 2.10.6
:kafka-trunk-jdk8:clients:compileJava
:jar_core_2_10 FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Failed to capture snapshot of input files for task 'compileJava' during 
up-to-date check.
> Could not add entry 
> '/home/jenkins/.gradle/caches/modules-2/files-2.1/net.jpountz.lz4/lz4/1.3.0/c708bb2590c0652a642236ef45d9f99ff842a2ce/lz4-1.3.0.jar'
>  to cache fileHashes.bin 
> (

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug 
option to get more log output.

BUILD FAILED

Total time: 10.254 secs
Build step 'Execute shell' marked build as failure
Recording test results
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK1_8_0_66_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_66
ERROR: Step ‘Publish JUnit test result report’ failed: No test report files 
were found. Configuration error?
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
Setting 
JDK1_8_0_66_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_66