Build failed in Jenkins: kafka-trunk-jdk7 #1112

2016-03-11 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3047: Explicit offset assignment in Log.append can corrupt the 
log

--
[...truncated 1469 lines...]

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testCleanShutdownFile PASSED

kafka.log.OffsetIndexTest > lookupExtremeCases PASSED

kafka.log.OffsetIndexTest > appendTooMany PASSED

kafka.log.OffsetIndexTest > randomLookupTest PASSED

kafka.log.OffsetIndexTest > testReopen PASSED

kafka.log.OffsetIndexTest > appendOutOfOrder PASSED

kafka.log.OffsetIndexTest > truncate PASSED

kafka.log.LogSegmentTest > testRecoveryWithCorruptMessage PASSED

kafka.log.LogSegmentTest > testRecoveryFixesCorruptIndex PASSED

kafka.log.LogSegmentTest > testReadFromGap PASSED

kafka.log.LogSegmentTest > testTruncate PASSED

kafka.log.LogSegmentTest > testReadBeforeFirstOffset PASSED

kafka.log.LogSegmentTest > testCreateWithInitFileSizeAppendMessage PASSED

kafka.log.LogSegmentTest > testChangeFileSuffixes PASSED

kafka.log.LogSegmentTest > testMaxOffset PASSED

kafka.log.LogSegmentTest > testNextOffsetCalculation PASSED

kafka.log.LogSegmentTest > testReadOnEmptySegment PASSED

kafka.log.LogSegmentTest > testReadAfterLast PASSED

kafka.log.LogSegmentTest > testCreateWithInitFileSizeClearShutdown PASSED

kafka.log.LogSegmentTest > testTruncateFull PASSED

kafka.log.CleanerTest > testBuildOffsetMap PASSED

kafka.log.CleanerTest > testSegmentGrouping PASSED

kafka.log.CleanerTest > testCleanSegmentsWithAbort PASSED

kafka.log.CleanerTest > testSegmentGroupingWithSparseOffsets PASSED

kafka.log.CleanerTest > testRecoveryAfterCrash PASSED

kafka.log.CleanerTest > testLogToClean PASSED

kafka.log.CleanerTest > testCleaningWithDeletes PASSED

kafka.log.CleanerTest > testCleanSegments PASSED

kafka.log.CleanerTest > testCleaningWithUnkeyedMessages PASSED

kafka.log.OffsetMapTest > testClear PASSED

kafka.log.OffsetMapTest > testBasicValidation PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testHeartbeatWrongCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testDescribeGroupStable PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testHeartbeatIllegalGeneration 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testDescribeGroupWrongCoordinator PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testDescribeGroupRebalancing 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testLeaderFailureInSyncGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testGenerationIdIncrementsOnRebalance PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testSyncGroupFromIllegalGeneration PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testInvalidGroupId PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testHeartbeatUnknownGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testListGroupsIncludesStableGroups PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testHeartbeatDuringRebalanceCausesRebalanceInProgress PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupInconsistentGroupProtocol PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupSessionTimeoutTooLarge PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupSessionTimeoutTooSmall PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testSyncGroupEmptyAssignment 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testCommitOffsetWithDefaultGeneration PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupFromUnchangedLeaderShouldRebalance PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testHeartbeatRebalanceInProgress PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testLeaveGroupUnknownGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testListGroupsIncludesRebalancingGroups PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testSyncGroupFollowerAfterLeader PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testCommitOffsetInAwaitingSync 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testJoinGroupWrongCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupUnknownConsumerExistingGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testSyncGroupFromUnknownGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupInconsistentProtocolType PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testCommitOffsetFromUnknownGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testLeaveGroupWrongCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testLeaveGroupUnknownConsumerExistingGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupUnknownConsumerNewGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testJoinGroupFromUnchangedFollowerDoesNotRebalance PASSED


[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191656#comment-15191656
 ] 

Guozhang Wang commented on KAFKA-3047:
--

Good to know, thanks for explanation!

