Build failed in Jenkins: kafka-trunk-jdk7 #1113

2016-03-13 Thread Apache Jenkins Server
See 

Changes:

[harsha] KAFKA-2551; Update Unclean leader election docs

--
[...truncated 2974 lines...]

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testCleanShutdownFile PASSED

kafka.log.OffsetIndexTest > lookupExtremeCases PASSED

kafka.log.OffsetIndexTest > appendTooMany PASSED

kafka.log.OffsetIndexTest > randomLookupTest PASSED

kafka.log.OffsetIndexTest > testReopen PASSED

kafka.log.OffsetIndexTest > appendOutOfOrder PASSED

kafka.log.OffsetIndexTest > truncate PASSED

kafka.log.LogSegmentTest > testRecoveryWithCorruptMessage PASSED

kafka.log.LogSegmentTest > testRecoveryFixesCorruptIndex PASSED

kafka.log.LogSegmentTest > testReadFromGap PASSED

kafka.log.LogSegmentTest > testTruncate PASSED

kafka.log.LogSegmentTest > testReadBeforeFirstOffset PASSED

kafka.log.LogSegmentTest > testCreateWithInitFileSizeAppendMessage PASSED

kafka.log.LogSegmentTest > testChangeFileSuffixes PASSED

kafka.log.LogSegmentTest > testMaxOffset PASSED

kafka.log.LogSegmentTest > testNextOffsetCalculation PASSED

kafka.log.LogSegmentTest > testReadOnEmptySegment PASSED

kafka.log.LogSegmentTest > testReadAfterLast PASSED

kafka.log.LogSegmentTest > testCreateWithInitFileSizeClearShutdown PASSED

kafka.log.LogSegmentTest > testTruncateFull PASSED

kafka.log.CleanerTest > testBuildOffsetMap PASSED

kafka.log.CleanerTest > testSegmentGrouping PASSED

kafka.log.CleanerTest > testCleanSegmentsWithAbort PASSED

kafka.log.CleanerTest > testSegmentGroupingWithSparseOffsets PASSED

kafka.log.CleanerTest > testRecoveryAfterCrash PASSED

kafka.log.CleanerTest > testLogToClean PASSED

kafka.log.CleanerTest > testCleaningWithDeletes PASSED

kafka.log.CleanerTest > testCleanSegments PASSED

kafka.log.CleanerTest > testCleaningWithUnkeyedMessages PASSED

kafka.log.OffsetMapTest > testClear PASSED

kafka.log.OffsetMapTest > testBasicValidation PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testHeartbeatWrongCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testDescribeGroupStable PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testHeartbeatIllegalGeneration 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testDescribeGroupWrongCoordinator PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testDescribeGroupRebalancing 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testLeaderFailureInSyncGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testGenerationIdIncrementsOnRebalance PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testSyncGroupFromIllegalGeneration PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testInvalidGroupId PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testHeartbeatUnknownGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testListGroupsIncludesStableGroups PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testHeartbeatDuringRebalanceCausesRebalanceInProgress PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupInconsistentGroupProtocol PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupSessionTimeoutTooLarge PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupSessionTimeoutTooSmall PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testSyncGroupEmptyAssignment 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testCommitOffsetWithDefaultGeneration PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupFromUnchangedLeaderShouldRebalance PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testHeartbeatRebalanceInProgress PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testLeaveGroupUnknownGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testListGroupsIncludesRebalancingGroups PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testSyncGroupFollowerAfterLeader PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testCommitOffsetInAwaitingSync 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testJoinGroupWrongCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupUnknownConsumerExistingGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testSyncGroupFromUnknownGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupInconsistentProtocolType PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testCommitOffsetFromUnknownGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testLeaveGroupWrongCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testLeaveGroupUnknownConsumerExistingGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupUnknownConsumerNewGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupFromUnchangedFollowerDoesNotRebalance PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 

Build failed in Jenkins: kafka-trunk-jdk8 #441

2016-03-13 Thread Apache Jenkins Server
See 

Changes:

[harsha] KAFKA-2551; Update Unclean leader election docs

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on ubuntu-us1 (Ubuntu ubuntu ubuntu-us golang-ppa) in 
workspace 
 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url 
 > https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
 > git --version # timeout=10
 > git -c core.askpass=true fetch --tags --progress 
 > https://git-wip-us.apache.org/repos/asf/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision ac7b2e95d342972e3499d203bc23e1675e90c591 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f ac7b2e95d342972e3499d203bc23e1675e90c591
 > git rev-list c9311d5f4ec3b135cb6c0f87008da946863daaa2 # timeout=10
Setting 
JDK1_8_0_45_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_45
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
[kafka-trunk-jdk8] $ /bin/bash -xe /tmp/hudson5488454698118872084.sh
+ 
/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2/bin/gradle
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
http://gradle.org/docs/2.4-rc-2/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:downloadWrapper

BUILD SUCCESSFUL

Total time: 15.423 secs
Setting 
JDK1_8_0_45_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_45
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
[kafka-trunk-jdk8] $ /bin/bash -xe /tmp/hudson2053046211844560437.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew -Dorg.gradle.project.maxParallelForks=1 clean jarAll testAll
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
https://docs.gradle.org/2.11/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:clean UP-TO-DATE
:clients:clean UP-TO-DATE
:connect:clean UP-TO-DATE
:core:clean UP-TO-DATE
:examples:clean UP-TO-DATE
:log4j-appender:clean UP-TO-DATE
:streams:clean UP-TO-DATE
:tools:clean UP-TO-DATE
:connect:api:clean UP-TO-DATE
:connect:file:clean UP-TO-DATE
:connect:json:clean UP-TO-DATE
:connect:runtime:clean UP-TO-DATE
:streams:examples:clean UP-TO-DATE
:jar_core_2_10
Building project 'core' with Scala version 2.10.6
:kafka-trunk-jdk8:clients:compileJava
:jar_core_2_10 FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Failed to capture snapshot of input files for task 'compileJava' during 
up-to-date check.
> Could not add entry 
> '/home/jenkins/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.18/b631d286463ced7cc42ee2171fe3beaed2836823/slf4j-api-1.7.18.jar'
>  to cache fileHashes.bin 
> (

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug 
option to get more log output.

BUILD FAILED

Total time: 12.178 secs
Build step 'Execute shell' marked build as failure
Recording test results
Setting 
JDK1_8_0_45_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_45
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
ERROR: Step ‘Publish JUnit test result report’ failed: No test report files 
were found. Configuration error?
Setting 
JDK1_8_0_45_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_45
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2


[jira] [Commented] (KAFKA-2551) Unclean leader election docs outdated

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192703#comment-15192703
 ] 

ASF GitHub Bot commented on KAFKA-2551:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1054


> Unclean leader election docs outdated
> -
>
> Key: KAFKA-2551
> URL: https://issues.apache.org/jira/browse/KAFKA-2551
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 0.8.2.2
>Reporter: Stevo Slavic
>Assignee: Manikumar Reddy
>Priority: Trivial
>  Labels: documentation, newbie
> Fix For: 0.10.0.0
>
>
> Current unclean leader election docs state:
> {quote}
> In the future, we would like to make this configurable to better support use 
> cases where downtime is preferable to inconsistency.
> {quote}
> Since 0.8.2.0, the unclean leader election strategy (whether to allow it or 
> not) is already configurable via the {{unclean.leader.election.enable}} broker 
> config property.
> That sentence is in both 
> https://svn.apache.org/repos/asf/kafka/site/083/design.html and 
> https://svn.apache.org/repos/asf/kafka/site/082/design.html near the end of 
> the "Unclean leader election: What if they all die?" section. The next 
> section, "Availability and Durability Guarantees", mentions the ability to 
> disable unclean leader election, so likely just this one reference needs to be updated.
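
For reference, a minimal sketch that names the broker property discussed in this ticket. Only {{unclean.leader.election.enable}} comes from the ticket; the chosen value and the idea of assembling it as a Java Properties object are illustrative assumptions:

{code}
import java.util.Properties;

// Sketch only: names the broker property referenced above. The value and the
// Properties-based setup are illustrative, not a prescribed configuration.
public class UncleanElectionConfigExample {
    public static void main(String[] args) {
        Properties brokerProps = new Properties();
        // false = keep the partition offline rather than elect an out-of-sync replica
        brokerProps.setProperty("unclean.leader.election.enable", "false");
        System.out.println(brokerProps);
    }
}
{code}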



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-2551: Update Unclean leader election doc...

2016-03-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1054


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request: Update design.html

2016-03-13 Thread stevejs
GitHub user stevejs opened a pull request:

https://github.com/apache/kafka/pull/1057

Update design.html

Fix typo.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stevejs/kafka patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1057.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1057


commit c421522913941a87389d165c02ef2e6f369cdc4f
Author: Steve Sharp 
Date:   2016-03-14T02:06:53Z

Update design.html

Fix typo




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3334) First message on new topic not actually being sent, no exception thrown

2016-03-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192592#comment-15192592
 ] 

Ismael Juma commented on KAFKA-3334:


It doesn't seem to me that documenting this is enough; we should not be failing 
silently.

> First message on new topic not actually being sent, no exception thrown
> ---
>
> Key: KAFKA-3334
> URL: https://issues.apache.org/jira/browse/KAFKA-3334
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
> Environment: Linux, Java
>Reporter: Aleksandar Stojadinovic
>Assignee: Ashish K Singh
> Fix For: 0.10.0.0
>
>
> Although I've seen this issue pop up around the internet in a few forms, I'm 
> not sure it is yet properly fixed. 
> When publishing to a new topic with auto-create enabled, the Java client 
> (0.9.0) logs this WARN message, and the message is obviously not sent:
> org.apache.kafka.clients.NetworkClient - Error while fetching metadata with 
> correlation id 0 : {file.topic=LEADER_NOT_AVAILABLE}
> In the meantime I see a console message that a log for the partition is 
> created. Subsequent messages go through normally, but the first one is never 
> sent. No exception is ever thrown, either by calling get on the future or 
> with the async usage; everything looks as if it succeeded.
> I notice that when I leave my application blocked on the get call in the 
> debugger, the message may be processed, but with significant delay. This is 
> consistent with another issue I found for the Python client. Also, if I call 
> partitionsFor first, the topic is created and the message is sent, but it 
> seems silly to call it every time just to mitigate this issue.
> {code}
> Future<RecordMetadata> recordMetadataFuture = producer.send(new 
> ProducerRecord<>(topic, key, file));
> RecordMetadata recordMetadata = recordMetadataFuture.get(30, 
> TimeUnit.SECONDS);
> {code}
> I hope I'm clear enough.
> Related similar (but not same) issues:
> https://issues.apache.org/jira/browse/KAFKA-1124
> https://github.com/dpkp/kafka-python/issues/150
> http://stackoverflow.com/questions/35187933/how-to-resolve-leader-not-available-kafka-error-when-trying-to-consume
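
A minimal sketch of the partitionsFor workaround described in the report. The topic name, serializer choice and timeout are illustrative assumptions:

{code}
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch of the workaround from the report: fetch partition metadata (which
// also triggers topic auto-creation) before the first send.
public class FirstSendWorkaround {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.partitionsFor("file.topic");   // wait for the topic's metadata before sending

            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("file.topic", "key", "value"));
            RecordMetadata metadata = future.get(30, TimeUnit.SECONDS);
            System.out.println("first record landed at offset " + metadata.offset());
        }
    }
}
{code}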



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2464) Client-side assignment and group generalization

2016-03-13 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192580#comment-15192580
 ] 

Jiangjie Qin edited comment on KAFKA-2464 at 3/14/16 12:09 AM:
---

[~hachikuji] I just noticed that in KafkaConfig, we have the following 
inconsistency in the variable names regarding min/max group session timeout.
{code}
  .define(GroupMinSessionTimeoutMsProp, INT, 
Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc)
  .define(GroupMaxSessionTimeoutMsProp, INT, 
Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc)
{code}
Was this intended?


was (Author: becket_qin):
[~hachikuji] I just noticed that in KafkaConfig, we have the following 
inconsistency in the variable names regarding min/max group session timeout.
{code}
.define(GroupMinSessionTimeoutMsProp, INT, 
Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc)
  .define(GroupMaxSessionTimeoutMsProp, INT, 
Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc)
{code}
Was this intended?

> Client-side assignment and group generalization
> ---
>
> Key: KAFKA-2464
> URL: https://issues.apache.org/jira/browse/KAFKA-2464
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> Add support for client-side assignment and generalization of join group 
> protocol as documented here: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2464) Client-side assignment and group generalization

2016-03-13 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192580#comment-15192580
 ] 

Jiangjie Qin commented on KAFKA-2464:
-

[~hachikuji] I just noticed that in KafkaConfig, we have the following 
inconsistency in the variable names regarding min/max group session timeout.
{code}
.define(GroupMinSessionTimeoutMsProp, INT, 
Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc)
  .define(GroupMaxSessionTimeoutMsProp, INT, 
Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc)
{code}
Was this intended?

> Client-side assignment and group generalization
> ---
>
> Key: KAFKA-2464
> URL: https://issues.apache.org/jira/browse/KAFKA-2464
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> Add support for client-side assignment and generalization of join group 
> protocol as documented here: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3388) Producer should only timeout a batch in the accumulator when metadata is missing.

2016-03-13 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-3388:

Reviewer: Jun Rao

> Producer should only timeout a batch in the accumulator when metadata is 
> missing.
> -
>
> Key: KAFKA-3388
> URL: https://issues.apache.org/jira/browse/KAFKA-3388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> In KIP-19 we are reusing request.timeout.ms to time out the batches in the 
> accumulator. The intent was to avoid batches sitting in the accumulator 
> forever when topic metadata is missing.
> Currently we do not check whether metadata is available when we time out the 
> batches in the accumulator (although the comments say we will check the 
> metadata). This causes a problem: once a previous batch hits a request 
> timeout and gets retried, all subsequent batches fail with a timeout 
> exception. We should only time out the batches in the accumulator when the 
> metadata of the partition is missing.
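
A minimal sketch of the rule proposed here, i.e. expire an accumulated batch only while its partition has no known leader. {{Cluster.leaderFor}} is the client's real metadata lookup; the helper class, parameter names and the overall wiring are assumptions, not the actual RecordAccumulator code:

{code}
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper: a batch may only be expired out of the accumulator when
// its partition has no known leader AND it has exceeded the request timeout.
final class ExpiryGuard {
    static boolean mayExpire(Cluster cluster, TopicPartition tp,
                             long createdMs, long nowMs, long requestTimeoutMs) {
        boolean metadataMissing = cluster.leaderFor(tp) == null;   // no leader known yet
        boolean overdue = nowMs - createdMs > requestTimeoutMs;
        return metadataMissing && overdue;
    }
}
{code}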



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-3388: Fix expiration of batches sitting ...

2016-03-13 Thread becketqin
GitHub user becketqin opened a pull request:

https://github.com/apache/kafka/pull/1056

KAFKA-3388: Fix expiration of batches sitting in the accumulator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/becketqin/kafka KAFKA-3388

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1056.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1056


commit 7561b81254d4361122c3467b741f98f37a769b29
Author: Jiangjie Qin 
Date:   2016-03-13T23:57:52Z

KAFKA-3388: Fix expiration of batches sitting in the accumulator




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3388) Producer should only timeout a batch in the accumulator when metadata is missing.

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192579#comment-15192579
 ] 

ASF GitHub Bot commented on KAFKA-3388:
---

GitHub user becketqin opened a pull request:

https://github.com/apache/kafka/pull/1056

KAFKA-3388: Fix expiration of batches sitting in the accumulator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/becketqin/kafka KAFKA-3388

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1056.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1056


commit 7561b81254d4361122c3467b741f98f37a769b29
Author: Jiangjie Qin 
Date:   2016-03-13T23:57:52Z

KAFKA-3388: Fix expiration of batches sitting in the accumulator




> Producer should only timeout a batch in the accumulator when metadata is 
> missing.
> -
>
> Key: KAFKA-3388
> URL: https://issues.apache.org/jira/browse/KAFKA-3388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> In KIP-19 we are reusing request.timeout.ms to time out the batches in the 
> accumulator. The intent was to avoid batches sitting in the accumulator 
> forever when topic metadata is missing.
> Currently we do not check whether metadata is available when we time out the 
> batches in the accumulator (although the comments say we will check the 
> metadata). This causes a problem: once a previous batch hits a request 
> timeout and gets retried, all subsequent batches fail with a timeout 
> exception. We should only time out the batches in the accumulator when the 
> metadata of the partition is missing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3388) Producer should only timeout a batch in the accumulator when metadata is missing.

2016-03-13 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-3388:

Status: Patch Available  (was: Open)

> Producer should only timeout a batch in the accumulator when metadata is 
> missing.
> -
>
> Key: KAFKA-3388
> URL: https://issues.apache.org/jira/browse/KAFKA-3388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> In KIP-19 we are reusing request.timeout.ms to time out the batches in the 
> accumulator. The intent was to avoid batches sitting in the accumulator 
> forever when topic metadata is missing.
> Currently we do not check whether metadata is available when we time out the 
> batches in the accumulator (although the comments say we will check the 
> metadata). This causes a problem: once a previous batch hits a request 
> timeout and gets retried, all subsequent batches fail with a timeout 
> exception. We should only time out the batches in the accumulator when the 
> metadata of the partition is missing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KStreams Proposal

2016-03-13 Thread Bill Bejeck
Hi Guozhang,

Possibly, but the use case I'm working with is having a collector object,
for aggregate statistics for example, that outputs results intermittently
(via punctuate).

The issue for me is that 'transform(..)' returns a key-value pair for each
message, possibly of a different type.

I've achieved something similar in the KStream API using the form
map(...).aggregateByKey().to(...), but with that approach I need to map each
message to an intermediate form and do the periodic aggregations of "stats"
objects.

What I'd really like is a way to attach a sink to a processor.

With that in mind, instead of introducing a "processTo" method, another
option could be to change the return type of "process" from void to
KStream.

Then the use case becomes 'process(..).to(...)', similar to
'transform(..).to(..)'.

I've made those changes locally; everything compiles fine, and running my
simple driver program achieves the desired results.

I know I could be splitting hairs here, but in my opinion it would be
nice to have.

Thanks for your time!

Bill
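
A minimal sketch of the transform(...).to(topic) chaining discussed in this
thread; the topic names and the pass-through logic are illustrative
assumptions, and the exact Streams signatures may differ on trunk:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class TransformToExample {

    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> input = builder.stream("input");

        input.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
            @Override
            public void init(ProcessorContext context) { }

            @Override
            public KeyValue<String, String> transform(String key, String value) {
                // A stats collector would accumulate state here and emit from
                // punctuate(); this sketch simply passes each record through.
                return KeyValue.pair(key, value.toUpperCase());
            }

            @Override
            public KeyValue<String, String> punctuate(long timestamp) {
                return null;   // nothing extra to emit in this sketch
            }

            @Override
            public void close() { }
        }).to("output");   // terminate in a sink topic, which is what processTo would do

        // new KafkaStreams(builder, config).start() would run it, as in the test driver below.
    }
}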


On Sun, Mar 13, 2016 at 4:28 PM, Guozhang Wang  wrote:

> Hello Bill,
>
> We added transform() together with process() to support any user-customized
> stateful processor that can still concatenate to another KStream.
>
> So for your case, would `transform(...).to(topic)` provide the same
> functionality as "processTo(topic, ...)"?
>
> Guozhang
>
>
> On Sat, Mar 12, 2016 at 12:20 PM, Bill Bejeck  wrote:
>
> > Hi All,
> >
> > While working with KStream/KStreamImp I discovered that there does not
> seem
> > to be any way to connect the results of the KStream.process method with a
> > sink node.
> >
> > I'd like to propose an addition to the API: a "processTo" method.
> >
> > I've looked at and used the "transform", "reduceByKey" and
> "aggregateByKey"
> >  methods, but "processTo" would work like a more general purpose
> collector
> > terminating the KStream and allow for writing out results to an arbitrary
> > topic (regardless of key type).
> >
> >
> >  I've done a quick prototype and some  initial testing locally on my
> fork.
> > If you think this could be useful I can add unit tests and create a PR.
> > I've included the proposed code changes and the test driver code below
> >
> >
> > KStream.java additions
> >
> > void processTo(String topic,  ProcessorSupplier processorSupplier,
> > String... stateStoreNames);
> >
> > void processTo(String topic, ProcessorSupplier processorSupplier,
> >  Serializer keySerializer, Serializer valSerializer, String...
> > stateStoreNames);
> >
> >
> > KStreamImpl.java additions
> >
> >  @Override
> > public void processTo(String topic, ProcessorSupplier
> > processorSupplier,  String... stateStoreNames) {
> > processTo(topic, processorSupplier, null, null, stateStoreNames);
> > }
> >
> > @SuppressWarnings("unchecked")
> > @Override
> > public void processTo(String topic,ProcessorSupplier
> > processorSupplier,  Serializer keySerializer, Serializer
> > valSerializer, String... stateStoreNames) {
> > String processorName = topology.newName(PROCESSOR_NAME);
> > String sinkName = topology.newName(SINK_NAME);
> > StreamPartitioner streamPartitioner = null;
> >
> > if (keySerializer != null && keySerializer instanceof
> > WindowedSerializer) {
> > WindowedSerializer windowedSerializer =
> > (WindowedSerializer) keySerializer;
> > streamPartitioner = (StreamPartitioner) new
> > WindowedStreamPartitioner(windowedSerializer);
> > }
> >
> > topology.addProcessor(processorName, processorSupplier,
> this.name
> > );
> > topology.addSink(sinkName,topic, keySerializer, valSerializer,
> > streamPartitioner, processorName);
> > topology.connectProcessorAndStateStores(processorName,
> > stateStoreNames);
> > }
> >
> >
> > Test Driver
> >
> > public class TestDriver {
> >
> > public static void main(String[] args) {
> > StreamsConfig config = new StreamsConfig(getProperties());
> > KStreamBuilder kStreamBuilder = new KStreamBuilder();
> >
> > KStream transactionKStream =
> >  kStreamBuilder.stream("input");
> >
> > transactionKStream.processTo("output",UpperCaseProcessor::new);
> >
> > System.out.println("Starting process-to Example");
> > KafkaStreams kafkaStreams = new
> > KafkaStreams(kStreamBuilder,config);
> > kafkaStreams.start();
> > System.out.println("Now started process-to Example");
> > }
> >
> > private static class UpperCaseProcessor extends
> > AbstractProcessor {
> > @Override
> > public void process(String key, String value) {
> > context().forward(key, value.toUpperCase());
> > context().commit();
> > }
> > }
> >
> > private static Properties 

[jira] [Commented] (KAFKA-3388) Producer should only timeout a batch in the accumulator when metadata is missing.

2016-03-13 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192539#comment-15192539
 ] 

Jiangjie Qin commented on KAFKA-3388:
-

[~mgharat] Can you take a look and see if I missed something? If a batch gets 
retried and re-enqueued, it looks like it will be expired almost immediately 
because {{batch.maybeExpire()}} will return true.

> Producer should only timeout a batch in the accumulator when metadata is 
> missing.
> -
>
> Key: KAFKA-3388
> URL: https://issues.apache.org/jira/browse/KAFKA-3388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> In KIP-19 we are reusing request.timeout.ms to time out the batches in the 
> accumulator. The intent was to avoid batches sitting in the accumulator 
> forever when topic metadata is missing.
> Currently we do not check whether metadata is available when we time out the 
> batches in the accumulator (although the comments say we will check the 
> metadata). This causes a problem: once a previous batch hits a request 
> timeout and gets retried, all subsequent batches fail with a timeout 
> exception. We should only time out the batches in the accumulator when the 
> metadata of the partition is missing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3388) Producer should only timeout a batch in the accumulator when metadata is missing.

2016-03-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192536#comment-15192536
 ] 

Ismael Juma commented on KAFKA-3388:


Perfect, thanks.

> Producer should only timeout a batch in the accumulator when metadata is 
> missing.
> -
>
> Key: KAFKA-3388
> URL: https://issues.apache.org/jira/browse/KAFKA-3388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> In KIP-19 we are reusing request.timeout.ms to time out the batches in the 
> accumulator. The intent was to avoid batches sitting in the accumulator 
> forever when topic metadata is missing.
> Currently we do not check whether metadata is available when we time out the 
> batches in the accumulator (although the comments say we will check the 
> metadata). This causes a problem: once a previous batch hits a request 
> timeout and gets retried, all subsequent batches fail with a timeout 
> exception. We should only time out the batches in the accumulator when the 
> metadata of the partition is missing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3388) Producer should only timeout a batch in the accumulator when metadata is missing.

2016-03-13 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192535#comment-15192535
 ] 

Jiangjie Qin commented on KAFKA-3388:
-

[~ijuma] I am currently blocked on this issue and working on it. Besides the 
issue reported in this ticket, the batch expiration checking code does not seem 
to follow the design in KIP-19. We are supposed to let each retry have a 
separate request timeout, but the code doesn't seem to do so. I will submit the 
patch shortly.
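
A minimal sketch of the per-retry timeout idea, i.e. measuring expiry from the last attempt rather than from batch creation. The field and method names below are assumptions, not the real RecordBatch:

{code}
// Hypothetical timing holder: restarting the clock on re-enqueue gives each
// retry its own request-timeout window, so a retried batch is not expired
// immediately after being put back into the accumulator.
final class BatchTimes {
    long createdMs;
    long lastAttemptMs;

    void reenqueued(long nowMs) {
        lastAttemptMs = nowMs;          // new attempt, new timeout window
    }

    boolean expired(long nowMs, long requestTimeoutMs) {
        return nowMs - lastAttemptMs > requestTimeoutMs;
    }
}
{code}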

> Producer should only timeout a batch in the accumulator when metadata is 
> missing.
> -
>
> Key: KAFKA-3388
> URL: https://issues.apache.org/jira/browse/KAFKA-3388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> In KIP-19 we are reusing request.timeout.ms to time out the batches in the 
> accumulator. The intent was to avoid batches sitting in the accumulator 
> forever when topic metadata is missing.
> Currently we do not check whether metadata is available when we time out the 
> batches in the accumulator (although the comments say we will check the 
> metadata). This causes a problem: once a previous batch hits a request 
> timeout and gets retried, all subsequent batches fail with a timeout 
> exception. We should only time out the batches in the accumulator when the 
> metadata of the partition is missing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3388) Producer should only timeout a batch in the accumulator when metadata is missing.

2016-03-13 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned KAFKA-3388:
---

Assignee: Jiangjie Qin

> Producer should only timeout a batch in the accumulator when metadata is 
> missing.
> -
>
> Key: KAFKA-3388
> URL: https://issues.apache.org/jira/browse/KAFKA-3388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> In KIP-19 we are reusing request.timeout.ms to time out the batches in the 
> accumulator. The intent was to avoid batches sitting in the accumulator 
> forever when topic metadata is missing.
> Currently we do not check whether metadata is available when we time out the 
> batches in the accumulator (although the comments say we will check the 
> metadata). This causes a problem: once a previous batch hits a request 
> timeout and gets retried, all subsequent batches fail with a timeout 
> exception. We should only time out the batches in the accumulator when the 
> metadata of the partition is missing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KStreams Proposal

2016-03-13 Thread Guozhang Wang
Hello Bill,

We added transform() together with process() to support any user-customized
stateful processor that can still concatenate to another KStream.

So for your case, would `transform(...).to(topic)` provide the same
functionality as "processTo(topic, ...)"?

Guozhang


On Sat, Mar 12, 2016 at 12:20 PM, Bill Bejeck  wrote:

> Hi All,
>
> While working with KStream/KStreamImp I discovered that there does not seem
> to be any way to connect the results of the KStream.process method with a
> sink node.
>
> I'd like to propose an addition to the API: a "processTo" method.
>
> I've looked at and used the "transform", "reduceByKey" and "aggregateByKey"
>  methods, but "processTo" would work like a more general purpose collector
> terminating the KStream and allow for writing out results to an arbitrary
> topic (regardless of key type).
>
>
>  I've done a quick prototype and some  initial testing locally on my fork.
> If you think this could be useful I can add unit tests and create a PR.
> I've included the proposed code changes and the test driver code below
>
>
> KStream.java additions
>
> void processTo(String topic,  ProcessorSupplier processorSupplier,
> String... stateStoreNames);
>
> void processTo(String topic, ProcessorSupplier processorSupplier,
>  Serializer keySerializer, Serializer valSerializer, String...
> stateStoreNames);
>
>
> KStreamImpl.java additions
>
>  @Override
> public void processTo(String topic, ProcessorSupplier
> processorSupplier,  String... stateStoreNames) {
> processTo(topic, processorSupplier, null, null, stateStoreNames);
> }
>
> @SuppressWarnings("unchecked")
> @Override
> public void processTo(String topic,ProcessorSupplier
> processorSupplier,  Serializer keySerializer, Serializer
> valSerializer, String... stateStoreNames) {
> String processorName = topology.newName(PROCESSOR_NAME);
> String sinkName = topology.newName(SINK_NAME);
> StreamPartitioner streamPartitioner = null;
>
> if (keySerializer != null && keySerializer instanceof
> WindowedSerializer) {
> WindowedSerializer windowedSerializer =
> (WindowedSerializer) keySerializer;
> streamPartitioner = (StreamPartitioner) new
> WindowedStreamPartitioner(windowedSerializer);
> }
>
> topology.addProcessor(processorName, processorSupplier, this.name
> );
> topology.addSink(sinkName,topic, keySerializer, valSerializer,
> streamPartitioner, processorName);
> topology.connectProcessorAndStateStores(processorName,
> stateStoreNames);
> }
>
>
> Test Driver
>
> public class TestDriver {
>
> public static void main(String[] args) {
> StreamsConfig config = new StreamsConfig(getProperties());
> KStreamBuilder kStreamBuilder = new KStreamBuilder();
>
> KStream transactionKStream =
>  kStreamBuilder.stream("input");
>
> transactionKStream.processTo("output",UpperCaseProcessor::new);
>
> System.out.println("Starting process-to Example");
> KafkaStreams kafkaStreams = new
> KafkaStreams(kStreamBuilder,config);
> kafkaStreams.start();
> System.out.println("Now started process-to Example");
> }
>
> private static class UpperCaseProcessor extends
> AbstractProcessor {
> @Override
> public void process(String key, String value) {
> context().forward(key, value.toUpperCase());
> context().commit();
> }
> }
>
> private static Properties getProperties() {
> Properties props = new Properties();
> props.put(StreamsConfig.CLIENT_ID_CONFIG, "Process-to-test");
> props.put("group.id", "test-streams");
> props.put(StreamsConfig.JOB_ID_CONFIG, "processor_to_test");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> "localhost:2181");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
> props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class);
> props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class);
> props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> StringDeserializer.class);
> props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> StringDeserializer.class);
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> WallclockTimestampExtractor.class);
> return props;
> }
>
> }
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-43: Kafka SASL enhancements

2016-03-13 Thread Harsha
Agree with Gwen here. I feel like these additional pluggable Login
Modules are making this KIP complex. Since the main goal of the KIP is
to enable additional mechanisms, can we limit the scope to that? If we
feel pluggable Login and callback handler classes are necessary, we can
address them in another JIRA.

Adding DIGEST-MD5 and password callbacks can be done in the existing
callback handler without exposing it as a pluggable class. It would be
useful to have the broker support multiple mechanisms. I haven't seen
anyone using more than this in Hadoop. It might be different for Kafka,
but I personally haven't seen anyone asking for this yet. 

Thanks,
Harsha


On Thu, Mar 10, 2016, at 01:44 AM, Rajini Sivaram wrote:
> Gwen,
> 
> Just to be clear, the alternative would be:
> 
> *jaas.conf:*
> 
> GssapiKafkaServer {
> 
> com.ibm.security.auth.module.Krb5LoginModule required
> credsType=both
> useKeytab="file:/kafka/key.tab"
> principal="kafka/localh...@example.com ";
> 
> };
> 
> SmartcardKafkaServer {
> 
>   example.SmartcardLoginModule required
> 
>   cardNumber=123;
> 
> };
> 
> 
> *KafkaConfig*
> 
> 
> 
>- login.context.map={"GSSAPI="GssapiKafkaServer",
>   "SMARTCARD"=SmartcardKafkaServer}
>   - login.class.map={"GSSAPI=GssapiLogin.class,
>   "SMARTCARD"=SmartcardLogin.class}
>   - callback.handler.class.map={"GSSAPI"=GssapiCallbackHandler.class,
>   "SMARTCARD"=SmartcardCallbackHandler.class}
> 
> *Client Config *
> Same as the server, but with only one entry allowed in each map and
> jaas.conf
> 
> 
> 
> This is a different model from the Java standard for supporting multiple
> logins. As a developer, I am inclined to stick with approaches that are
> widely in use like JSSE. But this alternative can be made to work if the
> Kafka community feels it is more appropriate for Kafka. If you know of
> other systems which use this approach, that would be helpful.
> 
> 
> 
> On Thu, Mar 10, 2016 at 2:07 AM, Gwen Shapira  wrote:
> 
> > What I'm hearing is that:
> >
> > 1. In order to support authentication mechanisms that were not written
> > specifically with Kafka in mind, someone will need to write the
> > integration between the mechanism and Kafka. This may include Login
> > and CallbackHandler classes. This can be the mechanism vendor, the
> > user or a 3rd party vendor.
> > 2. If someone wrote the code to support a mechanism in Kafka, and a
> > user will want to use more than one mechanism, they will still need to
> > write a wrapper.
> > 3. In reality, #2 will not be necessary ("edge-case") because Kafka
> > will actually already provide the callback needed (and presumably also
> > the code to load the LoginModule provided by Example.com)?
> >
> > Tradeoff #1 sounds reasonable.
> > #2 and #3 do not sound reasonable considering one of the goals of the
> > patch is to support multiple mechanisms. I don't think we should force
> > our users to write code just to avoid writing it ourselves.
> > Configuring security is complex enough as is.
> > Furthermore, if we believe that "Smartcard is likely to use standard
> > NameCallback and PasswordCallback already implemented in Kafka" - why
> > do we even provide configuration for Login and CallbackHandler
> > classes? Either we support multiple mechanisms written by different
> > vendors, or we don't.
> >
> > Gwen
> >
> >
> > On Wed, Mar 9, 2016 at 12:32 AM, Rajini Sivaram
> >  wrote:
> > > I am not saying that the developer at Example Inc. would develop a Login
> > > implementation that combines Smartcard and Kerberos because Retailer uses
> > > both. I am saying that Example Inc develops the LoginModule (similar to
> > JVM
> > > security providers developing Kerberos modules). But there is no standard
> > > interface for Login to allow ticket refresh. So, it is very unlikely that
> > > Example Inc would develop a Login implementation that works with an
> > Apache
> > > Kafka defined interface ( Kafka developers wrote this code for Kerberos).
> > > For a custom integration, the user (i.e. Retailer) would be expected to
> > > develop this code if required.
> > >
> > > You could imagine that Smartcard is a commonly used mechanism and a 3rd
> > > party develops code for integrating Smartcard with Kafka and makes the
> > > integration code (Login and CallbackHandler implementation) widely
> > > available, If Retailer wants to use clients or a broker with just
> > Smartcard
> > > enabled in their broker, they configure Kafka to use the 3rd party code,
> > > with no additional code development. But to combine Smartcard and
> > Kerberos,
> > > Retailer needs to write a few lines of code to incorporate both Smartcard
> > > and Kerberos. I believe this is an edge case.
> > >
> > > Smartcard is likely to use standard NameCallback and PasswordCallback
> > > already implemented in Kafka and Kerberos support exists in Kafka. So it
> > is
> > > very likely that Retailer doesn't need to override 

[jira] [Updated] (KAFKA-3392) ConsumerRecords iterator throws NoSuchElementException when a TopicPartition is empty

2016-03-13 Thread Drausin Wulsin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Drausin Wulsin updated KAFKA-3392:
--
Reviewer: Jay Kreps
  Status: Patch Available  (was: Open)

> ConsumerRecords iterator throws NoSuchElementException when a TopicPartition 
> is empty
> -
>
> Key: KAFKA-3392
> URL: https://issues.apache.org/jira/browse/KAFKA-3392
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.9.0.1, 0.9.0.0
>Reporter: Drausin Wulsin
>Assignee: Neha Narkhede
>  Labels: newbie
> Fix For: 0.10.0.0
>
>
> The 
> [makeNext|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java#L103]
>  method in the private static ConcatenatedIterable class of ConsumerRecords 
> assumes that each TopicPartition is non-empty. When this is not the case we 
> get a NoSuchElementException when getting the next element (see the 
> [ConsumerRecordsTest|https://github.com/drausin/kafka/blob/bugfix/consumer-records-iterator/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java]
>  class I created):
> {noformat}
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:854)
>   at 
> org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable$1.makeNext(ConsumerRecords.java:110)
>   at 
> org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable$1.makeNext(ConsumerRecords.java:99)
>   at 
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
>   at 
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
>   at 
> org.apache.kafka.clients.consumer.ConsumerRecordsTest.iterator(ConsumerRecordsTest.java:53)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:310)
>   at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:89)
>   at 
> org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:97)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:294)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:127)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:282)
>   at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:87)
>   at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:50)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:207)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:146)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:120)
>   at 
> org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:34)
>   at 
> org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:44)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:122)
>   at 
> org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:106)
>   at 
> org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:53)
>   at 
> org.powermock.modules.junit4.PowerMockRunner.run(PowerMockRunner.java:59)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> 

[GitHub] kafka pull request: KAFKA-3392: ConsumerRecords iterator throws No...

2016-03-13 Thread drausin
GitHub user drausin opened a pull request:

https://github.com/apache/kafka/pull/1055

KAFKA-3392: ConsumerRecords iterator throws NoSuchElementException when a 
TopicPartition is empty

This contribution is my original work, and I license it under the project's 
open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/drausin/kafka bugfix/consumer-records-iterator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1055.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1055


commit f53dc3a621ef741edc8baedf1ab32fd7fa2e0b8f
Author: Drausin Wulsin 
Date:   2016-03-13T17:03:03Z

fix bug ConsumerRecords iterator makeNext method




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3392) ConsumerRecords iterator throws NoSuchElementException when a TopicPartition is empty

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192419#comment-15192419
 ] 

ASF GitHub Bot commented on KAFKA-3392:
---

GitHub user drausin opened a pull request:

https://github.com/apache/kafka/pull/1055

KAFKA-3392: ConsumerRecords iterator throws NoSuchElementException when a 
TopicPartition is empty

This contribution is my original work, and I license it under the project's 
open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/drausin/kafka bugfix/consumer-records-iterator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1055.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1055


commit f53dc3a621ef741edc8baedf1ab32fd7fa2e0b8f
Author: Drausin Wulsin 
Date:   2016-03-13T17:03:03Z

fix bug ConsumerRecords iterator makeNext method




> ConsumerRecords iterator throws NoSuchElementException when a TopicPartition 
> is empty
> -
>
> Key: KAFKA-3392
> URL: https://issues.apache.org/jira/browse/KAFKA-3392
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.9.0.0, 0.9.0.1
>Reporter: Drausin Wulsin
>Assignee: Neha Narkhede
>  Labels: newbie
> Fix For: 0.10.0.0
>
>
> The 
> [makeNext|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java#L103]
>  method in the private static ConcatenatedIterable class of ConsumerRecords 
> assumes that each TopicPartition is non-empty. When this is not the case we 
> get a NoSuchElementException when getting the next element (see the 
> [ConsumerRecordsTest|https://github.com/drausin/kafka/blob/bugfix/consumer-records-iterator/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java]
>  class I created):
> {noformat}
> java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:854)
>   at 
> org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable$1.makeNext(ConsumerRecords.java:110)
>   at 
> org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable$1.makeNext(ConsumerRecords.java:99)
>   at 
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
>   at 
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
>   at 
> org.apache.kafka.clients.consumer.ConsumerRecordsTest.iterator(ConsumerRecordsTest.java:53)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:310)
>   at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:89)
>   at 
> org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:97)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:294)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:127)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:282)
>   at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:87)
>   at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:50)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:207)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:146)
>   at 
> 

[jira] [Created] (KAFKA-3392) ConsumerRecords iterator throws NoSuchElementException when a TopicPartition is empty

2016-03-13 Thread Drausin Wulsin (JIRA)
Drausin Wulsin created KAFKA-3392:
-

 Summary: ConsumerRecords iterator throws NoSuchElementException 
when a TopicPartition is empty
 Key: KAFKA-3392
 URL: https://issues.apache.org/jira/browse/KAFKA-3392
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 0.9.0.1, 0.9.0.0
Reporter: Drausin Wulsin
Assignee: Neha Narkhede
 Fix For: 0.10.0.0


The 
[makeNext|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java#L103]
 method in the private static ConcatenatedIterable class of ConsumerRecords 
assumes that each TopicPartition is non-empty. When this is not the case we get 
a NoSuchElementException when getting the next element (see the 
[ConsumerRecordsTest|https://github.com/drausin/kafka/blob/bugfix/consumer-records-iterator/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java]
 class I created):

{noformat}
java.util.NoSuchElementException
at java.util.ArrayList$Itr.next(ArrayList.java:854)
at 
org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable$1.makeNext(ConsumerRecords.java:110)
at 
org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable$1.makeNext(ConsumerRecords.java:99)
at 
org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
at 
org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
at 
org.apache.kafka.clients.consumer.ConsumerRecordsTest.iterator(ConsumerRecordsTest.java:53)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:310)
at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:89)
at 
org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:97)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:294)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:127)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:282)
at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:87)
at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:50)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:207)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:146)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:120)
at 
org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:34)
at 
org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:44)
at 
org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:122)
at 
org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:106)
at 
org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:53)
at 
org.powermock.modules.junit4.PowerMockRunner.run(PowerMockRunner.java:59)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
at 
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
{noformat}
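
The underlying pattern is a concatenated iterator that skips empty inner 
collections before handing out the next element. A minimal, self-contained Java 
sketch of that pattern (illustrative only; the class and names below are not the 
actual Kafka patch):

{code}
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Minimal sketch: iterate over several per-partition lists, skipping empty ones,
// so that hasNext()/next() never trip over an empty inner collection.
public class SkippingConcatIterator<T> implements Iterator<T> {
    private final Iterator<? extends Iterable<T>> outer;
    private Iterator<T> inner = null;

    public SkippingConcatIterator(List<? extends Iterable<T>> parts) {
        this.outer = parts.iterator();
    }

    @Override
    public boolean hasNext() {
        // Advance past any empty inner iterables before answering.
        while ((inner == null || !inner.hasNext()) && outer.hasNext()) {
            inner = outer.next().iterator();
        }
        return inner != null && inner.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext())
            throw new NoSuchElementException();
        return inner.next();
    }

    public static void main(String[] args) {
        // One empty "partition" in the middle must not break iteration.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.<String>asList(),   // empty TopicPartition analogue
                Arrays.asList("c"));
        Iterator<String> it = new SkippingConcatIterator<>(partitions);
        while (it.hasNext())
            System.out.println(it.next());   // prints a, b, c
    }
}
{code}

With an empty "partition" in the middle, iteration still yields a, b, c instead 
of throwing.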

[jira] [Updated] (KAFKA-3391) Kafka to ZK timeout

2016-03-13 Thread Karthik Reddy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Karthik Reddy updated KAFKA-3391:
-
Description: 
Hi Team,

We have seen the below messages in the Kafka logs, indicating there was a 
timeout on ZK.

Could you please advise us on how to tune or better optimize the Kafka-ZK 
communication?

Kafka and ZK are on separate servers. Currently, we have the ZK timeout set to 
6000 ms.
Kafka servers have EBS volumes as the disk.

We had to restart our consumers and ZK to resolve this issue.
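
For reference, the broker-side ZooKeeper timeouts in question live in 
server.properties; a minimal sketch, assuming the defaults are otherwise in use 
(the property names are standard Kafka settings, the values below are only 
illustrative and would need to be sized against observed GC pauses and network 
latency between the Kafka and ZK hosts):

{noformat}
# server.properties -- illustrative values only
zookeeper.connect=10.200.77.74:8164
# session timeout currently set to 6000 ms as mentioned above; raising it gives
# ZooKeeper more headroom before it expires a session during GC pauses or brief
# network hiccups
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=6000
{noformat}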

[2016-03-10 02:29:25,858] INFO Unable to read additional data from server 
sessionid 0x5531d0003f30030, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2016-03-10 02:29:25,958] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2016-03-10 02:29:26,381] INFO Opening socket connection to server 
10.200.77.74/10.200.77.74:8164. Will not attempt to authenticate using SASL 
(unknown error) (org.apache.zookeeper.ClientCnxn)
[2016-03-10 02:29:26,382] INFO Socket connection established to 
10.200.77.74/10.200.77.74:8164, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2016-03-10 02:29:26,385] INFO Session establishment complete on server 
10.200.77.74/10.200.77.74:8164, sessionid = 0x5531d0003f30030, negotiated 
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2016-03-10 02:29:26,385] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2016-03-10 02:29:30,961] INFO conflict in /controller data: 
{"version":1,"brokerid":3,"timestamp":"1457594970952"} stored data: 
{"version":1,"brokerid":5,"timestamp":"1457594970043"} (kafka.utils.ZkUtils$)
[2016-03-10 02:29:30,969] INFO New leader is 5 
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-03-10 02:29:31,620] INFO [ReplicaFetcherManager on broker 3] Removed 
fetcher for partitions 
[__consumer_offsets,0],[fulfillment.payments.autopay.mongooperation.response,1],[__consumer_offsets,20],[__consumer_offsets,40]
 (kafka.server.ReplicaFetcherManager)
[2016-03-10 02:29:31,621] INFO [ReplicaFetcherManager on broker 3] Removed 
fetcher for partitions 
[efit.framework.notification.error,1],[__consumer_offsets,15],[fulfillment.payments.autopay.processexception.notification,1],[__consumer_offsets,35]
 (kafka.server.ReplicaFetcherManager)
[2016-03-10 02:29:31,621] INFO Truncating log 
efit.framework.notification.error-1 to offset 637. (kafka.log.Log)
[2016-03-10 02:29:31,621] INFO Truncating log __consumer_offsets-15 to offset 
0. (kafka.log.Log)
[2016-03-10 02:29:31,622] INFO Truncating log 
fulfillment.payments.autopay.processexception.notification-1 to offset 0. 
(kafka.log.Log)
[2016-03-10 02:29:31,622] INFO Truncating log __consumer_offsets-35 to offset 
0. (kafka.log.Log)
[2016-03-10 02:29:31,623] INFO Loading offsets from [__consumer_offsets,0] 
(kafka.server.OffsetManager)
[2016-03-10 02:29:31,624] INFO Loading offsets from [__consumer_offsets,20] 
(kafka.server.OffsetManager)
[2016-03-10 02:29:31,624] INFO Finished loading offsets from 
[__consumer_offsets,0] in 1 milliseconds. (kafka.server.OffsetManager)
[2016-03-10 02:29:31,625] INFO Loading offsets from [__consumer_offsets,40] 
(kafka.server.OffsetManager)
[2016-03-10 02:29:31,625] INFO Finished loading offsets from 
[__consumer_offsets,20] in 1 milliseconds. (kafka.server.OffsetManager)
[2016-03-10 02:29:31,625] INFO Finished loading offsets from 
[__consumer_offsets,40] in 0 milliseconds. (kafka.server.OffsetManager)
[2016-03-10 02:29:31,627] INFO [ReplicaFetcherManager on broker 3] Added 
fetcher for partitions List([[efit.framework.notification.error,1], initOffset 
637 to broker id:1,host:10.200.77.78,port:8165] , [[__consumer_offsets,15], 
initOffset 0 to broker id:1,host:10.200.77.78,port:8165] , 
[[fulfillment.payments.autopay.processexception.notification,1], initOffset 0 
to broker id:5,host:10.200.75.150,port:8165] , [[__consumer_offsets,35], 
initOffset 0 to broker id:1,host:10.200.77.78,port:8165] ) 
(kafka.server.ReplicaFetcherManager)
[2016-03-10 02:29:31,627] INFO [ReplicaFetcherThread-0-2], Shutting down 
(kafka.server.ReplicaFetcherThread

Thanks,
Karthik

  was:
Hi Team,

We have seen the below messages in the Kafka logs, indicating there was a 
timeout on ZK.

Could you please advise us on how to tune or better optimize the Kafka-ZK 
communication?

[2016-03-10 02:29:25,858] INFO Unable to read additional data from server 
sessionid 0x5531d0003f30030, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2016-03-10 02:29:25,958] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2016-03-10 02:29:26,381] INFO Opening socket connection to server 
10.200.77.74/10.200.77.74:8164. Will not attempt to authenticate using SASL 
(unknown error) (org.apache.zookeeper.ClientCnxn)

[jira] [Created] (KAFKA-3391) Kafka to ZK timeout

2016-03-13 Thread Karthik Reddy (JIRA)
Karthik Reddy created KAFKA-3391:


 Summary: Kafka to ZK timeout 
 Key: KAFKA-3391
 URL: https://issues.apache.org/jira/browse/KAFKA-3391
 Project: Kafka
  Issue Type: Bug
  Components: consumer, zkclient
Affects Versions: 0.8.2.0
 Environment: RHEL 7.2, AWS EC2 compute instance
Reporter: Karthik Reddy
Assignee: Neha Narkhede
Priority: Critical


Hi Team,

We have seen the below messages in the Kafka logs, indicating there was a 
timeout on ZK.

Could you please advise us on how to tune or better optimize the Kafka-ZK 
communication?

[2016-03-10 02:29:25,858] INFO Unable to read additional data from server 
sessionid 0x5531d0003f30030, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2016-03-10 02:29:25,958] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2016-03-10 02:29:26,381] INFO Opening socket connection to server 
10.200.77.74/10.200.77.74:8164. Will not attempt to authenticate using SASL 
(unknown error) (org.apache.zookeeper.ClientCnxn)
[2016-03-10 02:29:26,382] INFO Socket connection established to 
10.200.77.74/10.200.77.74:8164, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2016-03-10 02:29:26,385] INFO Session establishment complete on server 
10.200.77.74/10.200.77.74:8164, sessionid = 0x5531d0003f30030, negotiated 
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2016-03-10 02:29:26,385] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2016-03-10 02:29:30,961] INFO conflict in /controller data: 
{"version":1,"brokerid":3,"timestamp":"1457594970952"} stored data: 
{"version":1,"brokerid":5,"timestamp":"1457594970043"} (kafka.utils.ZkUtils$)
[2016-03-10 02:29:30,969] INFO New leader is 5 
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-03-10 02:29:31,620] INFO [ReplicaFetcherManager on broker 3] Removed 
fetcher for partitions 
[__consumer_offsets,0],[fulfillment.payments.autopay.mongooperation.response,1],[__consumer_offsets,20],[__consumer_offsets,40]
 (kafka.server.ReplicaFetcherManager)
[2016-03-10 02:29:31,621] INFO [ReplicaFetcherManager on broker 3] Removed 
fetcher for partitions 
[efit.framework.notification.error,1],[__consumer_offsets,15],[fulfillment.payments.autopay.processexception.notification,1],[__consumer_offsets,35]
 (kafka.server.ReplicaFetcherManager)
[2016-03-10 02:29:31,621] INFO Truncating log 
efit.framework.notification.error-1 to offset 637. (kafka.log.Log)
[2016-03-10 02:29:31,621] INFO Truncating log __consumer_offsets-15 to offset 
0. (kafka.log.Log)
[2016-03-10 02:29:31,622] INFO Truncating log 
fulfillment.payments.autopay.processexception.notification-1 to offset 0. 
(kafka.log.Log)
[2016-03-10 02:29:31,622] INFO Truncating log __consumer_offsets-35 to offset 
0. (kafka.log.Log)
[2016-03-10 02:29:31,623] INFO Loading offsets from [__consumer_offsets,0] 
(kafka.server.OffsetManager)
[2016-03-10 02:29:31,624] INFO Loading offsets from [__consumer_offsets,20] 
(kafka.server.OffsetManager)
[2016-03-10 02:29:31,624] INFO Finished loading offsets from 
[__consumer_offsets,0] in 1 milliseconds. (kafka.server.OffsetManager)
[2016-03-10 02:29:31,625] INFO Loading offsets from [__consumer_offsets,40] 
(kafka.server.OffsetManager)
[2016-03-10 02:29:31,625] INFO Finished loading offsets from 
[__consumer_offsets,20] in 1 milliseconds. (kafka.server.OffsetManager)
[2016-03-10 02:29:31,625] INFO Finished loading offsets from 
[__consumer_offsets,40] in 0 milliseconds. (kafka.server.OffsetManager)
[2016-03-10 02:29:31,627] INFO [ReplicaFetcherManager on broker 3] Added 
fetcher for partitions List([[efit.framework.notification.error,1], initOffset 
637 to broker id:1,host:10.200.77.78,port:8165] , [[__consumer_offsets,15], 
initOffset 0 to broker id:1,host:10.200.77.78,port:8165] , 
[[fulfillment.payments.autopay.processexception.notification,1], initOffset 0 
to broker id:5,host:10.200.75.150,port:8165] , [[__consumer_offsets,35], 
initOffset 0 to broker id:1,host:10.200.77.78,port:8165] ) 
(kafka.server.ReplicaFetcherManager)
[2016-03-10 02:29:31,627] INFO [ReplicaFetcherThread-0-2], Shutting down 
(kafka.server.ReplicaFetcherThread

Thanks,
Karthik



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3390) ReplicaManager may infinitely try-fail to shrink ISR set of deleted partition

2016-03-13 Thread Stevo Slavic (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stevo Slavic updated KAFKA-3390:

Description: 
For a topic whose deletion has been requested, Kafka replica manager may end up 
infinitely trying and failing to shrink ISR.

Here is a fragment from server.log where this recurring and never-ending 
condition has been observed:

{noformat}
[2016-03-04 09:42:13,894] INFO Partition [foo,0] on broker 1: Shrinking ISR for 
partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition)
[2016-03-04 09:42:13,897] WARN Conditional update of path 
/brokers/topics/foo/partitions/0/state with data 
{"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} and 
expected version 68 failed due to 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils)
[2016-03-04 09:42:13,898] INFO Partition [foo,0] on broker 1: Cached zkVersion 
[68] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2016-03-04 09:42:23,894] INFO Partition [foo,0] on broker 1: Shrinking ISR for 
partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition)
[2016-03-04 09:42:23,897] WARN Conditional update of path 
/brokers/topics/foo/partitions/0/state with data 
{"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} and 
expected version 68 failed due to 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils)
[2016-03-04 09:42:23,897] INFO Partition [foo,0] on broker 1: Cached zkVersion 
[68] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2016-03-04 09:42:33,894] INFO Partition [foo,0] on broker 1: Shrinking ISR for 
partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition)
[2016-03-04 09:42:33,897] WARN Conditional update of path 
/brokers/topics/foo/partitions/0/state with data 
{"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} and 
expected version 68 failed due to 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils)
[2016-03-04 09:42:33,897] INFO Partition [foo,0] on broker 1: Cached zkVersion 
[68] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
...
{noformat}

Before topic deletion was requested, this was the state in ZK of its sole partition:
{noformat}
Zxid:   0x181045
Cxid:   0xc92
Client id:  0x3532dd88fd2
Time:   Mon Feb 29 16:46:23 CET 2016
Operation:  setData
Path:   /brokers/topics/foo/partitions/0/state
Data:   
{"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1,3,2]}
Version:68
{noformat}

The topic (sole partition) never had any data published to it. I guess at some 
point after topic deletion was requested, the partition state first got updated, 
and this was the updated state:
{noformat}
Zxid:   0x18b0be
Cxid:   0x141e4
Client id:  0x3532dd88fd2
Time:   Fri Mar 04 9:41:52 CET 2016
Operation:  setData
Path:   /brokers/topics/foo/partitions/0/state
Data:   
{"controller_epoch":54,"leader":1,"version":1,"leader_epoch":35,"isr":[1,3]}
Version:69
{noformat}

For whatever reason, the replica manager (some cache it uses, I guess 
ReplicaManager.allPartitions) never sees this update, nor does it see that the 
partition state, partition, partitions node, and finally the topic node got deleted:
{noformat}
Zxid:   0x18b0bf
Cxid:   0x40fb
Client id:  0x3532dd88fd2000a
Time:   Fri Mar 04 9:41:52 CET 2016
Operation:  delete
Path:   /brokers/topics/foo/partitions/0/state
---
Zxid:   0x18b0c0
Cxid:   0x40fe
Client id:  0x3532dd88fd2000a
Time:   Fri Mar 04 9:41:52 CET 2016
Operation:  delete
Path:   /brokers/topics/foo/partitions/0
---
Zxid:   0x18b0c1
Cxid:   0x4100
Client id:  0x3532dd88fd2000a
Time:   Fri Mar 04 9:41:52 CET 2016
Operation:  delete
Path:   /brokers/topics/foo/partitions
---
Zxid:   0x18b0c2
Cxid:   0x4102
Client id:  0x3532dd88fd2000a
Time:   Fri Mar 04 9:41:52 CET 2016
Operation:  delete
Path:   /brokers/topics/foo
{noformat}

It just keeps on trying, every {{replica.lag.time.max.ms}}, to shrink the ISR 
even for a partition/topic that has been deleted.

Broker 1 was the controller in the cluster; notice that the same broker was the 
leader for the partition before it was deleted.
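
Conceptually, the missing piece is an existence check that breaks the retry 
loop; a standalone Java sketch of that idea (hypothetical names and interfaces, 
not Kafka's actual Scala code, and possibly not how the eventual fix will look):

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the guard that appears to be missing: before retrying
// an ISR shrink, check whether the partition's state node still exists, and drop
// the partition from the local cache if it is gone.
public class IsrShrinkSketch {

    interface ZkView {
        boolean pathExists(String path);
        boolean conditionalUpdate(String path, String data, int expectedVersion);
    }

    private final Map<String, Integer> cachedZkVersion = new ConcurrentHashMap<>();

    void maybeShrinkIsr(ZkView zk, String topic, int partition, String newStateJson) {
        String statePath = "/brokers/topics/" + topic + "/partitions/" + partition + "/state";
        Integer version = cachedZkVersion.get(statePath);
        if (version == null)
            return; // partition is not tracked locally

        if (!zk.pathExists(statePath)) {
            // Topic/partition was deleted: stop tracking it instead of retrying forever.
            cachedZkVersion.remove(statePath);
            return;
        }
        if (zk.conditionalUpdate(statePath, newStateJson, version)) {
            cachedZkVersion.put(statePath, version + 1); // sketch: expect the next version
        }
        // On a version conflict, skip this round; a later metadata update should
        // refresh the cached version rather than looping on a stale one.
    }
}
{code}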

  was:
For a topic whose deletion has been requested, Kafka replica manager may end up 
infinitely trying and failing to shrink ISR.

Here is a fragment from server.log where this recurring and never-ending 
condition has been observed:

{noformat}

[jira] [Created] (KAFKA-3390) ReplicaManager may infinitely try-fail to shrink ISR set of deleted partition

2016-03-13 Thread Stevo Slavic (JIRA)
Stevo Slavic created KAFKA-3390:
---

 Summary: ReplicaManager may infinitely try-fail to shrink ISR set 
of deleted partition
 Key: KAFKA-3390
 URL: https://issues.apache.org/jira/browse/KAFKA-3390
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.9.0.1
Reporter: Stevo Slavic


For a topic whose deletion has been requested, Kafka replica manager may end up 
infinitely trying and failing to shrink ISR.

Here is a fragment from server.log where this recurring and never-ending 
condition has been observed:

{noformat}
[2016-03-04 09:42:13,894] INFO Partition [foo,0] on broker 1: Shrinking ISR for 
partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition)
[2016-03-04 09:42:13,897] WARN Conditional update of path 
/brokers/topics/foo/partitions/0/state with data 
{"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} and 
expected version 68 failed due to 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils)
[2016-03-04 09:42:13,898] INFO Partition [foo,0] on broker 1: Cached zkVersion 
[68] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2016-03-04 09:42:23,894] INFO Partition [foo,0] on broker 1: Shrinking ISR for 
partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition)
[2016-03-04 09:42:23,897] WARN Conditional update of path 
/brokers/topics/foo/partitions/0/state with data 
{"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} and 
expected version 68 failed due to 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils)
[2016-03-04 09:42:23,897] INFO Partition [foo,0] on broker 1: Cached zkVersion 
[68] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2016-03-04 09:42:33,894] INFO Partition [foo,0] on broker 1: Shrinking ISR for 
partition [foo,0] from 1,3,2 to 1 (kafka.cluster.Partition)
[2016-03-04 09:42:33,897] WARN Conditional update of path 
/brokers/topics/foo/partitions/0/state with data 
{"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1]} and 
expected version 68 failed due to 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /brokers/topics/foo/partitions/0/state (kafka.utils.ZkUtils)
[2016-03-04 09:42:33,897] INFO Partition [foo,0] on broker 1: Cached zkVersion 
[68] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
...
{noformat}

Before topic deletion was requested, this was the state in ZK of its sole partition:
{noformat}
Zxid:   0x181045
Cxid:   0xc92
Client id:  0x3532dd88fd2
Time:   Mon Feb 29 16:46:23 CET 2016
Operation:  setData
Path:   /brokers/topics/foo/partitions/0/state
Data:   
{"controller_epoch":53,"leader":1,"version":1,"leader_epoch":34,"isr":[1,3,2]}
Version:68
{noformat}

The topic (sole partition) never had any data published to it. I guess at some 
point after topic deletion was requested, the partition state first got updated, 
and this was the updated state:
{noformat}
Zxid:   0x18b0be
Cxid:   0x141e4
Client id:  0x3532dd88fd2
Time:   Fri Mar 04 9:41:52 CET 2016
Operation:  setData
Path:   /brokers/topics/foo/partitions/0/state
Data:   
{"controller_epoch":54,"leader":1,"version":1,"leader_epoch":35,"isr":[1,3]}
Version:69
{noformat}

For whatever reason, the replica manager (some cache it uses, I guess 
ReplicaManager.allPartitions) never sees this update, nor does it see that the 
partition state, partition, partitions node, and finally the topic node got deleted:
{noformat}
Zxid:   0x18b0bf
Cxid:   0x40fb
Client id:  0x3532dd88fd2000a
Time:   Fri Mar 04 9:41:52 CET 2016
Operation:  delete
Path:   /brokers/topics/foo/partitions/0/state
---
Zxid:   0x18b0c0
Cxid:   0x40fe
Client id:  0x3532dd88fd2000a
Time:   Fri Mar 04 9:41:52 CET 2016
Operation:  delete
Path:   /brokers/topics/foo/partitions/0
---
Zxid:   0x18b0c1
Cxid:   0x4100
Client id:  0x3532dd88fd2000a
Time:   Fri Mar 04 9:41:52 CET 2016
Operation:  delete
Path:   /brokers/topics/foo/partitions
---
Zxid:   0x18b0c2
Cxid:   0x4102
Client id:  0x3532dd88fd2000a
Time:   Fri Mar 04 9:41:52 CET 2016
Operation:  delete
Path:   /brokers/topics/foo
{noformat}

It just keeps on trying, every {{replica.lag.time.max.ms}}, to shrink the ISR 
even for a partition/topic that has been deleted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3389) ReplicaStateMachine areAllReplicasForTopicDeleted check not handling well case when there are no replicas for topic

2016-03-13 Thread Stevo Slavic (JIRA)
Stevo Slavic created KAFKA-3389:
---

 Summary: ReplicaStateMachine areAllReplicasForTopicDeleted check 
not handling well case when there are no replicas for topic
 Key: KAFKA-3389
 URL: https://issues.apache.org/jira/browse/KAFKA-3389
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.9.0.1
Reporter: Stevo Slavic
Assignee: Neha Narkhede
Priority: Minor


The expression at ReplicaStateMachine.scala#L285,
{noformat}
replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful)
{noformat}

which is the return value of the {{areAllReplicasForTopicDeleted}} function/check, 
should probably instead be
{noformat}
replicaStatesForTopic.isEmpty || replicaStatesForTopic.forall(_._2 == 
ReplicaDeletionSuccessful)
{noformat}
I noticed it because in the controller logs I found entries like:
{noformat}
[2016-03-04 13:27:29,115] DEBUG [Replica state machine on controller 1]: Are 
all replicas for topic foo deleted Map() (kafka.controller.ReplicaStateMachine)
{noformat}
even though normally they look like:
{noformat}
[2016-03-04 09:33:41,036] DEBUG [Replica state machine on controller 1]: Are 
all replicas for topic foo deleted Map([Topic=foo,Partition=0,Replica=0] -> 
ReplicaDeletionStarted, [Topic=foo,Partition=0,Replica=3] -> 
ReplicaDeletionStarted, [Topic=foo,Partition=0,Replica=1] -> 
ReplicaDeletionSuccessful) (kafka.controller.ReplicaStateMachine)
{noformat}

This may cause the topic deletion request never to be cleared from ZK, even 
when the topic has been deleted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3388) Producer should only timeout a batch in the accumulator when metadata is missing.

2016-03-13 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192225#comment-15192225
 ] 

Ismael Juma commented on KAFKA-3388:


[~becket_qin], are you intending to work on this? (Double-checking in case you 
forgot to assign it to yourself, to avoid duplicating work.)

> Producer should only timeout a batch in the accumulator when metadata is 
> missing.
> -
>
> Key: KAFKA-3388
> URL: https://issues.apache.org/jira/browse/KAFKA-3388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> In KIP-19 we are reusing request.timeout.ms to time out the batches in the 
> accumulator. The intent was to avoid batches sitting in the accumulator 
> forever when topic metadata is missing.
> Currently we are not checking whether metadata is available when we time out 
> the batches in the accumulator (although the comments say we will check the 
> metadata). This causes a problem: once a previous batch hits a request 
> timeout and gets retried, all the subsequent batches fail with a timeout 
> exception. We should only time out batches in the accumulator when the 
> metadata of the partition is missing.
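
A minimal sketch of the guard being described, under the assumption that 
"metadata is missing" can be approximated by "the partition currently has no 
known leader" (hypothetical names, not the producer's actual code):

{code}
// Hypothetical sketch of the guard described above: a batch that has waited past
// the request timeout is expired only when its partition's metadata is missing
// (approximated here as "no known leader"); otherwise it is left for the sender.
public class BatchExpirySketch {

    static boolean shouldExpire(long nowMs, long batchCreatedMs, long requestTimeoutMs,
                                boolean partitionHasLeader) {
        boolean waitedTooLong = nowMs - batchCreatedMs > requestTimeoutMs;
        return waitedTooLong && !partitionHasLeader;
    }

    public static void main(String[] args) {
        // Leader known: do not expire, even after waiting past the timeout.
        System.out.println(shouldExpire(40_000L, 0L, 30_000L, true));   // false
        // Metadata missing: expire once the timeout has elapsed.
        System.out.println(shouldExpire(40_000L, 0L, 30_000L, false));  // true
    }
}
{code}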



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3330) Truncate log cleaner offset checkpoint if the log is truncated

2016-03-13 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3330:
---
Reviewer: Jun Rao
  Status: Patch Available  (was: Open)

> Truncate log cleaner offset checkpoint if the log is truncated
> --
>
> Key: KAFKA-3330
> URL: https://issues.apache.org/jira/browse/KAFKA-3330
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Critical
> Fix For: 0.10.0.0
>
>
> We're getting a number of failures of the log compaction thread with the
> following error:
> 2016/02/02 00:13:58.832 [LogCleaner] Cleaner 0: Beginning cleaning of log
> __consumer_offsets-93.
> 2016/02/02 00:13:58.832 [LogCleaner] Cleaner 0: Building offset map for
> __consumer_offsets-93...
> 2016/02/02 00:13:59.048 [LogCleaner] Cleaner 0: Building offset map for log
> __consumer_offsets-93 for 2 segments in offset range [11951210572,
> 11952632314).
> 2016/02/02 00:13:59.066 [LogCleaner] [kafka-log-cleaner-thread-0], Error
> due to
> java.lang.IllegalArgumentException: requirement failed: Last clean offset
> is 11951210572 but segment base offset is 11950300163 for log
> __consumer_offsets-93.
> at scala.Predef$.require(Predef.scala:233) ~[scala-library-2.10.4.jar:?]
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:561)
> ~[kafka_2.10-0.8.2.56.jar:?]
> at kafka.log.Cleaner.clean(LogCleaner.scala:306)
> ~[kafka_2.10-0.8.2.56.jar:?]
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:217)
> ~[kafka_2.10-0.8.2.56.jar:?]
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:195)
> ~[kafka_2.10-0.8.2.56.jar:?]
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> ~[kafka_2.10-0.8.2.56.jar:?]
> 2016/02/02 00:13:59.066 [LogCleaner] [kafka-log-cleaner-thread-0], Stopped
> We found that this may be caused by the following scenario:
> - we have three log segments with offset range [100, 200), [200, 300), and 
> [300, 400) respectively. 300 is the base offset of the active log segment. 
> Log cleaner offset checkpoint is also 300.
> - After log is truncated to offset 220, the log segments become [100, 200), 
> [200, 220). The Log cleaner offset checkpoint is still 300.
> - After new messages are appended to the log, the log segments become [100, 
> 200), [200, 320), [320, 420). The Log cleaner offset checkpoint is still 300.
> - Log cleaner cleans the log starting at offset 300. The require(offset == 
> start) in Cleaner.buildOffsetMap() fails because the offset 300 is not 
> the base offset of any segment.
> To fix the problem, when the log is truncated to an offset smaller than the 
> cleaner offset checkpoint, we should reset the cleaner offset checkpoint to 
> the base offset of the active segment if this value is smaller than the 
> checkpointed offset.
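
Reading the proposed rule as "after truncation, the cleaner checkpoint must not 
sit beyond the base offset of the active segment", a minimal Java sketch 
(hypothetical helper, not the actual patch):

{code}
// Hypothetical helper illustrating the adjustment described above: after a
// truncation, pull the cleaner checkpoint back so it never sits beyond the base
// offset of the active segment.
public class CleanerCheckpointSketch {

    static long adjustedCleanerCheckpoint(long checkpoint, long activeSegmentBaseOffset) {
        return Math.min(checkpoint, activeSegmentBaseOffset);
    }

    public static void main(String[] args) {
        // Scenario above: checkpoint 300, log truncated so the active segment's
        // base offset is 200 -> checkpoint is pulled back to 200, a real segment
        // base offset, so the require(offset == start) check in the cleaner holds.
        System.out.println(adjustedCleanerCheckpoint(300L, 200L)); // 200
    }
}
{code}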



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-13 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3047:
---
Resolution: Fixed
  Reviewer: Guozhang Wang  (was: Jun Rao)
Status: Resolved  (was: Patch Available)

Guozhang merged this, but JIRA was down so he could not close the issue. I 
downgraded the priority because this doesn't affect Kafka's usage of `Log`.

> Explicit offset assignment in Log.append can corrupt the log
> 
>
> Key: KAFKA-3047
> URL: https://issues.apache.org/jira/browse/KAFKA-3047
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.9.0.0
>Reporter: Maciek Makowski
>Assignee: Ismael Juma
> Fix For: 0.10.0.0
>
>
> {{Log.append()}} has {{assignOffsets}} parameter, which, when set to false, 
> should cause Kafka to use the offsets specified in the 
> {{ByteBufferMessageSet}} and not recalculate them based on 
> {{nextOffsetMetadata}}. However, in that function, {{appendInfo.firstOffset}} 
> is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can 
> cause corruption of the log in the following scenario:
> * {{nextOffsetMetadata.messageOffset}} is 2001
> * {{append(messageSet, assignOffsets = false)}} is called, where 
> {{messageSet}} contains offsets 1001...1500 
> * after the {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, 
> {{appendInfo.firstOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
> * after the {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} call, 
> {{appendInfo.firstOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
> * consistency check {{if(!appendInfo.offsetsMonotonic || 
> appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} succeeds (the 
> second condition can never fail due to unconditional assignment) and writing 
> proceeds
> * the message set is appended to the current log segment starting at offset 2001, 
> but the offsets in the set are 1001...1500
> * the system shuts down abruptly
> * on restart, the following unrecoverable error is reported: 
> {code}
> Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to 
> append an offset (1001) to position 12345 no larger than the last offset 
> appended (1950) to xyz/.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.<init>(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>   at java.lang.Thread.run(Thread.java:722)
> {code} 
> *Proposed fix:* the assignment {{appendInfo.firstOffset = 
> nextOffsetMetadata.messageOffset}} should only happen in the {{if 
> (assignOffsets)}} branch of the code.
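
An illustrative Java sketch of the proposed control flow (the real code is 
Scala, in kafka.log.Log.append; the names and offset bookkeeping below are 
simplified stand-ins, not Kafka's actual implementation):

{code}
// Illustrative sketch: caller-supplied offsets are only overwritten when the
// broker itself is assigning offsets.
public class AppendSketch {

    static class AppendInfo {
        long firstOffset;
        long lastOffset;
        AppendInfo(long first, long last) { firstOffset = first; lastOffset = last; }
    }

    private long nextOffset = 2001L; // stand-in for nextOffsetMetadata.messageOffset

    AppendInfo append(AppendInfo info, boolean assignOffsets) {
        if (assignOffsets) {
            // Broker-assigned offsets: rebase the batch onto the current log end offset.
            long count = info.lastOffset - info.firstOffset;
            info.firstOffset = nextOffset;
            info.lastOffset = nextOffset + count;
        }
        // else: trust the offsets already in the message set (e.g. 1001...1500),
        // so firstOffset and lastOffset stay consistent with what gets written.
        nextOffset = info.lastOffset + 1;
        return info;
    }

    public static void main(String[] args) {
        AppendSketch log = new AppendSketch();
        AppendInfo kept = log.append(new AppendInfo(1001L, 1500L), false);
        System.out.println(kept.firstOffset + ".." + kept.lastOffset); // 1001..1500
    }
}
{code}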



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-13 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3047:
---
Priority: Major  (was: Blocker)

> Explicit offset assignment in Log.append can corrupt the log
> 
>
> Key: KAFKA-3047
> URL: https://issues.apache.org/jira/browse/KAFKA-3047
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.9.0.0
>Reporter: Maciek Makowski
>Assignee: Ismael Juma
> Fix For: 0.10.0.0
>
>
> {{Log.append()}} has {{assignOffsets}} parameter, which, when set to false, 
> should cause Kafka to use the offsets specified in the 
> {{ByteBufferMessageSet}} and not recalculate them based on 
> {{nextOffsetMetadata}}. However, in that function, {{appendInfo.firstOffset}} 
> is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can 
> cause corruption of the log in the following scenario:
> * {{nextOffsetMetadata.messageOffset}} is 2001
> * {{append(messageSet, assignOffsets = false)}} is called, where 
> {{messageSet}} contains offsets 1001...1500 
> * after the {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, 
> {{appendInfo.firstOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
> * after the {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} call, 
> {{appendInfo.firstOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
> * consistency check {{if(!appendInfo.offsetsMonotonic || 
> appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} succeeds (the 
> second condition can never fail due to unconditional assignment) and writing 
> proceeds
> * the message set is appended to the current log segment starting at offset 2001, 
> but the offsets in the set are 1001...1500
> * the system shuts down abruptly
> * on restart, the following unrecoverable error is reported: 
> {code}
> Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to 
> append an offset (1001) to position 12345 no larger than the last offset 
> appended (1950) to xyz/.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.<init>(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>   at java.lang.Thread.run(Thread.java:722)
> {code} 
> *Proposed fix:* the assignment {{appendInfo.firstOffset = 
> nextOffsetMetadata.messageOffset}} should only happen in the {{if 
> (assignOffsets)}} branch of the code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)