> Explicit offset assignment in Log.append can corrupt the log
> 
>
> Key: KAFKA-3047
> URL: https://issues.apache.org/jira/browse/KAFKA-3047
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.9.0.0
>Reporter: Maciek Makowski
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> {{Log.append()}} has {{assignOffsets}} parameter, which, when set to false, 
> should cause Kafka to use the offsets specified in the 
> {{ByteBufferMessageSet}} and not recalculate them based on 
> {{nextOffsetMetadata}}. However, in that function, {{appendInfo.firstOffset}} 
> is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can 
> cause corruption of the log in the following scenario:
> * {{nextOffsetMetadata.messageOffset}} is 2001
> * {{append(messageSet, assignOffsets = false)}} is called, where 
> {{messageSet}} contains offsets 1001...1500 
> * after {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, 
> {{appendInfo.fistOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
> * after {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} call, 
> {{appendInfo.fistOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
> * consistency check {{if(!appendInfo.offsetsMonotonic || 
> appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} succeeds (the 
> second condition can never fail due to unconditional assignment) and writing 
> proceeds
> * the message set is appended to current log segment starting at offset 2001, 
> but the offsets in the set are 1001...1500
> * the system shuts down abruptly
> * on restart, the following unrecoverable error is reported: 
> {code}
> Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to 
> append an offset (1001) to position 12345 no larger than the last offset 
> appended (1950) to xyz/.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>   at java.lang.Thread.run(Thread.java:722)
> {code} 
> *Proposed fix:* the assignment {{appendInfo.firstOffset = 
> nextOffsetMetadata.messageOffset}} should only happen in {{if 
> (assignOffsets)}} branch of code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk8 #440

2016-03-11 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3047: Explicit offset assignment in Log.append can corrupt the 
log

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on ubuntu-us1 (Ubuntu ubuntu ubuntu-us) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url 
 > https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
 > git --version # timeout=10
 > git -c core.askpass=true fetch --tags --progress 
 > https://git-wip-us.apache.org/repos/asf/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision c9311d5f4ec3b135cb6c0f87008da946863daaa2 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f c9311d5f4ec3b135cb6c0f87008da946863daaa2
 > git rev-list a162f6bf66d0d21505c8d11942f84be446616491 # timeout=10
Setting 
JDK1_8_0_45_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_45
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
[kafka-trunk-jdk8] $ /bin/bash -xe /tmp/hudson6808856696745921545.sh
+ 
/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2/bin/gradle
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
http://gradle.org/docs/2.4-rc-2/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:downloadWrapper

BUILD SUCCESSFUL

Total time: 11.74 secs
Setting 
JDK1_8_0_45_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_45
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
[kafka-trunk-jdk8] $ /bin/bash -xe /tmp/hudson4876888009300539621.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew -Dorg.gradle.project.maxParallelForks=1 clean jarAll testAll
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
https://docs.gradle.org/2.11/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:clean UP-TO-DATE
:clients:clean UP-TO-DATE
:connect:clean UP-TO-DATE
:core:clean UP-TO-DATE
:examples:clean UP-TO-DATE
:log4j-appender:clean UP-TO-DATE
:streams:clean UP-TO-DATE
:tools:clean UP-TO-DATE
:connect:api:clean UP-TO-DATE
:connect:file:clean UP-TO-DATE
:connect:json:clean UP-TO-DATE
:connect:runtime:clean UP-TO-DATE
:streams:examples:clean UP-TO-DATE
:jar_core_2_10
Building project 'core' with Scala version 2.10.6
:kafka-trunk-jdk8:clients:compileJava
:jar_core_2_10 FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Failed to capture snapshot of input files for task 'compileJava' during 
up-to-date check.
> Could not add entry 
> '/home/jenkins/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.18/b631d286463ced7cc42ee2171fe3beaed2836823/slf4j-api-1.7.18.jar'
>  to cache fileHashes.bin 
> (

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug 
option to get more log output.

BUILD FAILED

Total time: 11.213 secs
Build step 'Execute shell' marked build as failure
Recording test results
Setting 
JDK1_8_0_45_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_45
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
ERROR: Step ‘Publish JUnit test result report’ failed: No test report files 
were found. Configuration error?
Setting 
JDK1_8_0_45_HOME=/home/jenkins/jenkins-slave/tools/hudson.model.JDK/jdk1.8.0_45
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2


[GitHub] kafka pull request: KAFKA-3047: Explicit offset assignment in Log....

2016-03-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1029


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191652#comment-15191652
 ] 

Guozhang Wang commented on KAFKA-3047:
--

Good to know, thanks for explanation!

> Explicit offset assignment in Log.append can corrupt the log
> 
>
> Key: KAFKA-3047
> URL: https://issues.apache.org/jira/browse/KAFKA-3047
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.9.0.0
>Reporter: Maciek Makowski
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> {{Log.append()}} has {{assignOffsets}} parameter, which, when set to false, 
> should cause Kafka to use the offsets specified in the 
> {{ByteBufferMessageSet}} and not recalculate them based on 
> {{nextOffsetMetadata}}. However, in that function, {{appendInfo.firstOffset}} 
> is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can 
> cause corruption of the log in the following scenario:
> * {{nextOffsetMetadata.messageOffset}} is 2001
> * {{append(messageSet, assignOffsets = false)}} is called, where 
> {{messageSet}} contains offsets 1001...1500 
> * after {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, 
> {{appendInfo.fistOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
> * after {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} call, 
> {{appendInfo.fistOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
> * consistency check {{if(!appendInfo.offsetsMonotonic || 
> appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} succeeds (the 
> second condition can never fail due to unconditional assignment) and writing 
> proceeds
> * the message set is appended to current log segment starting at offset 2001, 
> but the offsets in the set are 1001...1500
> * the system shuts down abruptly
> * on restart, the following unrecoverable error is reported: 
> {code}
> Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to 
> append an offset (1001) to position 12345 no larger than the last offset 
> appended (1950) to xyz/.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>   at java.lang.Thread.run(Thread.java:722)
> {code} 
> *Proposed fix:* the assignment {{appendInfo.firstOffset = 
> nextOffsetMetadata.messageOffset}} should only happen in {{if 
> (assignOffsets)}} branch of code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-35 - Retrieve protocol version

2016-03-11 Thread Jay Kreps
Yeah I agree with that characterization of the tradeoff. I think what that
would imply would be that evolution of the metadata request (or the
protocol version request) would remain server-first, whereas other apis
would be independent. Not sure if I've fully thought it through, though.

-Jay

On Mon, Mar 7, 2016 at 10:58 AM, Ashish Singh  wrote:

> On Fri, Mar 4, 2016 at 5:46 PM, Jay Kreps  wrote:
>
> > Hey Ashish,
> >
> > Both good points.
> >
> > I think the issue with the general metadata request is the same as the
> > issue with a version-specific metadata request from the other
> > proposal--basically it's a chicken and egg problem, to find out anything
> > about the cluster you have to be able to communicate something in a
> format
> > the server can understand without knowing a priori what version it's on.
> I
> > guess the question is how can you continue to evolve the metadata request
> > (whether it is the existing metadata or a protocol-version specific
> > metadata request) given that you need this information to bootstrap you
> > have to be more careful in how that request evolves.
> >
> You are correct. It's just that protocol version request would be very
> specific to retrieve the protocol versions. Changes to protocol version
> request itself should be very rare, if at all. However, the general
> metadata request carries a lot more information and its format is more
> probable to evolve. This boils down to higher probability of change vs a
> definite network round-trip for each re/connect. It does sound like, it is
> better to avoid a definite penalty than to avoid a probable rare issue.
>
> >
> > I think deprecation/removal may be okay. Ultimately clients will always
> use
> > the highest possible version of the protocol the server supports so if
> > we've already deprecated and removed your highest version then you are
> > screwed and you're going to get an error no matter what, right? Basically
> > there is nothing dynamic you can do in that case.
> >
> Sure, this should be expected. Just wanted to make sure deprecation is
> still on the table.
>
> >
> > -Jay
> >
> > On Fri, Mar 4, 2016 at 4:05 PM, Ashish Singh 
> wrote:
> >
> > > Hello Jay,
> > >
> > > The overall approach sounds good. I do realize that this discussion has
> > > gotten too lengthy and is starting to shoot tangents. Maybe a KIP call
> > will
> > > help us getting to a decision faster. I do have a few questions though.
> > >
> > > On Fri, Mar 4, 2016 at 9:52 AM, Jay Kreps  wrote:
> > >
> > > > Yeah here is my summary of my take:
> > > >
> > > > 1. Negotiating a per-connection protocol actually does add a lot of
> > > > complexity to clients (many more failure states to get right).
> > > >
> > > > 2. Having the client configure the protocol version manually is
> doable
> > > now
> > > > but probably a worse state. I suspect this will lead to more not less
> > > > confusion.
> > > >
> > > > 3. I don't think the current state is actually that bad. Integrators
> > > pick a
> > > > conservative version and build against that. There is a tradeoff
> > between
> > > > getting the new features and being compatible with old Kafka
> versions.
> > > But
> > > > a large part of this tradeoff is essential since new features aren't
> > > going
> > > > to magically appear on old servers, so even if you upgrade your
> client
> > > you
> > > > likely aren't going to get the new stuff (since we will end up
> > > dynamically
> > > > turning it off). Having client features that are there but don't work
> > > > because you're on an old cluster may actually be a worse experience
> if
> > > not
> > > > handled very carefully..
> > > >
> > > > 4. The problems Dana brought up are totally orthogonal to the problem
> > of
> > > > having per-api versions or overall versions. The problem was that we
> > > > changed behavior subtly without changing the version. This will be an
> > > issue
> > > > regardless of whether the version is global or not.
> > > >
> > > > 5. Using the broker release as the version is strictly worse than
> > using a
> > > > global protocol version (0, 1, 2, ...) that increments any time any
> api
> > > > changes but doesn't increment just because non-protocol code is
> > changed.
> > > > The problem with using the broker release version is we want to be
> able
> > > to
> > > > keep Kafka releasable from any commit which means there isn't as
> clear
> > a
> > > > sequencing of releases as you would think.
> > > >
> > > > 6. We need to consider the case of mixed version clusters during the
> > time
> > > > period when you are upgrading Kafka.
> > > >
> > > > So overall I think this is not a critical thing to do right now, but
> if
> > > we
> > > > are going to do it we should do it in a way that actually improves
> > > things.
> > > >
> > > > Here would be one proposal for that:
> > > > a. Add a global protocol version that increments with any api 

[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191642#comment-15191642
 ] 

Guozhang Wang commented on KAFKA-3047:
--

Good to know, thanks for explanation!

> Explicit offset assignment in Log.append can corrupt the log
> 
>
> Key: KAFKA-3047
> URL: https://issues.apache.org/jira/browse/KAFKA-3047
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.9.0.0
>Reporter: Maciek Makowski
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> {{Log.append()}} has {{assignOffsets}} parameter, which, when set to false, 
> should cause Kafka to use the offsets specified in the 
> {{ByteBufferMessageSet}} and not recalculate them based on 
> {{nextOffsetMetadata}}. However, in that function, {{appendInfo.firstOffset}} 
> is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can 
> cause corruption of the log in the following scenario:
> * {{nextOffsetMetadata.messageOffset}} is 2001
> * {{append(messageSet, assignOffsets = false)}} is called, where 
> {{messageSet}} contains offsets 1001...1500 
> * after {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, 
> {{appendInfo.fistOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
> * after {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} call, 
> {{appendInfo.fistOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
> * consistency check {{if(!appendInfo.offsetsMonotonic || 
> appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} succeeds (the 
> second condition can never fail due to unconditional assignment) and writing 
> proceeds
> * the message set is appended to current log segment starting at offset 2001, 
> but the offsets in the set are 1001...1500
> * the system shuts down abruptly
> * on restart, the following unrecoverable error is reported: 
> {code}
> Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to 
> append an offset (1001) to position 12345 no larger than the last offset 
> appended (1950) to xyz/.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>   at java.lang.Thread.run(Thread.java:722)
> {code} 
> *Proposed fix:* the assignment {{appendInfo.firstOffset = 
> nextOffsetMetadata.messageOffset}} should only happen in {{if 
> (assignOffsets)}} branch of code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-11 Thread Maciek Makowski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191589#comment-15191589
 ] 

Maciek Makowski commented on KAFKA-3047:


[~ijuma]: thanks for the fix!

[~guozhang]: I discovered it when I attempted to use the {{Log}} component on 
its own -- I wanted a library that would do reliable logging and housekeeping, 
but without the networking. So no, nothing in Kafka proper that I'm aware of 
would expose it.

> Explicit offset assignment in Log.append can corrupt the log
> 
>
> Key: KAFKA-3047
> URL: https://issues.apache.org/jira/browse/KAFKA-3047
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.9.0.0
>Reporter: Maciek Makowski
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> {{Log.append()}} has {{assignOffsets}} parameter, which, when set to false, 
> should cause Kafka to use the offsets specified in the 
> {{ByteBufferMessageSet}} and not recalculate them based on 
> {{nextOffsetMetadata}}. However, in that function, {{appendInfo.firstOffset}} 
> is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can 
> cause corruption of the log in the following scenario:
> * {{nextOffsetMetadata.messageOffset}} is 2001
> * {{append(messageSet, assignOffsets = false)}} is called, where 
> {{messageSet}} contains offsets 1001...1500 
> * after {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, 
> {{appendInfo.fistOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
> * after {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} call, 
> {{appendInfo.fistOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
> * consistency check {{if(!appendInfo.offsetsMonotonic || 
> appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} succeeds (the 
> second condition can never fail due to unconditional assignment) and writing 
> proceeds
> * the message set is appended to current log segment starting at offset 2001, 
> but the offsets in the set are 1001...1500
> * the system shuts down abruptly
> * on restart, the following unrecoverable error is reported: 
> {code}
> Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to 
> append an offset (1001) to position 12345 no larger than the last offset 
> appended (1950) to xyz/.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>   at java.lang.Thread.run(Thread.java:722)
> {code} 
> *Proposed fix:* the assignment {{appendInfo.firstOffset = 
> nextOffsetMetadata.messageOffset}} should only happen in {{if 
> (assignOffsets)}} branch of code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[DISCUSS] KIP-35 - Retrieve protocol version

2016-03-11 Thread Ashish Singh
Sounds like we are mostly in agreement. Following are the key points.

   1. Every time a protocol version changes, for any request/response,
   broker version, ApiVersion, will be bumped up.
   2. Protocol documentation will be versioned with broker version. Every
   time there is a broker version change, protocol documentation version needs
   to be updated and linked to main documentation page.
   3. Deprecation of protocol version will be done via marking the version
   as deprecated on the protocol documentation.
   4. On getting unknown protocol version, broker will send an empty
   response, instead of simply closing client connection.
   5. Metadata response will be enhanced to also contain broker version,
   VersionInt and VersionString. VersionString will contain internal
   version information.
   6. Metadata request with single null topic and size of -1 can be used to
   fetch metadata response with only broker version information and no
   topic/broker info.
   7. On receiving a metadata request with single null topic with size of
   -1, broker will respond with only broker version.


Please correct/add, if I missed out something. If the aforementioned
changes sound good, I can update the KIP-35 wiki, WIP PR and start a Vote
thread.

On Fri, Mar 11, 2016 at 12:48 PM, Magnus Edenhill 
wrote:

> I'm not sure supporting specific interim versions between releases are
> really that big of a concern,
> for a start the protocol changes may be in flux and not settle until the
> formal release, secondly
> the 3rd party clients typically lag behind at least until the formal
> release before they implement support (for the first stated reason..).
> But this is still a good point and if we could use the version fields to
> specify a point between
> two formal releases then that would be useful to ease client development
> during that period.
> Grabbing 0.10.0 from versionInt and "IV" from versionString is an
> acceptable solution as long
> as there is some way for a client to distinguish the formal release.
>
>
> /Magnus
>
>
>
>
> 2016-03-11 20:27 GMT+01:00 Gwen Shapira :
>
> > Yeah, I'm not sure that 0.10.0-IV1 and 0.10.0-IV2 is what Magnus had
> > in mind when he was advocating for release versions in the protocol.
> >
> > But - if we serialize both the string and the integer Id of ApiVersion
> > into the Metadata object, I think both Magnus and Jay will be happy :)
> >
> > Gwen
> >
> > On Fri, Mar 11, 2016 at 11:22 AM, Ismael Juma  wrote:
> > > We introduced a way to bump the API version in between releases as part
> > of
> > > the KIP-31/KIP-32 by the way. Extending that could maybe work. Take a
> > look
> > > at the ApiVersion class and its documentation.
> > >
> > > Ismael
> > > On 11 Mar 2016 19:06, "Gwen Shapira"  wrote:
> > >
> > >> Magnus,
> > >>
> > >> If we go with release version as protocol version (which I agree is
> > >> much more user-friendly) - what will be the release version on trunk?
> > >> 0.10.0-SNAPSHOT?
> > >> How will clients handle the fact that some 0.10.0-SNAPSHOT will have
> > >> different protocol than others (because we modify the protocol
> > >> multiple times between releases)?
> > >>
> > >> Gwen
> > >>
> > >> On Thu, Mar 10, 2016 at 1:52 PM, Magnus Edenhill 
> > >> wrote:
> > >> > Hi all,
> > >> >
> > >> > sorry for joining late in the game, the carribean got in the way.
> > >> >
> > >> > My thoughts:
> > >> >
> > >> > There is no way around the chicken problem, so the sooner we can
> > >> > add protocol versioning functionality the better and we'll add
> > heuristics
> > >> > in clients to
> > >> > handle the migration period (e.g, what Dana has done in
> kafka-python).
> > >> > The focus at this point should be to mitigate the core issue (allow
> > >> clients
> > >> > to know what is supported)
> > >> > in the least intrusive way. Hopefully we can redesign the protocol
> in
> > the
> > >> > future to add proper
> > >> > response headers, etc.
> > >> >
> > >> > I'm with Data that reusing the broker version as a protocol version
> > will
> > >> > work just fine and
> > >> > saves us from administrating another version.
> > >> > From a client's perspective an explicit protocol version doesn't
> > really
> > >> add
> > >> > any value.
> > >> > I'd rather maintain a mapping of actual broker versions to supported
> > >> > protocol requests rather than
> > >> > some independent protocol version that still needs to be translated
> > to a
> > >> > broker version for
> > >> > proper code maintainability / error messages / etc.
> > >> >
> > >> >
> > >> > Thus my suggestion is in line with some of the previous speakers,
> > that is
> > >> > is to keep things
> > >> > simple and bump the MetadataRequest version to 1 by adding a
> > >> VersionString
> > >> > ("0.9.1.0")
> > >> > and VersionInt (0x00090100) field to the response.
> > >> > These fields return version information for the 

[jira] [Updated] (KAFKA-3380) Add system test for GetOffsetShell tool

2016-03-11 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-3380:

   Resolution: Fixed
Fix Version/s: 0.10.0.0
   Status: Resolved  (was: Patch Available)

Issue resolved by pull request 1048
[https://github.com/apache/kafka/pull/1048]

> Add system test for GetOffsetShell tool
> ---
>
> Key: KAFKA-3380
> URL: https://issues.apache.org/jira/browse/KAFKA-3380
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
> Fix For: 0.10.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-35 - Retrieve protocol version

2016-03-11 Thread Magnus Edenhill
I'm not sure supporting specific interim versions between releases are
really that big of a concern,
for a start the protocol changes may be in flux and not settle until the
formal release, secondly
the 3rd party clients typically lag behind at least until the formal
release before they implement support (for the first stated reason..).
But this is still a good point and if we could use the version fields to
specify a point between
two formal releases then that would be useful to ease client development
during that period.
Grabbing 0.10.0 from versionInt and "IV" from versionString is an
acceptable solution as long
as there is some way for a client to distinguish the formal release.


/Magnus




2016-03-11 20:27 GMT+01:00 Gwen Shapira :

> Yeah, I'm not sure that 0.10.0-IV1 and 0.10.0-IV2 is what Magnus had
> in mind when he was advocating for release versions in the protocol.
>
> But - if we serialize both the string and the integer Id of ApiVersion
> into the Metadata object, I think both Magnus and Jay will be happy :)
>
> Gwen
>
> On Fri, Mar 11, 2016 at 11:22 AM, Ismael Juma  wrote:
> > We introduced a way to bump the API version in between releases as part
> of
> > the KIP-31/KIP-32 by the way. Extending that could maybe work. Take a
> look
> > at the ApiVersion class and its documentation.
> >
> > Ismael
> > On 11 Mar 2016 19:06, "Gwen Shapira"  wrote:
> >
> >> Magnus,
> >>
> >> If we go with release version as protocol version (which I agree is
> >> much more user-friendly) - what will be the release version on trunk?
> >> 0.10.0-SNAPSHOT?
> >> How will clients handle the fact that some 0.10.0-SNAPSHOT will have
> >> different protocol than others (because we modify the protocol
> >> multiple times between releases)?
> >>
> >> Gwen
> >>
> >> On Thu, Mar 10, 2016 at 1:52 PM, Magnus Edenhill 
> >> wrote:
> >> > Hi all,
> >> >
> >> > sorry for joining late in the game, the carribean got in the way.
> >> >
> >> > My thoughts:
> >> >
> >> > There is no way around the chicken problem, so the sooner we can
> >> > add protocol versioning functionality the better and we'll add
> heuristics
> >> > in clients to
> >> > handle the migration period (e.g, what Dana has done in kafka-python).
> >> > The focus at this point should be to mitigate the core issue (allow
> >> clients
> >> > to know what is supported)
> >> > in the least intrusive way. Hopefully we can redesign the protocol in
> the
> >> > future to add proper
> >> > response headers, etc.
> >> >
> >> > I'm with Data that reusing the broker version as a protocol version
> will
> >> > work just fine and
> >> > saves us from administrating another version.
> >> > From a client's perspective an explicit protocol version doesn't
> really
> >> add
> >> > any value.
> >> > I'd rather maintain a mapping of actual broker versions to supported
> >> > protocol requests rather than
> >> > some independent protocol version that still needs to be translated
> to a
> >> > broker version for
> >> > proper code maintainability / error messages / etc.
> >> >
> >> >
> >> > Thus my suggestion is in line with some of the previous speakers,
> that is
> >> > is to keep things
> >> > simple and bump the MetadataRequest version to 1 by adding a
> >> VersionString
> >> > ("0.9.1.0")
> >> > and VersionInt (0x00090100) field to the response.
> >> > These fields return version information for the current connection's
> >> broker
> >> > only, not for other broker's
> >> > in the cluster:
> >> > Providing version information for other brokers doesn't really serve
> any
> >> > purpose:
> >> >  a) the information is cached by the responding broker so it might be
> >> > outdated ( = cant be trusted)
> >> >  b) by the time the client connects to a given broker it might have
> >> upgraded
> >> >
> >> > This means that a client (that is interested in protocol versioning)
> will
> >> > need to query each
> >> > connection's version any way. Since MetadataRequets are typically
> already
> >> > sent on connection set up
> >> > this seems to be the proper place to put it.
> >> >
> >> > The MetadataRequest semantics should also be extended to allow asking
> >> only
> >> > for cluster and version information,
> >> > but not the topic list since this might have negative performance
> impact
> >> on
> >> > large clusters with many topics.
> >> > One way to achieve this would be to provide one single Null topic in
> the
> >> > request (length=-1).
> >> >
> >> > Sending a new Metadata V1 request to an old broker will cause the
> >> > connection to be closed and
> >> > the client will need to use this as a heuristic to downgrade its
> protocol
> >> > ambitions to an older version
> >> > (either by some default value or by user configuration).
> >> >
> >> >
> >> > /Magnus
> >> >
> >> >
> >> > 2016-03-10 20:04 GMT+01:00 Ashish Singh :
> >> >
> >> >> @Magnus,
> >> >>
> >> >> Does the latest suggestion 

[jira] [Created] (KAFKA-3387) Update GetOffsetShell tool to not rely on old producer.

2016-03-11 Thread Ashish K Singh (JIRA)
Ashish K Singh created KAFKA-3387:
-

 Summary: Update GetOffsetShell tool to not rely on old producer.
 Key: KAFKA-3387
 URL: https://issues.apache.org/jira/browse/KAFKA-3387
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ashish K Singh
Assignee: Ashish K Singh






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3380) Add system test for GetOffsetShell tool

2016-03-11 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-3380:
--
Status: Patch Available  (was: Open)

> Add system test for GetOffsetShell tool
> ---
>
> Key: KAFKA-3380
> URL: https://issues.apache.org/jira/browse/KAFKA-3380
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-3381: Add system test for SimpleConsumer...

2016-03-11 Thread SinghAsDev
GitHub user SinghAsDev opened a pull request:

https://github.com/apache/kafka/pull/1053

KAFKA-3381: Add system test for SimpleConsumerShell



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/SinghAsDev/kafka KAFKA-3381

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1053.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1053


commit 7ac3a93798f02516f132773739de27d8055f74a6
Author: Ashish Singh 
Date:   2016-03-10T22:59:38Z

KAFKA-3381: Add system test for SimpleConsumerShell




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-3379) Update tools relying on old producer to use new producer.

2016-03-11 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-3379:
--
Description: 
Following tools are using old producer.

* ReplicationVerificationTool
* SimpleConsumerShell
* GetOffsetShell

Old producer is being marked as deprecated in 0.10. These tools should be 
updated to use new producer. To make sure that this update does not break 
existing behavior. Below is the action plan.

For each tool that uses old producer.
* Add ducktape test to establish current behavior.
* Once the tests are committed and run fine, add patch for modification of 
these tools. The ducktape tests added in previous step should confirm that 
existing behavior is still intact.

  was:
Following tools are using old producer.

* ReplicationVerificationTool
* SimpleConsumerShell
* GetOffsetShell

Old producer is being marked as deprecated in 0.10. These tools should be 
updated to use new producer. To make sure that this update does not break 
existing behavior. Below is the action plan.

For each tool that uses old producer.
* Add ducktape test to establish current behavior.
* Once the tests are committed and run fine, add patch for modification of 
these tools. The ducktape tests added in previous step should conform that 
existing behavior is still intact.


> Update tools relying on old producer to use new producer.
> -
>
> Key: KAFKA-3379
> URL: https://issues.apache.org/jira/browse/KAFKA-3379
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
>
> Following tools are using old producer.
> * ReplicationVerificationTool
> * SimpleConsumerShell
> * GetOffsetShell
> Old producer is being marked as deprecated in 0.10. These tools should be 
> updated to use new producer. To make sure that this update does not break 
> existing behavior. Below is the action plan.
> For each tool that uses old producer.
> * Add ducktape test to establish current behavior.
> * Once the tests are committed and run fine, add patch for modification of 
> these tools. The ducktape tests added in previous step should confirm that 
> existing behavior is still intact.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191427#comment-15191427
 ] 

Guozhang Wang commented on KAFKA-3047:
--

Hi [~mmakowski], thanks for reporting this. I'm curious how you encountered 
this issue, since currently {{append(messageSet, assignOffsets = false}} is 
only called by the replica fetcher thread, in which 
{{nextOffsetMetadata.messageOffset}} and {{messageSet}}'s offset should be 
usually consistent. Just trying to see if it some other scenarios to lead to 
this issue?

> Explicit offset assignment in Log.append can corrupt the log
> 
>
> Key: KAFKA-3047
> URL: https://issues.apache.org/jira/browse/KAFKA-3047
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.9.0.0
>Reporter: Maciek Makowski
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> {{Log.append()}} has {{assignOffsets}} parameter, which, when set to false, 
> should cause Kafka to use the offsets specified in the 
> {{ByteBufferMessageSet}} and not recalculate them based on 
> {{nextOffsetMetadata}}. However, in that function, {{appendInfo.firstOffset}} 
> is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can 
> cause corruption of the log in the following scenario:
> * {{nextOffsetMetadata.messageOffset}} is 2001
> * {{append(messageSet, assignOffsets = false)}} is called, where 
> {{messageSet}} contains offsets 1001...1500 
> * after {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, 
> {{appendInfo.fistOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
> * after {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} call, 
> {{appendInfo.fistOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
> * consistency check {{if(!appendInfo.offsetsMonotonic || 
> appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} succeeds (the 
> second condition can never fail due to unconditional assignment) and writing 
> proceeds
> * the message set is appended to current log segment starting at offset 2001, 
> but the offsets in the set are 1001...1500
> * the system shuts down abruptly
> * on restart, the following unrecoverable error is reported: 
> {code}
> Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to 
> append an offset (1001) to position 12345 no larger than the last offset 
> appended (1950) to xyz/.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>   at java.lang.Thread.run(Thread.java:722)
> {code} 
> *Proposed fix:* the assignment {{appendInfo.firstOffset = 
> nextOffsetMetadata.messageOffset}} should only happen in {{if 
> (assignOffsets)}} branch of code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2960) DelayedProduce may cause message loss during repeated leader change

2016-03-11 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-2960:
--
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Issue resolved by pull request 1018
[https://github.com/apache/kafka/pull/1018]

> DelayedProduce may cause message loss during repeated leader change
> ---
>
> Key: KAFKA-2960
> URL: https://issues.apache.org/jira/browse/KAFKA-2960
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Xing Huang
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 0.10.0.0
>
>
> related to #KAFKA-1148
> When a leader replica became follower then leader again, it may truncated its 
> log as follower. But the second time it became leader, its ISR may shrink and 
> if at this moment new messages were appended, the DelayedProduce generated 
> when it was leader the first time may be satisfied, and the client will 
> receive a response with no error. But, actually the messages were lost. 
> We simulated this scene, which proved the message lose could happen. And it 
> seems to be the reason for a data lose recently happened to us according to 
> broker logs and client logs.
> I think we should check the leader epoch when send a response, or satisfy 
> DelayedProduce when leader change as described in #KAFKA-1148.
> And we may need an new error code to inform the producer about this error. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1148) Delayed fetch/producer requests should be satisfied on a leader change

2016-03-11 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-1148.
---
   Resolution: Fixed
Fix Version/s: 0.10.0.0

Issue resolved by pull request 1018
[https://github.com/apache/kafka/pull/1018]

> Delayed fetch/producer requests should be satisfied on a leader change
> --
>
> Key: KAFKA-1148
> URL: https://issues.apache.org/jira/browse/KAFKA-1148
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
> Fix For: 0.10.0.0
>
>
> Somewhat related to KAFKA-1016.
> This would be an issue only if max.wait is set to a very high value. When a 
> leader change occurs we should remove the delayed request from the purgatory 
> - either satisfy with error/expire - whichever makes more sense.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2960) DelayedProduce may cause message loss during repeated leader change

2016-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191416#comment-15191416
 ] 

ASF GitHub Bot commented on KAFKA-2960:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1018


> DelayedProduce may cause message loss during repeated leader change
> ---
>
> Key: KAFKA-2960
> URL: https://issues.apache.org/jira/browse/KAFKA-2960
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Xing Huang
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 0.10.0.0
>
>
> related to #KAFKA-1148
> When a leader replica became follower then leader again, it may truncated its 
> log as follower. But the second time it became leader, its ISR may shrink and 
> if at this moment new messages were appended, the DelayedProduce generated 
> when it was leader the first time may be satisfied, and the client will 
> receive a response with no error. But, actually the messages were lost. 
> We simulated this scene, which proved the message lose could happen. And it 
> seems to be the reason for a data lose recently happened to us according to 
> broker logs and client logs.
> I think we should check the leader epoch when send a response, or satisfy 
> DelayedProduce when leader change as described in #KAFKA-1148.
> And we may need an new error code to inform the producer about this error. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-35 - Retrieve protocol version

2016-03-11 Thread Ismael Juma
We introduced a way to bump the API version in between releases as part of
the KIP-31/KIP-32 by the way. Extending that could maybe work. Take a look
at the ApiVersion class and its documentation.

Ismael
On 11 Mar 2016 19:06, "Gwen Shapira"  wrote:

> Magnus,
>
> If we go with release version as protocol version (which I agree is
> much more user-friendly) - what will be the release version on trunk?
> 0.10.0-SNAPSHOT?
> How will clients handle the fact that some 0.10.0-SNAPSHOT will have
> different protocol than others (because we modify the protocol
> multiple times between releases)?
>
> Gwen
>
> On Thu, Mar 10, 2016 at 1:52 PM, Magnus Edenhill 
> wrote:
> > Hi all,
> >
> > sorry for joining late in the game, the carribean got in the way.
> >
> > My thoughts:
> >
> > There is no way around the chicken problem, so the sooner we can
> > add protocol versioning functionality the better and we'll add heuristics
> > in clients to
> > handle the migration period (e.g, what Dana has done in kafka-python).
> > The focus at this point should be to mitigate the core issue (allow
> clients
> > to know what is supported)
> > in the least intrusive way. Hopefully we can redesign the protocol in the
> > future to add proper
> > response headers, etc.
> >
> > I'm with Data that reusing the broker version as a protocol version will
> > work just fine and
> > saves us from administrating another version.
> > From a client's perspective an explicit protocol version doesn't really
> add
> > any value.
> > I'd rather maintain a mapping of actual broker versions to supported
> > protocol requests rather than
> > some independent protocol version that still needs to be translated to a
> > broker version for
> > proper code maintainability / error messages / etc.
> >
> >
> > Thus my suggestion is in line with some of the previous speakers, that is
> > is to keep things
> > simple and bump the MetadataRequest version to 1 by adding a
> VersionString
> > ("0.9.1.0")
> > and VersionInt (0x00090100) field to the response.
> > These fields return version information for the current connection's
> broker
> > only, not for other broker's
> > in the cluster:
> > Providing version information for other brokers doesn't really serve any
> > purpose:
> >  a) the information is cached by the responding broker so it might be
> > outdated ( = cant be trusted)
> >  b) by the time the client connects to a given broker it might have
> upgraded
> >
> > This means that a client (that is interested in protocol versioning) will
> > need to query each
> > connection's version any way. Since MetadataRequets are typically already
> > sent on connection set up
> > this seems to be the proper place to put it.
> >
> > The MetadataRequest semantics should also be extended to allow asking
> only
> > for cluster and version information,
> > but not the topic list since this might have negative performance impact
> on
> > large clusters with many topics.
> > One way to achieve this would be to provide one single Null topic in the
> > request (length=-1).
> >
> > Sending a new Metadata V1 request to an old broker will cause the
> > connection to be closed and
> > the client will need to use this as a heuristic to downgrade its protocol
> > ambitions to an older version
> > (either by some default value or by user configuration).
> >
> >
> > /Magnus
> >
> >
> > 2016-03-10 20:04 GMT+01:00 Ashish Singh :
> >
> >> @Magnus,
> >>
> >> Does the latest suggestion sound OK to you. I am planning to update PR
> >> based on latest suggestion.
> >>
> >> On Mon, Mar 7, 2016 at 10:58 AM, Ashish Singh 
> wrote:
> >>
> >> >
> >> >
> >> > On Fri, Mar 4, 2016 at 5:46 PM, Jay Kreps  wrote:
> >> >
> >> >> Hey Ashish,
> >> >>
> >> >> Both good points.
> >> >>
> >> >> I think the issue with the general metadata request is the same as
> the
> >> >> issue with a version-specific metadata request from the other
> >> >> proposal--basically it's a chicken and egg problem, to find out
> anything
> >> >> about the cluster you have to be able to communicate something in a
> >> format
> >> >> the server can understand without knowing a priori what version it's
> >> on. I
> >> >> guess the question is how can you continue to evolve the metadata
> >> request
> >> >> (whether it is the existing metadata or a protocol-version specific
> >> >> metadata request) given that you need this information to bootstrap
> you
> >> >> have to be more careful in how that request evolves.
> >> >>
> >> > You are correct. It's just that protocol version request would be very
> >> > specific to retrieve the protocol versions. Changes to protocol
> version
> >> > request itself should be very rare, if at all. However, the general
> >> > metadata request carries a lot more information and its format is more
> >> > probable to evolve. This boils down to higher probability of change
> vs a
> >> > definite network 

[GitHub] kafka pull request: KAFKA-2960: Clear purgatory for partitions bef...

2016-03-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1018


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-2073) Replace TopicMetadata request/response with o.a.k.requests.metadata

2016-03-11 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2073:

Resolution: Fixed
Status: Resolved  (was: Patch Available)

> Replace TopicMetadata request/response with o.a.k.requests.metadata
> ---
>
> Key: KAFKA-2073
> URL: https://issues.apache.org/jira/browse/KAFKA-2073
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Assignee: Jason Gustafson
> Fix For: 0.10.0.0
>
>
> Replace TopicMetadata request/response with o.a.k.requests.metadata.
> Note, this is more challenging that it appears because while the wire 
> protocol is identical, the objects are completely different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-2073: migrate to client-side topic metad...

2016-03-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/988


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request: Changes to KafkaApis and MetadataCache based o...

2016-03-11 Thread fpj
GitHub user fpj opened a pull request:

https://github.com/apache/kafka/pull/1052

Changes to KafkaApis and MetadataCache based on KAFKA-2073 

Changes to reduce the latency of topic metadata requests based on the PR of 
KAFKA-2073. This is just to review for now, please do not merge yet.

CC/ @hachikuji @ijuma 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fpj/kafka KAFKA-2073-0.9

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1052.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1052


commit 30e4c89200e1b4112a91bfa3d43c24ddaf0c3162
Author: Flavio Junqueira 
Date:   2016-03-11T18:04:15Z

First cut of the changes to KafkaApis and MetadataCache.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-35 - Retrieve protocol version

2016-03-11 Thread Gwen Shapira
Magnus,

If we go with release version as protocol version (which I agree is
much more user-friendly) - what will be the release version on trunk?
0.10.0-SNAPSHOT?
How will clients handle the fact that some 0.10.0-SNAPSHOT will have
different protocol than others (because we modify the protocol
multiple times between releases)?

Gwen

On Thu, Mar 10, 2016 at 1:52 PM, Magnus Edenhill  wrote:
> Hi all,
>
> sorry for joining late in the game, the carribean got in the way.
>
> My thoughts:
>
> There is no way around the chicken problem, so the sooner we can
> add protocol versioning functionality the better and we'll add heuristics
> in clients to
> handle the migration period (e.g, what Dana has done in kafka-python).
> The focus at this point should be to mitigate the core issue (allow clients
> to know what is supported)
> in the least intrusive way. Hopefully we can redesign the protocol in the
> future to add proper
> response headers, etc.
>
> I'm with Data that reusing the broker version as a protocol version will
> work just fine and
> saves us from administrating another version.
> From a client's perspective an explicit protocol version doesn't really add
> any value.
> I'd rather maintain a mapping of actual broker versions to supported
> protocol requests rather than
> some independent protocol version that still needs to be translated to a
> broker version for
> proper code maintainability / error messages / etc.
>
>
> Thus my suggestion is in line with some of the previous speakers, that is
> is to keep things
> simple and bump the MetadataRequest version to 1 by adding a VersionString
> ("0.9.1.0")
> and VersionInt (0x00090100) field to the response.
> These fields return version information for the current connection's broker
> only, not for other broker's
> in the cluster:
> Providing version information for other brokers doesn't really serve any
> purpose:
>  a) the information is cached by the responding broker so it might be
> outdated ( = cant be trusted)
>  b) by the time the client connects to a given broker it might have upgraded
>
> This means that a client (that is interested in protocol versioning) will
> need to query each
> connection's version any way. Since MetadataRequets are typically already
> sent on connection set up
> this seems to be the proper place to put it.
>
> The MetadataRequest semantics should also be extended to allow asking only
> for cluster and version information,
> but not the topic list since this might have negative performance impact on
> large clusters with many topics.
> One way to achieve this would be to provide one single Null topic in the
> request (length=-1).
>
> Sending a new Metadata V1 request to an old broker will cause the
> connection to be closed and
> the client will need to use this as a heuristic to downgrade its protocol
> ambitions to an older version
> (either by some default value or by user configuration).
>
>
> /Magnus
>
>
> 2016-03-10 20:04 GMT+01:00 Ashish Singh :
>
>> @Magnus,
>>
>> Does the latest suggestion sound OK to you. I am planning to update PR
>> based on latest suggestion.
>>
>> On Mon, Mar 7, 2016 at 10:58 AM, Ashish Singh  wrote:
>>
>> >
>> >
>> > On Fri, Mar 4, 2016 at 5:46 PM, Jay Kreps  wrote:
>> >
>> >> Hey Ashish,
>> >>
>> >> Both good points.
>> >>
>> >> I think the issue with the general metadata request is the same as the
>> >> issue with a version-specific metadata request from the other
>> >> proposal--basically it's a chicken and egg problem, to find out anything
>> >> about the cluster you have to be able to communicate something in a
>> format
>> >> the server can understand without knowing a priori what version it's
>> on. I
>> >> guess the question is how can you continue to evolve the metadata
>> request
>> >> (whether it is the existing metadata or a protocol-version specific
>> >> metadata request) given that you need this information to bootstrap you
>> >> have to be more careful in how that request evolves.
>> >>
>> > You are correct. It's just that protocol version request would be very
>> > specific to retrieve the protocol versions. Changes to protocol version
>> > request itself should be very rare, if at all. However, the general
>> > metadata request carries a lot more information and its format is more
>> > probable to evolve. This boils down to higher probability of change vs a
>> > definite network round-trip for each re/connect. It does sound like, it
>> is
>> > better to avoid a definite penalty than to avoid a probable rare issue.
>> >
>> >>
>> >> I think deprecation/removal may be okay. Ultimately clients will always
>> >> use
>> >> the highest possible version of the protocol the server supports so if
>> >> we've already deprecated and removed your highest version then you are
>> >> screwed and you're going to get an error no matter what, right?
>> Basically
>> >> there is nothing dynamic you can do in 

[GitHub] kafka pull request: MINOR: update compression design doc to includ...

2016-03-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1040


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3202) Add system test for KIP-31 and KIP-32 - Change message format version on the fly

2016-03-11 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191327#comment-15191327
 ] 

Eno Thereska commented on KAFKA-3202:
-

Thank you [~becket_qin], I'll give it a go.

> Add system test for KIP-31 and KIP-32 - Change message format version on the 
> fly
> 
>
> Key: KAFKA-3202
> URL: https://issues.apache.org/jira/browse/KAFKA-3202
> Project: Kafka
>  Issue Type: Sub-task
>  Components: system tests
>Reporter: Jiangjie Qin
>Assignee: Eno Thereska
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> The system test should cover the case that message format changes are made 
> when clients are producing/consuming. The message format change should not 
> cause client side issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3188) Add system test for KIP-31 and KIP-32 - Compatibility Test

2016-03-11 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191313#comment-15191313
 ] 

Eno Thereska commented on KAFKA-3188:
-

[~apovzner] I'll help and see how far I get with this. Thanks.

> Add system test for KIP-31 and KIP-32 - Compatibility Test
> --
>
> Key: KAFKA-3188
> URL: https://issues.apache.org/jira/browse/KAFKA-3188
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Eno Thereska
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> The integration test should test the compatibility between 0.10.0 broker with 
> clients on older versions. The clients version should include 0.9.0 and 0.8.x.
> We already cover 0.10 brokers with old producers/consumers in upgrade tests. 
> So, the main thing to test is a mix of 0.9 and 0.10 producers and consumers. 
> E.g., test1: 0.9 producer/0.10 consumer and then test2: 0.10 producer/0.9 
> consumer. And then, each of them: compression/no compression (like in upgrade 
> test). And we could probably add another dimension : topic configured with 
> CreateTime (default) and LogAppendTime. So, total 2x2x2 combinations (but 
> maybe can reduce that — eg. do LogAppendTime with compression only).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3188) Add system test for KIP-31 and KIP-32 - Compatibility Test

2016-03-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-3188:
---

Assignee: Eno Thereska  (was: Anna Povzner)

> Add system test for KIP-31 and KIP-32 - Compatibility Test
> --
>
> Key: KAFKA-3188
> URL: https://issues.apache.org/jira/browse/KAFKA-3188
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Eno Thereska
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> The integration test should test the compatibility between 0.10.0 broker with 
> clients on older versions. The clients version should include 0.9.0 and 0.8.x.
> We already cover 0.10 brokers with old producers/consumers in upgrade tests. 
> So, the main thing to test is a mix of 0.9 and 0.10 producers and consumers. 
> E.g., test1: 0.9 producer/0.10 consumer and then test2: 0.10 producer/0.9 
> consumer. And then, each of them: compression/no compression (like in upgrade 
> test). And we could probably add another dimension : topic configured with 
> CreateTime (default) and LogAppendTime. So, total 2x2x2 combinations (but 
> maybe can reduce that — eg. do LogAppendTime with compression only).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3190) KafkaProducer should not invoke callback in send()

2016-03-11 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-3190:

Priority: Blocker  (was: Critical)

> KafkaProducer should not invoke callback in send()
> --
>
> Key: KAFKA-3190
> URL: https://issues.apache.org/jira/browse/KAFKA-3190
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> Currently KafkaProducer will invoke callback.onComplete() if it receives an 
> ApiException during send(). This breaks the guarantee that callback will be 
> invoked in order. It seems ApiException in send() only comes from metadata 
> refresh. If so, we can probably simply throw it instead of invoking 
> callback().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3202) Add system test for KIP-31 and KIP-32 - Change message format version on the fly

2016-03-11 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191286#comment-15191286
 ] 

Jiangjie Qin commented on KAFKA-3202:
-

[~ijuma] Apologies that I somehow missed your poke...
[~enothereska] I was thinking about the following test sequence:
1. Have both 0.9 and 0.10 producer and consumer produce to and consume from two 
different topics.
2. initially the both topics are using message format 0.9.0
3. change the message format version for both topic to 0.10.0 on the fly.
4. change the message format version for both topic back to 0.9.0 on the fly.
The producers and consumers should not have any issue.

> Add system test for KIP-31 and KIP-32 - Change message format version on the 
> fly
> 
>
> Key: KAFKA-3202
> URL: https://issues.apache.org/jira/browse/KAFKA-3202
> Project: Kafka
>  Issue Type: Sub-task
>  Components: system tests
>Reporter: Jiangjie Qin
>Assignee: Eno Thereska
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> The system test should cover the case that message format changes are made 
> when clients are producing/consuming. The message format change should not 
> cause client side issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations

2016-03-11 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-3215.

   Resolution: Fixed
Fix Version/s: 0.9.0.0

> controller may not be started when there are multiple ZK session expirations
> 
>
> Key: KAFKA-3215
> URL: https://issues.apache.org/jira/browse/KAFKA-3215
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Flavio Junqueira
>  Labels: controller
> Fix For: 0.9.0.0
>
>
> Suppose that broker 1 is the controller and it has 2 consecutive ZK session 
> expirations. In this case, two ZK session expiration events will be fired.
> 1. When handling the first ZK session expiration event, 
> SessionExpirationListener.handleNewSession() can elect broker 1 itself as the 
> new controller and initialize the states properly.
> 2. When handling the second ZK session expiration event, 
> SessionExpirationListener.handleNewSession() first calls 
> onControllerResignation(), which will set ReplicaStateMachine.hasStarted to 
> false. It then continues to do controller election in 
> ZookeeperLeaderElector.elect() and try to create the controller node in ZK. 
> This will fail since broker 1 has already registered itself as the controller 
> node in ZK. In this case, we simply ignore the failure to create the 
> controller node since we assume the controller must be in another broker. 
> However, in this case, the controller is broker 1 itself, but the 
> ReplicaStateMachine.hasStarted is still false.
> 3. Now, if a new broker event is fired, we will be ignoring the event in 
> BrokerChangeListener.handleChildChange since ReplicaStateMachine.hasStarted 
> is false. Now, we are in a situation that a controller is alive, but won't 
> react to any broker change event.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3361) Initial protocol documentation page and generation

2016-03-11 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke updated KAFKA-3361:
---
   Resolution: Fixed
Fix Version/s: 0.9.0.0
   Status: Resolved  (was: Patch Available)

> Initial protocol documentation page and generation
> --
>
> Key: KAFKA-3361
> URL: https://issues.apache.org/jira/browse/KAFKA-3361
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.10.0.0, 0.9.0.0
>
>
> Add an initial rough draft page to the official documentation. The output 
> will be mostly generated from code, ensuring the docs are accurate and up to 
> date.  This is likely to be a separate page due to the size of the content.
> The idea here is that something is better than nothing. Other jiras will 
> track needed improvements. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3361) Initial protocol documentation page and generation

2016-03-11 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191276#comment-15191276
 ] 

Jiangjie Qin commented on KAFKA-3361:
-

[~granthenke] [~gwenshap] Thanks for fixing this :) BTW, I really like this 
patch!

> Initial protocol documentation page and generation
> --
>
> Key: KAFKA-3361
> URL: https://issues.apache.org/jira/browse/KAFKA-3361
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.10.0.0
>
>
> Add an initial rough draft page to the official documentation. The output 
> will be mostly generated from code, ensuring the docs are accurate and up to 
> date.  This is likely to be a separate page due to the size of the content.
> The idea here is that something is better than nothing. Other jiras will 
> track needed improvements. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (KAFKA-3386) Need to log "Rejected connection" as WARNING message

2016-03-11 Thread Xiaomin Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaomin Zhang closed KAFKA-3386.


closing out which was duplicated with KAFKA-3386

> Need to log "Rejected connection" as WARNING message
> 
>
> Key: KAFKA-3386
> URL: https://issues.apache.org/jira/browse/KAFKA-3386
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xiaomin Zhang
>Priority: Minor
>
> We may found below INFO messages in the log due to inappropriate 
> configuration:
> INFO kafka.network. Acceptor: Rejected connection from /, address already 
> has the configured maximum of 10 connections.
> It will make more sense for Kafka to report above message as "WARN", not just 
> "INFO", as it truly indicates something need to check against. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations

2016-03-11 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15190794#comment-15190794
 ] 

Flavio Junqueira edited comment on KAFKA-3215 at 3/11/16 4:34 PM:
--

[~junrao] Let me see if I understand this issue correctly.

bq. broker 1 is the controller and it has 2 consecutive ZK session expirations

As I understand it, one possible run that reflects this is the following:

# zkclient creates a session S1
# S1 session expires
# zkclient queues the session expiration event to deliver to the kafka broker
# zkclient creates a new session S2
# S2 expires
# zkclient queues the session expiration for S2 and the event for S1 still 
hasn't been delivered
# zkclient creates a third session S3
# broker 1 processes the session expiration of S1
# broker 1 successfully elects itself leader/controller in session S3
# broker 1 processes session expiration for S2

After this last step, the broker is messed up because the replica state machine 
isn't properly initialized. Also, the broker won't give up leadership because 
the ephemeral has been created in the current session.

I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed 
it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode 
exists while creating it, we check if the existing znode has the same session 
owner, in which case the operation returns ok and the controller becomes 
leader. Does it make sense?


was (Author: fpj):
[~junrao] Let me see if I understand this issue correctly.

bq. broker 1 is the controller and it has 2 consecutive ZK session expirations

As I understand this, one possible run that reflects this is the following:

# zkclient creates a session S1
# S1 session expires
# zkclient queues the session expiration event to deliver to the kafka broker
# zkclient creates a new session S2
# S2 expires
# zkclient queues the session expiration for S2 and the event for S1 still 
hasn't been delivered
# zkclient creates a third session S3
# broker 1 processes the session expiration of S1
# broker 1 successfully elects itself leader/controller in session S3
# broker 1 processes session expiration for S2

After this last step, broker S2 is messed up because the replica state machine 
isn't properly initialized. Also, the broker won't give up leadership because 
the ephemeral has been created in the current session.

I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed 
it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode 
exists while creating it, we check if the existing znode has the same session 
owner, in which case the operation returns ok and the controller becomes 
leader. Does it make sense?

> controller may not be started when there are multiple ZK session expirations
> 
>
> Key: KAFKA-3215
> URL: https://issues.apache.org/jira/browse/KAFKA-3215
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Flavio Junqueira
>  Labels: controller
>
> Suppose that broker 1 is the controller and it has 2 consecutive ZK session 
> expirations. In this case, two ZK session expiration events will be fired.
> 1. When handling the first ZK session expiration event, 
> SessionExpirationListener.handleNewSession() can elect broker 1 itself as the 
> new controller and initialize the states properly.
> 2. When handling the second ZK session expiration event, 
> SessionExpirationListener.handleNewSession() first calls 
> onControllerResignation(), which will set ReplicaStateMachine.hasStarted to 
> false. It then continues to do controller election in 
> ZookeeperLeaderElector.elect() and try to create the controller node in ZK. 
> This will fail since broker 1 has already registered itself as the controller 
> node in ZK. In this case, we simply ignore the failure to create the 
> controller node since we assume the controller must be in another broker. 
> However, in this case, the controller is broker 1 itself, but the 
> ReplicaStateMachine.hasStarted is still false.
> 3. Now, if a new broker event is fired, we will be ignoring the event in 
> BrokerChangeListener.handleChildChange since ReplicaStateMachine.hasStarted 
> is false. Now, we are in a situation that a controller is alive, but won't 
> react to any broker change event.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3386) Need to log "Rejected connection" as WARNING message

2016-03-11 Thread Xiaomin Zhang (JIRA)
Xiaomin Zhang created KAFKA-3386:


 Summary: Need to log "Rejected connection" as WARNING message
 Key: KAFKA-3386
 URL: https://issues.apache.org/jira/browse/KAFKA-3386
 Project: Kafka
  Issue Type: Improvement
Reporter: Xiaomin Zhang
Priority: Minor


We may found below INFO messages in the log due to inappropriate configuration:
INFO kafka.network. Acceptor: Rejected connection from /, address already 
has the configured maximum of 10 connections.

It will make more sense for Kafka to report above message as "WARN", not just 
"INFO", as it truly indicates something need to check against. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3385) Need to log "Rejected connection" as WARNING message

2016-03-11 Thread Xiaomin Zhang (JIRA)
Xiaomin Zhang created KAFKA-3385:


 Summary: Need to log "Rejected connection" as WARNING message
 Key: KAFKA-3385
 URL: https://issues.apache.org/jira/browse/KAFKA-3385
 Project: Kafka
  Issue Type: Improvement
Reporter: Xiaomin Zhang
Priority: Minor


We may found below INFO messages in the log due to inappropriate configuration:
INFO kafka.network. Acceptor: Rejected connection from /, address already 
has the configured maximum of 10 connections.

It will make more sense for Kafka to report above message as "WARN", not just 
"INFO", as it truly indicates something need to check against. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations

2016-03-11 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191094#comment-15191094
 ] 

Jun Rao commented on KAFKA-3215:


[~fpj], thanks for the analysis. Yes, it does seems that this issue is fixed in 
0.9.0.

> controller may not be started when there are multiple ZK session expirations
> 
>
> Key: KAFKA-3215
> URL: https://issues.apache.org/jira/browse/KAFKA-3215
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Flavio Junqueira
>  Labels: controller
>
> Suppose that broker 1 is the controller and it has 2 consecutive ZK session 
> expirations. In this case, two ZK session expiration events will be fired.
> 1. When handling the first ZK session expiration event, 
> SessionExpirationListener.handleNewSession() can elect broker 1 itself as the 
> new controller and initialize the states properly.
> 2. When handling the second ZK session expiration event, 
> SessionExpirationListener.handleNewSession() first calls 
> onControllerResignation(), which will set ReplicaStateMachine.hasStarted to 
> false. It then continues to do controller election in 
> ZookeeperLeaderElector.elect() and try to create the controller node in ZK. 
> This will fail since broker 1 has already registered itself as the controller 
> node in ZK. In this case, we simply ignore the failure to create the 
> controller node since we assume the controller must be in another broker. 
> However, in this case, the controller is broker 1 itself, but the 
> ReplicaStateMachine.hasStarted is still false.
> 3. Now, if a new broker event is fired, we will be ignoring the event in 
> BrokerChangeListener.handleChildChange since ReplicaStateMachine.hasStarted 
> is false. Now, we are in a situation that a controller is alive, but won't 
> react to any broker change event.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-3371: ClientCompatibilityTest system tes...

2016-03-11 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/1051

KAFKA-3371: ClientCompatibilityTest system test failing

@becketqin have a look if this looks reasonable to you. Thanks.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka kafka-3371

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1051.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1051


commit d18a35556c98ca116e5bee0c4c56bbe6d80d2463
Author: Eno Thereska 
Date:   2016-03-11T14:35:50Z

Removed unneeded string




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3371) ClientCompatibilityTest system test failing since KIP-31/KIP-32 was merged

2016-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191009#comment-15191009
 ] 

ASF GitHub Bot commented on KAFKA-3371:
---

GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/1051

KAFKA-3371: ClientCompatibilityTest system test failing

@becketqin have a look if this looks reasonable to you. Thanks.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka kafka-3371

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1051.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1051


commit d18a35556c98ca116e5bee0c4c56bbe6d80d2463
Author: Eno Thereska 
Date:   2016-03-11T14:35:50Z

Removed unneeded string




> ClientCompatibilityTest system test failing since KIP-31/KIP-32 was merged
> --
>
> Key: KAFKA-3371
> URL: https://issues.apache.org/jira/browse/KAFKA-3371
> Project: Kafka
>  Issue Type: Test
>Reporter: Ismael Juma
>Assignee: Eno Thereska
>Priority: Blocker
>
> ClientCompatibilityTest system test has been failing since we merged 
> KIP-31/32. We need to fix this for 0.10.0.0. Latest failure below:
> test_id:
> 2016-03-09--001.kafkatest.tests.compatibility_test.ClientCompatibilityTest.test_producer_back_compatibility
> status: FAIL
> run time:   1 minute 4.864 seconds
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-03-09--001.1457539618--apache--trunk--324b0c8/report.html
> cc [~becket_qin]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-3371) ClientCompatibilityTest system test failing since KIP-31/KIP-32 was merged

2016-03-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3371 started by Eno Thereska.
---
> ClientCompatibilityTest system test failing since KIP-31/KIP-32 was merged
> --
>
> Key: KAFKA-3371
> URL: https://issues.apache.org/jira/browse/KAFKA-3371
> Project: Kafka
>  Issue Type: Test
>Reporter: Ismael Juma
>Assignee: Eno Thereska
>Priority: Blocker
>
> ClientCompatibilityTest system test has been failing since we merged 
> KIP-31/32. We need to fix this for 0.10.0.0. Latest failure below:
> test_id:
> 2016-03-09--001.kafkatest.tests.compatibility_test.ClientCompatibilityTest.test_producer_back_compatibility
> status: FAIL
> run time:   1 minute 4.864 seconds
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-03-09--001.1457539618--apache--trunk--324b0c8/report.html
> cc [~becket_qin]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3371) ClientCompatibilityTest system test failing since KIP-31/KIP-32 was merged

2016-03-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-3371:
---

Assignee: Eno Thereska

> ClientCompatibilityTest system test failing since KIP-31/KIP-32 was merged
> --
>
> Key: KAFKA-3371
> URL: https://issues.apache.org/jira/browse/KAFKA-3371
> Project: Kafka
>  Issue Type: Test
>Reporter: Ismael Juma
>Assignee: Eno Thereska
>Priority: Blocker
>
> ClientCompatibilityTest system test has been failing since we merged 
> KIP-31/32. We need to fix this for 0.10.0.0. Latest failure below:
> test_id:
> 2016-03-09--001.kafkatest.tests.compatibility_test.ClientCompatibilityTest.test_producer_back_compatibility
> status: FAIL
> run time:   1 minute 4.864 seconds
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-03-09--001.1457539618--apache--trunk--324b0c8/report.html
> cc [~becket_qin]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations

2016-03-11 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15190794#comment-15190794
 ] 

Flavio Junqueira edited comment on KAFKA-3215 at 3/11/16 1:21 PM:
--

[~junrao] Let me see if I understand this issue correctly.

bq. broker 1 is the controller and it has 2 consecutive ZK session expirations

As I understand this, one possible run that reflects this is the following:

# zkclient creates a session S1
# S1 session expires
# zkclient queues the session expiration event to deliver to the kafka broker
# zkclient creates a new session S2
# S2 expires
# zkclient queues the session expiration for S2 and the event for S1 still 
hasn't been delivered
# zkclient creates a third session S3
# broker 1 processes the session expiration of S1
# broker 1 successfully elects itself leader/controller in session S3
# broker 1 processes session expiration for S2

After this last step, broker S2 is messed up because the replica state machine 
isn't properly initialized. Also, the broker won't give up leadership because 
the ephemeral has been created in the current session.

I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed 
it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode 
exists while creating it, we check if the existing znode has the same session 
owner, in which case the operation returns ok and the controller becomes 
leader. Does it make sense?


was (Author: fpj):
[~junrao] Let me see if I understand this issue correctly.

bq. broker 1 is the controller and it has 2 consecutive ZK session expirations

As I understand this, one possible run that reflects this is the following:

# zkclient creates a session S1
# S1 session expires
# zkclient queues the session expiration event to deliver to the kafka broker
# zkclient creates a new session S2
# S2 expires
# zkclient queues the session expiration for S2 and the event for S1 still 
hasn't been delivered
# zkclient creates a third session S3
# broker 1 processes the session expiration of S1
# broker 1 successfully elects itself leader/controller in session S3
# broker 1 processes session expiration for S2

After this last step, broker S2 is messed up because it thinks the replica 
state machine isn't properly initialized. Also, the broker won't give up 
leadership because the ephemeral has been created in the current session.

I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed 
it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode 
exists while creating it, we check if the existing znode has the same session 
owner, in which case the operation returns ok and the controller becomes 
leader. Does it make sense?

> controller may not be started when there are multiple ZK session expirations
> 
>
> Key: KAFKA-3215
> URL: https://issues.apache.org/jira/browse/KAFKA-3215
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Flavio Junqueira
>  Labels: controller
>
> Suppose that broker 1 is the controller and it has 2 consecutive ZK session 
> expirations. In this case, two ZK session expiration events will be fired.
> 1. When handling the first ZK session expiration event, 
> SessionExpirationListener.handleNewSession() can elect broker 1 itself as the 
> new controller and initialize the states properly.
> 2. When handling the second ZK session expiration event, 
> SessionExpirationListener.handleNewSession() first calls 
> onControllerResignation(), which will set ReplicaStateMachine.hasStarted to 
> false. It then continues to do controller election in 
> ZookeeperLeaderElector.elect() and try to create the controller node in ZK. 
> This will fail since broker 1 has already registered itself as the controller 
> node in ZK. In this case, we simply ignore the failure to create the 
> controller node since we assume the controller must be in another broker. 
> However, in this case, the controller is broker 1 itself, but the 
> ReplicaStateMachine.hasStarted is still false.
> 3. Now, if a new broker event is fired, we will be ignoring the event in 
> BrokerChangeListener.handleChildChange since ReplicaStateMachine.hasStarted 
> is false. Now, we are in a situation that a controller is alive, but won't 
> react to any broker change event.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations

2016-03-11 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira reassigned KAFKA-3215:
---

Assignee: Flavio Junqueira

> controller may not be started when there are multiple ZK session expirations
> 
>
> Key: KAFKA-3215
> URL: https://issues.apache.org/jira/browse/KAFKA-3215
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Flavio Junqueira
>  Labels: controller
>
> Suppose that broker 1 is the controller and it has 2 consecutive ZK session 
> expirations. In this case, two ZK session expiration events will be fired.
> 1. When handling the first ZK session expiration event, 
> SessionExpirationListener.handleNewSession() can elect broker 1 itself as the 
> new controller and initialize the states properly.
> 2. When handling the second ZK session expiration event, 
> SessionExpirationListener.handleNewSession() first calls 
> onControllerResignation(), which will set ReplicaStateMachine.hasStarted to 
> false. It then continues to do controller election in 
> ZookeeperLeaderElector.elect() and try to create the controller node in ZK. 
> This will fail since broker 1 has already registered itself as the controller 
> node in ZK. In this case, we simply ignore the failure to create the 
> controller node since we assume the controller must be in another broker. 
> However, in this case, the controller is broker 1 itself, but the 
> ReplicaStateMachine.hasStarted is still false.
> 3. Now, if a new broker event is fired, we will be ignoring the event in 
> BrokerChangeListener.handleChildChange since ReplicaStateMachine.hasStarted 
> is false. Now, we are in a situation that a controller is alive, but won't 
> react to any broker change event.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations

2016-03-11 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15190794#comment-15190794
 ] 

Flavio Junqueira commented on KAFKA-3215:
-

[~junrao] Let me see if I understand this issue correctly.

bq. broker 1 is the controller and it has 2 consecutive ZK session expirations

As I understand this, one possible run that reflects this is the following:

# zkclient creates a session S1
# S1 session expires
# zkclient queues the session expiration event to deliver to the kafka broker
# zkclient creates a new session S2
# S2 expires
# zkclient queues the session expiration for S2 and the event for S1 still 
hasn't been delivered
# zkclient creates a third session S3
# broker 1 processes the session expiration of S1
# broker 1 successfully elects itself leader/controller in session S3
# broker 1 processes session expiration for S2

After this last step, broker S2 is messed up because it thinks the replica 
state machine isn't properly initialized. Also, the broker won't give up 
leadership because the ephemeral has been created in the current session.

I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed 
it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode 
exists while creating it, we check if the existing znode has the same session 
owner, in which case the operation returns ok and the controller becomes 
leader. Does it make sense?

> controller may not be started when there are multiple ZK session expirations
> 
>
> Key: KAFKA-3215
> URL: https://issues.apache.org/jira/browse/KAFKA-3215
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>  Labels: controller
>
> Suppose that broker 1 is the controller and it has 2 consecutive ZK session 
> expirations. In this case, two ZK session expiration events will be fired.
> 1. When handling the first ZK session expiration event, 
> SessionExpirationListener.handleNewSession() can elect broker 1 itself as the 
> new controller and initialize the states properly.
> 2. When handling the second ZK session expiration event, 
> SessionExpirationListener.handleNewSession() first calls 
> onControllerResignation(), which will set ReplicaStateMachine.hasStarted to 
> false. It then continues to do controller election in 
> ZookeeperLeaderElector.elect() and try to create the controller node in ZK. 
> This will fail since broker 1 has already registered itself as the controller 
> node in ZK. In this case, we simply ignore the failure to create the 
> controller node since we assume the controller must be in another broker. 
> However, in this case, the controller is broker 1 itself, but the 
> ReplicaStateMachine.hasStarted is still false.
> 3. Now, if a new broker event is fired, we will be ignoring the event in 
> BrokerChangeListener.handleChildChange since ReplicaStateMachine.hasStarted 
> is false. Now, we are in a situation that a controller is alive, but won't 
> react to any broker change event.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)