Re: Brokers crash because the __consumer_offsets folders are deleted

2016-07-01 Thread Peter Davis
Dear 黄杰斌:

I am guessing your operating system is configured to delete your /tmp directory 
when you restart the server.

You will need to change the "log.dir" property in your broker's 
server.properties file to someplace permanent.  Unfortunately, your data is 
lost unless you had a backup or had configured replication. 

log.dir: The directory in which the log data is kept (supplemental for the log.dirs 
property). Type: string. Default: /tmp/kafka-logs. Importance: high.
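
For example, a minimal server.properties change might look like this (the path is 
only an illustration; any directory that survives reboots and has enough disk will do):

    # server.properties
    # Point the broker's data directory at persistent storage instead of /tmp.
    # (log.dir also works; log.dirs is the comma-separated list form and takes precedence.)
    log.dirs=/var/lib/kafka/data

On a fresh start the broker will create new partition directories there; since the old 
data under /tmp is already gone, there is nothing to migrate in this case.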


Dear Community: why does log.dir default to a location under /tmp?  That is an unsafe default.

-Peter


> On Jun 30, 2016, at 11:19 PM, 黄杰斌  wrote:
> 
> Hi All,
> 
> Have you encountered the issue below when using kafka_2.11-0.10.0.0?
> All brokers crash because the __consumer_offsets folders are deleted.
> sample log:
> [2016-06-30 12:46:32,579] FATAL [Replica Manager on Broker 2]: Halting due
> to unrecoverable I/O error while handling produce request:
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log
> '__consumer_offsets-32'
>at kafka.log.Log.append(Log.scala:329)
>at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:443)
>at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:429)
>at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
>at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
>at
> kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
>at
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:406)
>at
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:392)
>at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
>at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>at
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:392)
>at
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:328)
>at
> kafka.coordinator.GroupMetadataManager.store(GroupMetadataManager.scala:232)
>at
> kafka.coordinator.GroupCoordinator$$anonfun$handleCommitOffsets$9.apply(GroupCoordinator.scala:424)
>at
> kafka.coordinator.GroupCoordinator$$anonfun$handleCommitOffsets$9.apply(GroupCoordinator.scala:424)
>at scala.Option.foreach(Option.scala:257)
>at
> kafka.coordinator.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:424)
>at
> kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:310)
>at kafka.server.KafkaApis.handle(KafkaApis.scala:84)
>at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException:
> /tmp/kafka2-logs/__consumer_offsets-32/.index (No such
> file or directory)
>at java.io.RandomAccessFile.open0(Native Method)
>at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
>at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
>at
> kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:286)
>at
> kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:285)
>at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
>at kafka.log.OffsetIndex.resize(OffsetIndex.scala:285)
>at
> kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:274)
>at
> kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:274)
>at
> kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:274)
>at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
>at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:273)
>at kafka.log.Log.roll(Log.scala:655)
>at kafka.log.Log.maybeRoll(Log.scala:630)
>at kafka.log.Log.append(Log.scala:383)
>... 23 more
> 
> No one removed those folders, and the __consumer_offsets topic is managed by
> the broker, so no one should be able to remove it.
> Do you know why this happened? And how can it be avoided?
> 
> Best Regards,
> Ben


Build failed in Jenkins: kafka-trunk-jdk8 #731

2016-07-01 Thread Apache Jenkins Server
See 

Changes:

[ismael] MINOR: Typo fix in docs ops

[ismael] MINOR: Warning instead of error if TGT cannot be renewed beyond the 
next

--
[...truncated 1195 lines...]
kafka.server.KafkaConfigTest > testInvalidAdvertisedListenersProtocol STARTED

kafka.server.KafkaConfigTest > testInvalidAdvertisedListenersProtocol PASSED

kafka.server.KafkaConfigTest > testUncleanElectionEnabled STARTED

kafka.server.KafkaConfigTest > testUncleanElectionEnabled PASSED

kafka.server.KafkaConfigTest > testAdvertisePortDefault STARTED

kafka.server.KafkaConfigTest > testAdvertisePortDefault PASSED

kafka.server.KafkaConfigTest > testVersionConfiguration STARTED

kafka.server.KafkaConfigTest > testVersionConfiguration PASSED

kafka.server.KafkaConfigTest > testEqualAdvertisedListenersProtocol STARTED

kafka.server.KafkaConfigTest > testEqualAdvertisedListenersProtocol PASSED

kafka.server.MetadataRequestTest > testReplicaDownResponse STARTED

kafka.server.MetadataRequestTest > testReplicaDownResponse PASSED

kafka.server.MetadataRequestTest > testRack STARTED

kafka.server.MetadataRequestTest > testRack PASSED

kafka.server.MetadataRequestTest > testIsInternal STARTED

kafka.server.MetadataRequestTest > testIsInternal PASSED

kafka.server.MetadataRequestTest > testControllerId STARTED

kafka.server.MetadataRequestTest > testControllerId PASSED

kafka.server.MetadataRequestTest > testAllTopicsRequest STARTED

kafka.server.MetadataRequestTest > testAllTopicsRequest PASSED

kafka.server.MetadataRequestTest > testNoTopicsRequest STARTED

kafka.server.MetadataRequestTest > testNoTopicsRequest PASSED

kafka.server.SimpleFetchTest > testReadFromLog STARTED

kafka.server.SimpleFetchTest > testReadFromLog PASSED

kafka.server.ProduceRequestTest > testSimpleProduceRequest STARTED

kafka.server.ProduceRequestTest > testSimpleProduceRequest PASSED

kafka.server.ProduceRequestTest > testCorruptLz4ProduceRequest STARTED

kafka.server.ProduceRequestTest > testCorruptLz4ProduceRequest PASSED

kafka.server.DelayedOperationTest > testRequestPurge STARTED

kafka.server.DelayedOperationTest > testRequestPurge PASSED

kafka.server.DelayedOperationTest > testRequestExpiry STARTED

kafka.server.DelayedOperationTest > testRequestExpiry PASSED

kafka.server.DelayedOperationTest > testRequestSatisfaction STARTED

kafka.server.DelayedOperationTest > testRequestSatisfaction PASSED

kafka.server.LogRecoveryTest > testHWCheckpointNoFailuresMultipleLogSegments 
STARTED

kafka.server.LogRecoveryTest > testHWCheckpointNoFailuresMultipleLogSegments 
PASSED

kafka.server.LogRecoveryTest > testHWCheckpointWithFailuresMultipleLogSegments 
STARTED

kafka.server.LogRecoveryTest > testHWCheckpointWithFailuresMultipleLogSegments 
PASSED

kafka.server.LogRecoveryTest > testHWCheckpointNoFailuresSingleLogSegment 
STARTED

kafka.server.LogRecoveryTest > testHWCheckpointNoFailuresSingleLogSegment PASSED

kafka.server.LogRecoveryTest > testHWCheckpointWithFailuresSingleLogSegment 
STARTED

kafka.server.LogRecoveryTest > testHWCheckpointWithFailuresSingleLogSegment 
PASSED

kafka.server.AbstractFetcherThreadTest > testConsumerLagRemovedWithPartition 
STARTED

kafka.server.AbstractFetcherThreadTest > testConsumerLagRemovedWithPartition 
PASSED

kafka.server.AbstractFetcherThreadTest > testMetricsRemovedOnShutdown STARTED

kafka.server.AbstractFetcherThreadTest > testMetricsRemovedOnShutdown PASSED

kafka.server.ServerShutdownTest > testCleanShutdownAfterFailedStartup STARTED

kafka.server.ServerShutdownTest > testCleanShutdownAfterFailedStartup PASSED

kafka.server.ServerShutdownTest > testConsecutiveShutdown STARTED

kafka.server.ServerShutdownTest > testConsecutiveShutdown PASSED

kafka.server.ServerShutdownTest > testCleanShutdown STARTED

kafka.server.ServerShutdownTest > testCleanShutdown PASSED

kafka.server.ServerShutdownTest > testCleanShutdownWithDeleteTopicEnabled 
STARTED

kafka.server.ServerShutdownTest > testCleanShutdownWithDeleteTopicEnabled PASSED

kafka.server.ServerStartupTest > testBrokerStateRunningAfterZK STARTED

kafka.server.ServerStartupTest > testBrokerStateRunningAfterZK PASSED

kafka.server.ServerStartupTest > testBrokerCreatesZKChroot STARTED

kafka.server.ServerStartupTest > testBrokerCreatesZKChroot PASSED

kafka.server.ServerStartupTest > testConflictBrokerRegistration STARTED
ERROR: Could not install GRADLE_2_4_RC_2_HOME
java.lang.NullPointerException
at 
hudson.plugins.toolenv.ToolEnvBuildWrapper$1.buildEnvVars(ToolEnvBuildWrapper.java:46)
at hudson.model.AbstractBuild.getEnvironment(AbstractBuild.java:947)
at hudson.plugins.git.GitSCM.getParamExpandedRepos(GitSCM.java:390)
at 
hudson.plugins.git.GitSCM.compareRemoteRevisionWithImpl(GitSCM.java:577)
at hudson.plugins.git.GitSCM.compareRemoteRevisionWith(GitSCM.java:527)
at hudson.scm.SCM.compareRemoteRevisionWith(SCM.java:381)
at 

[GitHub] kafka pull request #1572: KAFKA-3854: Fix issues with new consumer's subsequ...

2016-07-01 Thread vahidhashemian
Github user vahidhashemian closed the pull request at:

https://github.com/apache/kafka/pull/1572


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3854) Subsequent regex subscription calls fail

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359943#comment-15359943
 ] 

ASF GitHub Bot commented on KAFKA-3854:
---

GitHub user vahidhashemian reopened a pull request:

https://github.com/apache/kafka/pull/1572

KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) 
subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not immediately refresh metadata to 
change the subscription of the new consumer and trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches 
a consumer's subscription pattern would not be assigned to the consumer upon 
creation seems to be as designed. A repeat 
`subscribe()` to the same pattern or some wait time until the next 
automatic metadata refresh would handle that case.

An integration test was also added to verify these issues are fixed with 
this PR.
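
For illustration only (this sketch is not part of the patch), the work-around 
described above might look like the following; the broker address, group id, and 
pattern are hypothetical placeholders:

    import java.util.Collection;
    import java.util.Properties;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PatternResubscribeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker
            props.put("group.id", "pattern-demo");               // hypothetical group
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            // No-op listener; a real application would track assigned partitions here.
            ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
            };

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                Pattern pattern = Pattern.compile("f..");         // hypothetical pattern
                consumer.subscribe(pattern, listener);
                consumer.poll(0);

                // If a matching topic is created later, it is picked up on the next
                // automatic metadata refresh, or sooner by re-issuing the same subscribe().
                consumer.subscribe(pattern, listener);
                consumer.poll(0);
            }
        }
    }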

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3854

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1572


commit 0860bd7691b87de33393b26373df752b0d2f934e
Author: Vahid Hashemian 
Date:   2016-06-16T21:19:32Z

KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) 
subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not actually refresh metadata and 
change the subscription of the new consumer, nor would they trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches 
a consumer's subscription pattern would not be assigned to the consumer upon 
creation seems to be as designed. A repeat subscribe() to the same pattern 
would be needed to handle that case.
Unit tests for regex subscriptions will be handled in KAFKA-3897.




> Subsequent regex subscription calls fail
> 
>
> Key: KAFKA-3854
> URL: https://issues.apache.org/jira/browse/KAFKA-3854
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> There are a couple of issues with regex subscription in the new consumer:
> h6. Subsequent Subscriptions Fail
> When consecutive calls are made to new consumer's [regex 
> subscription|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L850],
>  like below: 
> {code}
> consumer.subscribe(Pattern.compile("..."), listener);
> consumer.poll(0);
> consumer.subscribe(Pattern.compile("f.."), listener);
> consumer.poll(0);
> {code}
> the second call fails with the following error:
> {code}
> Exception in thread "main" java.lang.IllegalStateException: Subscription to 
> topics, partitions and pattern are mutually exclusive
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.subscribe(SubscriptionState.java:175)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:854)
>   ...
> {code}
> h6. Subsequent Subscriptions Fail to Trigger a Subscription Change and 
> Rebalance
> Even after the code is tweaked to get around the above issue, only the first 
> call to regex subscription triggers an actual subscription and the 
> subscription's rebalance listener. The reason is the regex {{subscription}} / 
> {{poll}} does not directly call {{changeSubscription(...)}} in which 
> {{needsPartitionAssignment}} is set and causes a rebalance. This method is 
> called only during the first regex {{subscription}} / {{poll}} when 
> coordinator is unknown and [a {{client.poll}} 
> call|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L179]
>  is made which eventually leads to [a {{changeSubscription}} 
> call|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L161].
>  The subsequent call does not reach this point because the coordinator is 
> already known.
> It seems that, for the same reason, if a consumer is subscribed to a pattern, 
> and later on a new topic is created that matches that pattern, the consumer 
> does not become subscribed to it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3854) Subsequent regex subscription calls fail

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359942#comment-15359942
 ] 

ASF GitHub Bot commented on KAFKA-3854:
---

Github user vahidhashemian closed the pull request at:

https://github.com/apache/kafka/pull/1572


> Subsequent regex subscription calls fail
> 
>
> Key: KAFKA-3854
> URL: https://issues.apache.org/jira/browse/KAFKA-3854
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> There are a couple of issues with regex subscription in the new consumer:
> h6. Subsequent Subscriptions Fail
> When consecutive calls are made to new consumer's [regex 
> subscription|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L850],
>  like below: 
> {code}
> consumer.subscribe(Pattern.compile("..."), listener);
> consumer.poll(0);
> consumer.subscribe(Pattern.compile("f.."), listener);
> consumer.poll(0);
> {code}
> the second call fails with the following error:
> {code}
> Exception in thread "main" java.lang.IllegalStateException: Subscription to 
> topics, partitions and pattern are mutually exclusive
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.subscribe(SubscriptionState.java:175)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:854)
>   ...
> {code}
> h6. Subsequent Subscriptions Fail to Trigger a Subscription Change and 
> Rebalance
> Even after the code is tweaked to get around the above issue, only the first 
> call to regex subscription triggers an actual subscription and the 
> subscription's rebalance listener. The reason is the regex {{subscription}} / 
> {{poll}} does not directly call {{changeSubscription(...)}} in which 
> {{needsPartitionAssignment}} is set and causes a rebalance. This method is 
> called only during the first regex {{subscription}} / {{poll}} when 
> coordinator is unknown and [a {{client.poll}} 
> call|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L179]
>  is made which eventually leads to [a {{changeSubscription}} 
> call|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L161].
>  The subsequent call does not reach this point because the coordinator is 
> already known.
> It seems that, for the same reason, if a consumer is subscribed to a pattern, 
> and later on a new topic is created that matches that pattern, the consumer 
> does not become subscribed to it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1572: KAFKA-3854: Fix issues with new consumer's subsequ...

2016-07-01 Thread vahidhashemian
GitHub user vahidhashemian reopened a pull request:

https://github.com/apache/kafka/pull/1572

KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) 
subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not immediately refresh metadata to 
change the subscription of the new consumer and trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches 
a consumer's subscription pattern would not be assigned to the consumer upon 
creation seems to be as designed. A repeat 
`subscribe()` to the same pattern or some wait time until the next 
automatic metadata refresh would handle that case.

An integration test was also added to verify these issues are fixed with 
this PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3854

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1572


commit 0860bd7691b87de33393b26373df752b0d2f934e
Author: Vahid Hashemian 
Date:   2016-06-16T21:19:32Z

KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) 
subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not actually refresh metadata and 
change the subscription of the new consumer, nor would they trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches 
a consumer's subscription pattern would not be assigned to the consumer upon 
creation seems to be as designed. A repeat subscribe() to the same pattern 
would be needed to handle that case.
Unit tests for regex subscriptions will be handled in KAFKA-3897.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Build failed in Jenkins: kafka-trunk-jdk8 #730

2016-07-01 Thread Apache Jenkins Server
See 

Changes:

[ismael] MINOR: remove "auto.commit.interval.ms" from "Manual Offset Control"

[ismael] MINOR: Fix consumer constructor doc string

--
[...truncated 11396 lines...]
org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithWrongParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithWrongParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testPatternMatchesAlreadyProvidedTopicSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testPatternMatchesAlreadyProvidedTopicSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithMultipleParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithMultipleParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testSubscribeTopicNameAndPattern STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testSubscribeTopicNameAndPattern PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithWrongParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithWrongParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testAddStateStore 
STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testAddStateStore 
PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithNonExistingProcessor STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithNonExistingProcessor PASSED

org.apache.kafka.streams.processor.internals.RecordQueueTest > testTimeTracking 
STARTED

org.apache.kafka.streams.processor.internals.RecordQueueTest > testTimeTracking 
PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterNonPersistentStore STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterNonPersistentStore PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testLockStateDirectory STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testLockStateDirectory PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testGetStore STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testGetStore PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testClose STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testClose PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testChangeLogOffsets STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testChangeLogOffsets PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterPersistentStore STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testRegisterPersistentStore PASSED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testNoTopic STARTED

org.apache.kafka.streams.processor.internals.ProcessorStateManagerTest > 
testNoTopic PASSED

org.apache.kafka.streams.processor.internals.QuickUnionTest > testUnite STARTED

org.apache.kafka.streams.processor.internals.QuickUnionTest > testUnite PASSED

org.apache.kafka.streams.processor.internals.QuickUnionTest > testUniteMany 
STARTED

org.apache.kafka.streams.processor.internals.QuickUnionTest > testUniteMany 
PASSED

org.apache.kafka.streams.processor.internals.StandbyTaskTest > 
testStorePartitions STARTED

org.apache.kafka.streams.processor.internals.StandbyTaskTest > 
testStorePartitions PASSED

org.apache.kafka.streams.processor.internals.StandbyTaskTest > testUpdateKTable 
STARTED

org.apache.kafka.streams.processor.internals.StandbyTaskTest > testUpdateKTable 
PASSED

org.apache.kafka.streams.processor.internals.StandbyTaskTest > 
testUpdateNonPersistentStore STARTED

org.apache.kafka.streams.processor.internals.StandbyTaskTest > 
testUpdateNonPersistentStore PASSED

org.apache.kafka.streams.processor.internals.StandbyTaskTest > testUpdate 
STARTED

org.apache.kafka.streams.processor.internals.StandbyTaskTest > testUpdate PASSED

org.apache.kafka.streams.processor.internals.PartitionGroupTest > 
testTimeTracking STARTED

org.apache.kafka.streams.processor.internals.PartitionGroupTest > 
testTimeTracking PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
testInjectClients STARTED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 
testInjectClients PASSED

org.apache.kafka.streams.processor.internals.StreamThreadTest > 

Build failed in Jenkins: kafka-trunk-jdk7 #1400

2016-07-01 Thread Apache Jenkins Server
See 

Changes:

[ismael] MINOR: remove "auto.commit.interval.ms" from "Manual Offset Control"

[ismael] MINOR: Fix consumer constructor doc string

[ismael] MINOR: Typo fix in docs ops

[ismael] MINOR: Warning instead of error if TGT cannot be renewed beyond the 
next

--
[...truncated 3330 lines...]

kafka.producer.AsyncProducerTest > testJavaProducer PASSED

kafka.producer.AsyncProducerTest > testIncompatibleEncoder STARTED

kafka.producer.AsyncProducerTest > testIncompatibleEncoder PASSED

kafka.producer.ProducerTest > testSendToNewTopic STARTED

kafka.producer.ProducerTest > testSendToNewTopic PASSED

kafka.producer.ProducerTest > testAsyncSendCanCorrectlyFailWithTimeout STARTED

kafka.producer.ProducerTest > testAsyncSendCanCorrectlyFailWithTimeout PASSED

kafka.producer.ProducerTest > testSendNullMessage STARTED

kafka.producer.ProducerTest > testSendNullMessage PASSED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo STARTED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo PASSED

kafka.producer.ProducerTest > testSendWithDeadBroker STARTED

kafka.producer.ProducerTest > testSendWithDeadBroker PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile STARTED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig PASSED

kafka.tools.ConsoleProducerTest > testParseKeyProp STARTED

kafka.tools.ConsoleProducerTest > testParseKeyProp PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer PASSED

kafka.tools.ConsoleProducerTest > testInvalidConfigs STARTED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler STARTED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.common.ConfigTest > testInvalidGroupIds STARTED

kafka.common.ConfigTest > testInvalidGroupIds PASSED

kafka.common.ConfigTest > testInvalidClientIds STARTED

kafka.common.ConfigTest > testInvalidClientIds PASSED

kafka.common.ZkNodeChangeNotificationListenerTest > testProcessNotification 
STARTED

kafka.common.ZkNodeChangeNotificationListenerTest > testProcessNotification 
PASSED

kafka.common.TopicTest > testInvalidTopicNames STARTED

kafka.common.TopicTest > testInvalidTopicNames PASSED

kafka.common.TopicTest > testTopicHasCollision STARTED


[GitHub] kafka pull request #1572: KAFKA-3854: Fix issues with new consumer's subsequ...

2016-07-01 Thread vahidhashemian
Github user vahidhashemian closed the pull request at:

https://github.com/apache/kafka/pull/1572


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3854) Subsequent regex subscription calls fail

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359911#comment-15359911
 ] 

ASF GitHub Bot commented on KAFKA-3854:
---

GitHub user vahidhashemian reopened a pull request:

https://github.com/apache/kafka/pull/1572

KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) 
subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not immediately refresh metadata to 
change the subscription of the new consumer and trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches 
a consumer's subscription pattern would not be assigned to the consumer upon 
creation seems to be as designed. A repeat 
`subscribe()` to the same pattern or some wait time until the next 
automatic metadata refresh would handle that case.

An integration test was also added to verify these issues are fixed with 
this PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3854

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1572


commit 0860bd7691b87de33393b26373df752b0d2f934e
Author: Vahid Hashemian 
Date:   2016-06-16T21:19:32Z

KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) 
subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not actually refresh metadata and 
change the subscription of the new consumer, nor would they trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches 
a consumer's subscription pattern would not be assigned to the consumer upon 
creation seems to be as designed. A repeat subscribe() to the same pattern 
would be needed to handle that case.
Unit tests for regex subscriptions will be handled in KAFKA-3897.




> Subsequent regex subscription calls fail
> 
>
> Key: KAFKA-3854
> URL: https://issues.apache.org/jira/browse/KAFKA-3854
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> There are a couple of issues with regex subscription in the new consumer:
> h6. Subsequent Subscriptions Fail
> When consecutive calls are made to new consumer's [regex 
> subscription|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L850],
>  like below: 
> {code}
> consumer.subscribe(Pattern.compile("..."), listener);
> consumer.poll(0);
> consumer.subscribe(Pattern.compile("f.."), listener);
> consumer.poll(0);
> {code}
> the second call fails with the following error:
> {code}
> Exception in thread "main" java.lang.IllegalStateException: Subscription to 
> topics, partitions and pattern are mutually exclusive
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.subscribe(SubscriptionState.java:175)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:854)
>   ...
> {code}
> h6. Subsequent Subscriptions Fail to Trigger a Subscription Change and 
> Rebalance
> Even after the code is tweaked to get around the above issue, only the first 
> call to regex subscription triggers an actual subscription and the 
> subscription's rebalance listener. The reason is the regex {{subscription}} / 
> {{poll}} does not directly call {{changeSubscription(...)}} in which 
> {{needsPartitionAssignment}} is set and causes a rebalance. This method is 
> called only during the first regex {{subscription}} / {{poll}} when 
> coordinator is unknown and [a {{client.poll}} 
> call|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L179]
>  is made which eventually leads to [a {{changeSubscription}} 
> call|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L161].
>  The subsequent call does not reach this point because the coordinator is 
> already known.
> It seems that, for the same reason, if a consumer is subscribed to a pattern, 
> and later on a new topic is created that matches that pattern, the consumer 
> does not become subscribed to it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3854) Subsequent regex subscription calls fail

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359909#comment-15359909
 ] 

ASF GitHub Bot commented on KAFKA-3854:
---

Github user vahidhashemian closed the pull request at:

https://github.com/apache/kafka/pull/1572


> Subsequent regex subscription calls fail
> 
>
> Key: KAFKA-3854
> URL: https://issues.apache.org/jira/browse/KAFKA-3854
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> There are a couple of issues with regex subscription in the new consumer:
> h6. Subsequent Subscriptions Fail
> When consecutive calls are made to new consumer's [regex 
> subscription|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L850],
>  like below: 
> {code}
> consumer.subscribe(Pattern.compile("..."), listener);
> consumer.poll(0);
> consumer.subscribe(Pattern.compile("f.."), listener);
> consumer.poll(0);
> {code}
> the second call fails with the following error:
> {code}
> Exception in thread "main" java.lang.IllegalStateException: Subscription to 
> topics, partitions and pattern are mutually exclusive
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.subscribe(SubscriptionState.java:175)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:854)
>   ...
> {code}
> h6. Subsequent Subscriptions Fail to Trigger a Subscription Change and 
> Rebalance
> Even after the code is tweaked to get around the above issue, only the first 
> call to regex subscription triggers an actual subscription and the 
> subscription's rebalance listener. The reason is the regex {{subscription}} / 
> {{poll}} does not directly call {{changeSubscription(...)}} in which 
> {{needsPartitionAssignment}} is set and causes a rebalance. This method is 
> called only during the first regex {{subscription}} / {{poll}} when 
> coordinator is unknown and [a {{client.poll}} 
> call|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L179]
>  is made which eventually leads to [a {{changeSubscription}} 
> call|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L161].
>  The subsequent call does not reach this point because the coordinator is 
> already known.
> It seems that, for the same reason, if a consumer is subscribed to a pattern, 
> and later on a new topic is created that matches that pattern, the consumer 
> does not become subscribed to it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1572: KAFKA-3854: Fix issues with new consumer's subsequ...

2016-07-01 Thread vahidhashemian
GitHub user vahidhashemian reopened a pull request:

https://github.com/apache/kafka/pull/1572

KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) 
subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not immediately refresh metadata to 
change the subscription of the new consumer and trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches 
a consumer's subscription pattern would not be assigned to the consumer upon 
creation seems to be as designed. A repeat 
`subscribe()` to the same pattern or some wait time until the next 
automatic metadata refresh would handle that case.

An integration test was also added to verify these issues are fixed with 
this PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3854

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1572


commit 0860bd7691b87de33393b26373df752b0d2f934e
Author: Vahid Hashemian 
Date:   2016-06-16T21:19:32Z

KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) 
subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not actually refresh metadata and 
change the subscription of the new consumer, nor would they trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches 
a consumer's subscription pattern would not be assigned to the consumer upon 
creation seems to be as designed. A repeat subscribe() to the same pattern 
would be needed to handle that case.
Unit tests for regex subscriptions will be handled in KAFKA-3897.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1510: Minor: warn if the TGT cannot be renewed.

2016-07-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1510


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Build failed in Jenkins: kafka-trunk-jdk7 #1399

2016-07-01 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3902: Optimize KTable.filter in Streams DSL to avoid 
forwarding if

--
[...truncated 3330 lines...]

kafka.producer.AsyncProducerTest > testJavaProducer PASSED

kafka.producer.AsyncProducerTest > testIncompatibleEncoder STARTED

kafka.producer.AsyncProducerTest > testIncompatibleEncoder PASSED

kafka.producer.ProducerTest > testSendToNewTopic STARTED

kafka.producer.ProducerTest > testSendToNewTopic PASSED

kafka.producer.ProducerTest > testAsyncSendCanCorrectlyFailWithTimeout STARTED

kafka.producer.ProducerTest > testAsyncSendCanCorrectlyFailWithTimeout PASSED

kafka.producer.ProducerTest > testSendNullMessage STARTED

kafka.producer.ProducerTest > testSendNullMessage PASSED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo STARTED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo PASSED

kafka.producer.ProducerTest > testSendWithDeadBroker STARTED

kafka.producer.ProducerTest > testSendWithDeadBroker PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile STARTED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig PASSED

kafka.tools.ConsoleProducerTest > testParseKeyProp STARTED

kafka.tools.ConsoleProducerTest > testParseKeyProp PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer PASSED

kafka.tools.ConsoleProducerTest > testInvalidConfigs STARTED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler STARTED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.common.ConfigTest > testInvalidGroupIds STARTED

kafka.common.ConfigTest > testInvalidGroupIds PASSED

kafka.common.ConfigTest > testInvalidClientIds STARTED

kafka.common.ConfigTest > testInvalidClientIds PASSED

kafka.common.ZkNodeChangeNotificationListenerTest > testProcessNotification 
STARTED

kafka.common.ZkNodeChangeNotificationListenerTest > testProcessNotification 
PASSED

kafka.common.TopicTest > testInvalidTopicNames STARTED

kafka.common.TopicTest > testInvalidTopicNames PASSED

kafka.common.TopicTest > testTopicHasCollision STARTED

kafka.common.TopicTest > testTopicHasCollision PASSED

kafka.common.TopicTest > testTopicHasCollisionChars STARTED

kafka.common.TopicTest > testTopicHasCollisionChars PASSED


[GitHub] kafka pull request #1568: MINOR: Fix consumer constructor doc string

2016-07-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1568


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1578: MINOR: remove "auto.commit.interval.ms" from "Manu...

2016-07-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1578


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Build failed in Jenkins: kafka-trunk-jdk8 #729

2016-07-01 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3902: Optimize KTable.filter in Streams DSL to avoid 
forwarding if

--
[...truncated 1113 lines...]
kafka.server.KafkaConfigTest > testDuplicateListeners PASSED

kafka.server.KafkaConfigTest > testLogRetentionUnlimited STARTED

kafka.server.KafkaConfigTest > testLogRetentionUnlimited PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeMsProvided STARTED

kafka.server.KafkaConfigTest > testLogRetentionTimeMsProvided PASSED

kafka.server.KafkaConfigTest > testLogRollTimeNoConfigProvided STARTED

kafka.server.KafkaConfigTest > testLogRollTimeNoConfigProvided PASSED

kafka.server.KafkaConfigTest > testInvalidInterBrokerSecurityProtocol STARTED

kafka.server.KafkaConfigTest > testInvalidInterBrokerSecurityProtocol PASSED

kafka.server.KafkaConfigTest > testAdvertiseDefaults STARTED

kafka.server.KafkaConfigTest > testAdvertiseDefaults PASSED

kafka.server.KafkaConfigTest > testBadListenerProtocol STARTED

kafka.server.KafkaConfigTest > testBadListenerProtocol PASSED

kafka.server.KafkaConfigTest > testListenerDefaults STARTED

kafka.server.KafkaConfigTest > testListenerDefaults PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndHoursProvided 
STARTED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndHoursProvided 
PASSED

kafka.server.KafkaConfigTest > testUncleanElectionDisabled STARTED

kafka.server.KafkaConfigTest > testUncleanElectionDisabled PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeNoConfigProvided STARTED

kafka.server.KafkaConfigTest > testLogRetentionTimeNoConfigProvided PASSED

kafka.server.KafkaConfigTest > testCaseInsensitiveListenerProtocol STARTED

kafka.server.KafkaConfigTest > testCaseInsensitiveListenerProtocol PASSED

kafka.server.KafkaConfigTest > testFromPropsInvalid STARTED

kafka.server.KafkaConfigTest > testFromPropsInvalid PASSED

kafka.server.KafkaConfigTest > testInvalidCompressionType STARTED

kafka.server.KafkaConfigTest > testInvalidCompressionType PASSED

kafka.server.KafkaConfigTest > testAdvertiseHostNameDefault STARTED

kafka.server.KafkaConfigTest > testAdvertiseHostNameDefault PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeMinutesProvided STARTED

kafka.server.KafkaConfigTest > testLogRetentionTimeMinutesProvided PASSED

kafka.server.KafkaConfigTest > testValidCompressionType STARTED

kafka.server.KafkaConfigTest > testValidCompressionType PASSED

kafka.server.KafkaConfigTest > testUncleanElectionInvalid STARTED

kafka.server.KafkaConfigTest > testUncleanElectionInvalid PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndMsProvided 
STARTED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndMsProvided 
PASSED

kafka.server.KafkaConfigTest > testLogRollTimeMsProvided STARTED

kafka.server.KafkaConfigTest > testLogRollTimeMsProvided PASSED

kafka.server.KafkaConfigTest > testUncleanLeaderElectionDefault STARTED

kafka.server.KafkaConfigTest > testUncleanLeaderElectionDefault PASSED

kafka.server.KafkaConfigTest > testInvalidAdvertisedListenersProtocol STARTED

kafka.server.KafkaConfigTest > testInvalidAdvertisedListenersProtocol PASSED

kafka.server.KafkaConfigTest > testUncleanElectionEnabled STARTED

kafka.server.KafkaConfigTest > testUncleanElectionEnabled PASSED

kafka.server.KafkaConfigTest > testAdvertisePortDefault STARTED

kafka.server.KafkaConfigTest > testAdvertisePortDefault PASSED

kafka.server.KafkaConfigTest > testVersionConfiguration STARTED

kafka.server.KafkaConfigTest > testVersionConfiguration PASSED

kafka.server.KafkaConfigTest > testEqualAdvertisedListenersProtocol STARTED

kafka.server.KafkaConfigTest > testEqualAdvertisedListenersProtocol PASSED

kafka.server.MetadataRequestTest > testReplicaDownResponse STARTED

kafka.server.MetadataRequestTest > testReplicaDownResponse PASSED

kafka.server.MetadataRequestTest > testRack STARTED

kafka.server.MetadataRequestTest > testRack PASSED

kafka.server.MetadataRequestTest > testIsInternal STARTED

kafka.server.MetadataRequestTest > testIsInternal PASSED

kafka.server.MetadataRequestTest > testControllerId STARTED

kafka.server.MetadataRequestTest > testControllerId PASSED

kafka.server.MetadataRequestTest > testAllTopicsRequest STARTED

kafka.server.MetadataRequestTest > testAllTopicsRequest PASSED

kafka.server.MetadataRequestTest > testNoTopicsRequest STARTED

kafka.server.MetadataRequestTest > testNoTopicsRequest PASSED

kafka.server.SimpleFetchTest > testReadFromLog STARTED

kafka.server.SimpleFetchTest > testReadFromLog PASSED

kafka.server.ProduceRequestTest > testSimpleProduceRequest STARTED

kafka.server.ProduceRequestTest > testSimpleProduceRequest PASSED

kafka.server.ProduceRequestTest > testCorruptLz4ProduceRequest STARTED

kafka.server.ProduceRequestTest > testCorruptLz4ProduceRequest PASSED

kafka.server.DelayedOperationTest > testRequestPurge STARTED


[jira] [Commented] (KAFKA-3902) Optimize KTable.filter() to reduce unnecessary traffic

2016-07-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359843#comment-15359843
 ] 

Guozhang Wang commented on KAFKA-3902:
--

[~phderome] Thanks for your patch! It has been merged into trunk and the 
0.10.0 branch.

I have added you as a contributor to Kafka, and moving forward you can assign 
tickets to yourself.
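
For context, a minimal sketch (not from the patch) of the DSL pattern that 
KTableFilter sits behind, written against the 0.10.0-era Streams API; the topic 
names, serdes and the "value < 2" predicate are only illustrative:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Predicate;

    public class KTableFilterSketch {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();

            // Changelog of per-key counts, e.g. <key, 1>, <key, 2>, <key, 3> ...
            KTable<String, Long> counts =
                builder.table(Serdes.String(), Serdes.Long(), "counts-topic");  // hypothetical topic

            // KTableFilter implements this call and decides what to forward downstream.
            KTable<String, Long> small = counts.filter(new Predicate<String, Long>() {
                @Override
                public boolean test(String key, Long value) {
                    return value != null && value < 2;
                }
            });

            small.to(Serdes.String(), Serdes.Long(), "small-counts-topic");     // hypothetical topic

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-filter-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            new KafkaStreams(builder, new StreamsConfig(props)).start();
        }
    }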

> Optimize KTable.filter() to reduce unnecessary traffic
> --
>
> Key: KAFKA-3902
> URL: https://issues.apache.org/jira/browse/KAFKA-3902
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Phil Derome
>  Labels: architecture, performance
> Fix For: 0.10.0.1
>
>
> {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be 
> optimized to reduce unnecessary data traffic to downstream operators. More 
> specifically:
> 1. Some context: when a KTable participates in a downstream operator (e.g. 
> if that operator is an aggregation), then we need to materialize this KTable 
> and send both its old value as well as its new value as a pair {old -> new} 
> to the downstream operator. In practice it usually needs to send the pair. 
> So let's discuss them separately; take the following example source 
> stream for your KTable:
> {{<key, 1>, <key, 2>, <key, 3> ...}}
> When the KTable needs to be materialized, it will transform the source 
> messages into the pairs:
> {{<key, {null -> 1}>, <key, {1 -> 2}>, <key, {2 -> 3}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns 
> false, we MUST send a <key, null> to the downstream operator to indicate that 
> this key is being filtered in the table. Otherwise, for example if your 
> filter is "value < 2", then the updated value <key, 2> will just be filtered, 
> resulting in incorrect semantics.
> If it returns true we should still send the original <key, new value> to 
> downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can 
> consider:
> a. If the old value is null and the new value is not null, and the 
> filter predicate returns false for the new value, then in this case it is safe 
> to optimize and not send anything to the downstream operator, since in 
> this case we know there was no value for the key previously anyway; otherwise 
> we send the original pair.
> b. If the old value is not null and the new value is null, 
> indicating that this key is deleted, and the filter predicate returns false for the 
> old value, then in this case it is safe to optimize and not send 
> anything to the downstream operator, since we know that the old value has 
> already been filtered out in a previous message; otherwise we send the original 
> pair.
> c. If both the old and new values are not null, and:
> 1) the predicate returns true on both, send the original pair;
> 2) the predicate returns false on both, we can optimize and not send 
> anything;
> 3) the predicate returns true on old and false on new, send the pair {old 
> -> null};
> 4) the predicate returns false on old and true on new, send the pair 
> {null -> new}.
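
Illustrative only, and not the actual KTableFilter code: a self-contained sketch of 
the forwarding decision described in the cases above, where null stands for a 
filtered-out or deleted value and a null return means nothing is forwarded:

    /** Sketch of the decision table above; Change is a stand-in for Streams'
     *  internal {old -> new} pair, and "forward" means sending downstream. */
    final class FilterForwardingSketch {

        static final class Change<V> {
            final V oldValue;
            final V newValue;
            Change(V oldValue, V newValue) { this.oldValue = oldValue; this.newValue = newValue; }
        }

        interface Predicate<V> { boolean test(V value); }

        /** Returns the change to forward downstream, or null if it can safely be dropped. */
        static <V> Change<V> filter(Change<V> change, Predicate<V> predicate, boolean sendOldValues) {
            V oldV = change.oldValue;
            V newV = change.newValue;

            if (!sendOldValues) {
                // Case 2: without old values, a rejected new value must still become a
                // tombstone (new = null) so downstream does not keep a stale value.
                if (newV != null && !predicate.test(newV)) {
                    return new Change<V>(null, null);
                }
                return change;
            }

            // Case 3: with old values available, apply the predicate to both sides.
            V keptOld = (oldV != null && predicate.test(oldV)) ? oldV : null;
            V keptNew = (newV != null && predicate.test(newV)) ? newV : null;

            if (keptOld == null && keptNew == null) {
                // Cases 3a, 3b and 3c.2: neither side passes, nothing needs forwarding.
                return null;
            }
            // Cases 3c.1, 3c.3, 3c.4 (and the "otherwise" branches of 3a/3b):
            // forward whichever sides pass, e.g. {old -> new}, {old -> null} or {null -> new}.
            return new Change<V>(keptOld, keptNew);
        }

        public static void main(String[] args) {
            Predicate<Integer> lessThanTwo = new Predicate<Integer>() {
                public boolean test(Integer v) { return v < 2; }
            };
            // {1 -> 2} with "value < 2": old passes, new fails, so forward {1 -> null}.
            Change<Integer> out = filter(new Change<Integer>(1, 2), lessThanTwo, true);
            System.out.println(out.oldValue + " -> " + out.newValue);
        }
    }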



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3902) Optimize KTable.filter() to reduce unnecessary traffic

2016-07-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3902.
--
   Resolution: Fixed
Fix Version/s: 0.10.0.1

Issue resolved by pull request 1556
[https://github.com/apache/kafka/pull/1556]

> Optimize KTable.filter() to reduce unnecessary traffic
> --
>
> Key: KAFKA-3902
> URL: https://issues.apache.org/jira/browse/KAFKA-3902
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Phil Derome
>  Labels: architecture, performance
> Fix For: 0.10.0.1
>
>
> {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be 
> optimized to reduce unnecessary data traffic to downstream operators. More 
> specifically:
> 1. Some context: when a KTable participates in a downstream operator (e.g. 
> if that operator is an aggregation), then we need to materialize this KTable 
> and send both its old value as well as new value as a pair {old -> new} to 
> the downstream operator. In practice it usually needs to send the pair. 
> So let's discuss these cases separately, taking the following example source 
> stream for your KTable:
> {{<key, 1>, <key, 2>, <key, 3> ...}}
> When the KTable needs to be materialized, it will transform the source 
> messages into the pairs of:
> {{<key, \{null -> 1\}>, <key, \{1 -> 2\}>, <key, \{2 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns 
> false, we MUST send a <key, null> to the downstream operator to indicate that 
> this key is being filtered in the table. Otherwise, for example if your 
> filter is "value < 2", then the updated value <key, 2> will just be filtered, 
> resulting in incorrect semantics.
> If it returns true we should still send the original <key, value> to 
> downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can 
> consider:
> a. If the old value is <key, null> and the new value is <key, value>, and the 
> filter predicate returns false for the new value, then it is safe to optimize 
> and not return anything to the downstream operator, since in this case we 
> know there is no value for the key previously anyway; otherwise we send the 
> original pair.
> b. If the old value is <key, value> and the new value is <key, null>, 
> indicating to delete this key, and the filter predicate returns false for the 
> old value, then it is safe to optimize and not return anything to the 
> downstream operator, since we know that the old value has already been 
> filtered in a previous message; otherwise we send the original pair.
> c. If both old and new values are not null, and:
> 1) the predicate returns true on both, send the original pair;
> 2) the predicate returns false on both, we can optimize and not send anything;
> 3) the predicate returns true on old and false on new, send the key: \{old 
> -> null\};
> 4) the predicate returns false on old and true on new, send the key: 
> \{null -> new\};
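
For illustration, here is a minimal sketch of the case analysis above for the "send old value" path. This is not the actual {{KTableFilter}} code; the {{Change}} helper and method below are hypothetical simplifications.

{code}
import java.util.function.Predicate;

// Hypothetical helper type; the real Kafka Streams internal classes differ.
final class Change<V> {
    final V newValue;
    final V oldValue;
    Change(V newValue, V oldValue) { this.newValue = newValue; this.oldValue = oldValue; }
}

final class KTableFilterSketch {
    // Decide what to forward for a change pair {old -> new} when "send old value" is enabled.
    // Returning null means the update is suppressed entirely (cases 3.a, 3.b and 3.c.2 above).
    static <V> Change<V> filterChange(Change<V> change, Predicate<V> predicate) {
        V newVal = (change.newValue != null && predicate.test(change.newValue)) ? change.newValue : null;
        V oldVal = (change.oldValue != null && predicate.test(change.oldValue)) ? change.oldValue : null;
        if (newVal == null && oldVal == null)
            return null;                          // nothing survives the filter: send nothing downstream
        return new Change<>(newVal, oldVal);      // e.g. {old -> null}, {null -> new}, or the original pair
    }
}
{code}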



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3902) Optimize KTable.filter() to reduce unnecessary traffic

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359841#comment-15359841
 ] 

ASF GitHub Bot commented on KAFKA-3902:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1556


> Optimize KTable.filter() to reduce unnecessary traffic
> --
>
> Key: KAFKA-3902
> URL: https://issues.apache.org/jira/browse/KAFKA-3902
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Phil Derome
>  Labels: architecture, performance
> Fix For: 0.10.0.1
>
>
> {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be 
> optimized to reduce unnecessary data traffic to downstream operators. More 
> specifically:
> 1. Some context: when a KTable participates in a downstream operator (e.g. 
> if that operator is an aggregation), then we need to materialize this KTable 
> and send both its old value as well as new value as a pair {old -> new} to 
> the downstream operator. In practice it usually needs to send the pair. 
> So let's discuss these cases separately, taking the following example source 
> stream for your KTable:
> {{<key, 1>, <key, 2>, <key, 3> ...}}
> When the KTable needs to be materialized, it will transform the source 
> messages into the pairs of:
> {{<key, \{null -> 1\}>, <key, \{1 -> 2\}>, <key, \{2 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns 
> false, we MUST send a <key, null> to the downstream operator to indicate that 
> this key is being filtered in the table. Otherwise, for example if your 
> filter is "value < 2", then the updated value <key, 2> will just be filtered, 
> resulting in incorrect semantics.
> If it returns true we should still send the original <key, value> to 
> downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can 
> consider:
> a. If the old value is <key, null> and the new value is <key, value>, and the 
> filter predicate returns false for the new value, then it is safe to optimize 
> and not return anything to the downstream operator, since in this case we 
> know there is no value for the key previously anyway; otherwise we send the 
> original pair.
> b. If the old value is <key, value> and the new value is <key, null>, 
> indicating to delete this key, and the filter predicate returns false for the 
> old value, then it is safe to optimize and not return anything to the 
> downstream operator, since we know that the old value has already been 
> filtered in a previous message; otherwise we send the original pair.
> c. If both old and new values are not null, and:
> 1) the predicate returns true on both, send the original pair;
> 2) the predicate returns false on both, we can optimize and not send anything;
> 3) the predicate returns true on old and false on new, send the key: \{old 
> -> null\};
> 4) the predicate returns false on old and true on new, send the key: 
> \{null -> new\};



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1556: KAFKA-3902

2016-07-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1556


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-3902) Optimize KTable.filter() to reduce unnecessary traffic

2016-07-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3902:
-
Assignee: Phil Derome

> Optimize KTable.filter() to reduce unnecessary traffic
> --
>
> Key: KAFKA-3902
> URL: https://issues.apache.org/jira/browse/KAFKA-3902
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Phil Derome
>  Labels: architecture, performance
>
> {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be 
> optimized to reduce unnecessary data traffic to downstream operators. More 
> specifically:
> 1. Some context: when a KTable participates in a downstream operator (e.g. 
> if that operator is an aggregation), then we need to materialize this KTable 
> and send both its old value as well as new value as a pair {old -> new} to 
> the downstream operator. In practice it usually needs to send the pair. 
> So let's discuss these cases separately, taking the following example source 
> stream for your KTable:
> {{<key, 1>, <key, 2>, <key, 3> ...}}
> When the KTable needs to be materialized, it will transform the source 
> messages into the pairs of:
> {{<key, \{null -> 1\}>, <key, \{1 -> 2\}>, <key, \{2 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns 
> false, we MUST send a <key, null> to the downstream operator to indicate that 
> this key is being filtered in the table. Otherwise, for example if your 
> filter is "value < 2", then the updated value <key, 2> will just be filtered, 
> resulting in incorrect semantics.
> If it returns true we should still send the original <key, value> to 
> downstream operators.
> 3. If "send old value" is enabled, then there are a couple of cases we can 
> consider:
> a. If the old value is <key, null> and the new value is <key, value>, and the 
> filter predicate returns false for the new value, then it is safe to optimize 
> and not return anything to the downstream operator, since in this case we 
> know there is no value for the key previously anyway; otherwise we send the 
> original pair.
> b. If the old value is <key, value> and the new value is <key, null>, 
> indicating to delete this key, and the filter predicate returns false for the 
> old value, then it is safe to optimize and not return anything to the 
> downstream operator, since we know that the old value has already been 
> filtered in a previous message; otherwise we send the original pair.
> c. If both old and new values are not null, and:
> 1) the predicate returns true on both, send the original pair;
> 2) the predicate returns false on both, we can optimize and not send anything;
> 3) the predicate returns true on old and false on new, send the key: \{old 
> -> null\};
> 4) the predicate returns false on old and true on new, send the key: 
> \{null -> new\};



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-07-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359835#comment-15359835
 ] 

Guozhang Wang commented on KAFKA-3705:
--

Yes, this does finally clarify your scenario, thanks!

I think the {old, new} change pair can still help in your case, because it has the 
benefit that, for aggregations for example, you have the clear information to 
"subtract the old value, and add the new value" instead of depending on whether 
the returned value is null. For example, the Streams DSL defines the 
aggregation operator in the following way (note that in your customized 
implementation you do not need to strictly follow the same pattern; this is just 
to illustrate the idea):

{code}
<T> KTable<K, T> aggregate(Initializer<T> initializer,
                           Aggregator<K, V, T> adder,
                           Aggregator<K, V, T> subtractor, ...);
{code}


Let me try again with your example code and the aggregation pattern in the 
above Streams DSL, and if you do not agree let's have a small skype chat :)


1. Suppose your current value in Table B is

B.PK => B.V.old, which contains A.PK.old, C.PK, etc.

And when you join tables A and B, you repartition the stream B by A.PK while 
still maintaining the message format as B.PK => B.V, and the join result is in 
the format of: 

B.PK => joined(B.V.old, A.V.old)


2. Now suppose you have an update on Table B, as B.PK => B.V.new, which 
contains A.PK.new, C.PK (same value), etc. And suppose it is represented as a 
change pair of {old, new}, i.e.

B.PK => {B.V.old, B.V.new}, or more specifically:

B.PK => {<A.PK.old, C.PK, ...>, <A.PK.new, C.PK, ...>}


3. When you repartition it based on the A.PK value, this will result in two pairs 
being sent to potentially two different partitions, as:

B.PK => {B.V.old, null}   (sent to partition1)

B.PK => {null, B.V.new}   (sent to partition2)


4. These two records will be joined independently at two processors, each 
fetching one of the re-partitioned topic partitions, and the result is:

B.PK => {joined(B.V.old, A.V.old), null}   (here A.V.old corresponds to the 
value for key A.PK.old in Table A)

B.PK => {null, joined(B.V.new, A.V.new)}   (here A.V.new corresponds to the 
value for key A.PK.new in Table A)


and then they will be sent to the second topic that is partitioned on C.PK, and 
since their C.PK value is the same, they will be sent to the same partition, 
but in arbitrary order.


5. The aggregation function consumes from the second re-partition topic based 
on C.PK, and does the aggregation by 1) calling a subtract function on the old 
value of the pair, and then 2) calling an add function on the new value of the 
pair, skipping the call if the value is null. More specifically, the 
subtract / add functions look like:

{code}
List<KeyValue> subtractor.apply(C key, Joined value, Map current)
{
   current.remove(key)

   return current.entrySet.asList
}

List<KeyValue> adder.apply(C key, Joined value, Map current)
{
   current.put(key, value)

   return current.entrySet.asList
}
{code}

And based on the order in which these two records are received, we will either call 
{{subtract(C.PK, joined(B.V.old, A.V.old), current)}} first and then 
{{add(C.PK, joined(B.V.new, A.V.new), current)}}, or vice versa, and either way 
it is correct, since {{B.V.old}} and {{B.V.new}} are different keys.
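
To see why the order does not matter, here is a tiny, purely illustrative sketch (hypothetical keys and values, not Kafka code): as long as the old and new joined values live under different keys, the subtract and add steps commute.

{code}
import java.util.HashMap;
import java.util.Map;

public class OrderIndependenceSketch {
    public static void main(String[] args) {
        Map<String, String> base = new HashMap<>();
        base.put("oldKey", "joined(B.V.old, A.V.old)");

        Map<String, String> subtractThenAdd = new HashMap<>(base);
        subtractThenAdd.remove("oldKey");                              // subtract the old value
        subtractThenAdd.put("newKey", "joined(B.V.new, A.V.new)");     // then add the new value

        Map<String, String> addThenSubtract = new HashMap<>(base);
        addThenSubtract.put("newKey", "joined(B.V.new, A.V.new)");     // add the new value
        addThenSubtract.remove("oldKey");                              // then subtract the old value

        System.out.println(subtractThenAdd.equals(addThenSubtract));   // prints: true
    }
}
{code}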

> Support non-key joining in KTable
> -
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Liquan Pei
>  Labels: api
> Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users 
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but 
> with a "foreign key" {{a}}, and assuming they are read from two topics which 
> are partitioned on {{a}} and {{b}} respectively, they need to do the 
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' 
> is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already 
> partitioned on {{a}}, users still need to do the pre-aggregation in order to 
> make the two joining streams be on the same key. This is a drawback in 
> programmability and we should fix it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3924) Data loss due to halting when LEO is larger than leader's LEO

2016-07-01 Thread Maysam Yabandeh (JIRA)
Maysam Yabandeh created KAFKA-3924:
--

 Summary: Data loss due to halting when LEO is larger than leader's 
LEO
 Key: KAFKA-3924
 URL: https://issues.apache.org/jira/browse/KAFKA-3924
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.0.0
Reporter: Maysam Yabandeh


Currently the follower broker panics when its LEO is larger than its leader's 
LEO, and assuming that this is an impossible state to reach, halts the process 
to prevent any further damage.
{code}
if (leaderEndOffset < replica.logEndOffset.messageOffset) {
  // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
  // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
  // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
  if (!LogConfig.fromProps(brokerConfig.originals,
        AdminUtils.fetchEntityConfig(replicaMgr.zkUtils, ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
    // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
    fatal("...")
    Runtime.getRuntime.halt(1)
  }
{code}

Firstly, this assumption is invalid and there are legitimate cases (examples 
below) in which this state can actually occur. Secondly, halting results in the 
broker losing its un-flushed data, and if multiple brokers halt simultaneously 
there is a chance that both the leader and the followers of a partition are among 
the halted brokers, which would result in permanent data loss.

Given that this is a legitimate case, we suggest replacing the halt with a graceful 
shutdown to avoid propagating data loss to the entire cluster.

Details:
One legitimate case in which this can actually occur is when a troubled broker 
shrinks its partitions right before crashing (KAFKA-3410 and KAFKA-3861). In this 
case the broker has lost some data, but the controller still cannot elect the 
others as the leader. If the crashed broker comes back up, the controller elects 
it as the leader, and as a result all other brokers that are now following it 
halt, since their LEOs are larger than those of the shrunk partitions in the 
restarted broker. We actually had a case where bringing up a crashed broker took 
down the entire cluster, and as explained above this could result in data loss.

The other legitimate case is when multiple brokers shut down ungracefully at the 
same time. In this case both the leader and the followers lose their un-flushed 
data, but one of them has an HW larger than the other. The controller elects the 
one that comes back up sooner as the leader, and if its LEO is less than that of 
its future follower, the follower will halt (and probably lose more data). 
Simultaneous ungraceful shutdowns could happen due to hardware issues (e.g., rack 
power failure), operator errors, or software issues (e.g., the case above, which 
is further explained in KAFKA-3410 and KAFKA-3861 and causes simultaneous halts in 
multiple brokers).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka Connect Transformers

2016-07-01 Thread Gwen Shapira
Added wiki access. Enjoy :)

On Fri, Jul 1, 2016 at 11:24 AM, Nisarg Shah  wrote:
> Need to submit a KIP for https://issues.apache.org/jira/browse/KAFKA-3209. 
> Please provide wiki write access to ‘snisarg’.
>
> Thanks,
> Nisarg Shah.
>
>> On Jun 28, 2016, at 6:27 PM, Nisarg Shah  wrote:
>>
>> Need permissions to edit the wiki. Username is ‘snisarg’.
>>
>> Thanks,
>> Nisarg.
>>
>>> On Jun 28, 2016, at 09:08, Nisarg Shah wrote:
>>>
>>> Hello,
>>>
>>> I need to create a page so that I can write a Kafka Improvement Proposal 
>>> for the below. My username is ‘snisarg’.
>>>
>>> Thanks,
>>> Nisarg
>>>
 On Jun 19, 2016, at 10:43 PM, Nisarg Shah wrote:

 Hello,

 I am looking to do https://issues.apache.org/jira/browse/KAFKA-3209. I wanted feedback from 
 the devs for the design that I’m proposing to put in place. Thanks a lot 
 for all the discussions Ewen Cheslack-Postava.

 A gist of how I plan to do it is by using ‘Transformers’ that can be 
 configurationally chained together and data will pass through them between 
 a source and destination for Kafka Connect.

 To set up transformers, we propose using the properties to define 
 Transformer classes one after the other.
 transformer=abc.Transformer1,xyz.Transformer2

 Each transformer can get specific properties passed on from the same 
 properties file, as it is with the Connectors.

 About the actual signature for the transformation function that does all 
 the work, how’s this interface?
 public abstract class Transformer<T1, T2> {
 public abstract T2 transform(T1 t1);

 public void initialize(Map props) {}
 }
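
 For example, here is a small sketch (all names are illustrative, not an existing Connect API) of how two transformers configured as "transformer=abc.Transformer1,xyz.Transformer2" could be run as a chain, with the output of one stage feeding the next. The Transformer shape mirrors the one proposed above.

{code}
import java.util.Arrays;
import java.util.List;

abstract class Transformer<T1, T2> {
    public abstract T2 transform(T1 t1);
}

final class TransformerChainSketch {

    // Types are erased to Object because each stage may convert between arbitrary types.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Object applyChain(List<Transformer> chain, Object record) {
        Object current = record;
        for (Transformer t : chain) {
            current = t.transform(current);   // output of one stage is the input of the next
        }
        return current;
    }

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) {
        Transformer<String, String> toUpper = new Transformer<String, String>() {
            public String transform(String s) { return s.toUpperCase(); }
        };
        Transformer<String, Integer> length = new Transformer<String, Integer>() {
            public Integer transform(String s) { return s.length(); }
        };
        List<Transformer> chain = Arrays.asList(toUpper, length);
        System.out.println(applyChain(chain, "hello"));   // prints 5
    }
}
{code}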

 Approach 1:
 Functionally, the complete data can be passed.
 Just as the *Tasks get a complete List<*Record>, the transformer can get 
 the same. The whole list passing makes rearranging or merging data 
 possible. This can be helpful if transformations require looking up or 
 down the list of messages. Allowing custom datatypes between transformers will 
 allow custom objects to be passed around as intermediates. Casting could be an 
 issue.

 Approach 2:
 Taking a simplistic approach and doing a message by message 
 transformation. The transformer could store data from the previous 
 message, but not go down the list of messages. From the comments by 
 Michael Graff, both approaches would work, but if looking down the list is 
 required, we would have to go with Approach 1.

 I will also have a working change ready for Approach 1 very soon but till 
 then, please give me your suggestions.

 Thanks,
 Nisarg.




>>>
>>
>


Build failed in Jenkins: kafka-trunk-jdk7 #1398

2016-07-01 Thread Apache Jenkins Server
See 

Changes:

[me] MINOR: fix Bash shebang on vagrant/ scripts

--
[...truncated 3368 lines...]

kafka.producer.AsyncProducerTest > testJavaProducer PASSED

kafka.producer.AsyncProducerTest > testIncompatibleEncoder STARTED

kafka.producer.AsyncProducerTest > testIncompatibleEncoder PASSED

kafka.producer.ProducerTest > testSendToNewTopic STARTED

kafka.producer.ProducerTest > testSendToNewTopic PASSED

kafka.producer.ProducerTest > testAsyncSendCanCorrectlyFailWithTimeout STARTED

kafka.producer.ProducerTest > testAsyncSendCanCorrectlyFailWithTimeout PASSED

kafka.producer.ProducerTest > testSendNullMessage STARTED

kafka.producer.ProducerTest > testSendNullMessage PASSED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo STARTED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo PASSED

kafka.producer.ProducerTest > testSendWithDeadBroker STARTED

kafka.producer.ProducerTest > testSendWithDeadBroker PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile STARTED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig PASSED

kafka.tools.ConsoleProducerTest > testParseKeyProp STARTED

kafka.tools.ConsoleProducerTest > testParseKeyProp PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer PASSED

kafka.tools.ConsoleProducerTest > testInvalidConfigs STARTED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler STARTED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.common.ConfigTest > testInvalidGroupIds STARTED

kafka.common.ConfigTest > testInvalidGroupIds PASSED

kafka.common.ConfigTest > testInvalidClientIds STARTED

kafka.common.ConfigTest > testInvalidClientIds PASSED

kafka.common.ZkNodeChangeNotificationListenerTest > testProcessNotification 
STARTED

kafka.common.ZkNodeChangeNotificationListenerTest > testProcessNotification 
PASSED

kafka.common.TopicTest > testInvalidTopicNames STARTED

kafka.common.TopicTest > testInvalidTopicNames PASSED

kafka.common.TopicTest > testTopicHasCollision STARTED

kafka.common.TopicTest > testTopicHasCollision PASSED

kafka.common.TopicTest > testTopicHasCollisionChars STARTED

kafka.common.TopicTest > testTopicHasCollisionChars PASSED

kafka.security.auth.ResourceTypeTest > 

[jira] [Commented] (KAFKA-3922) Add a copy-constructor to AbstractStream

2016-07-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359577#comment-15359577
 ] 

Guozhang Wang commented on KAFKA-3922:
--

[~fhussonnois] This sounds like a good idea. I have assigned the ticket to you; feel 
free to follow the instructions below to submit a PR:

https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest

> Add a copy-constructor to AbstractStream
> 
>
> Key: KAFKA-3922
> URL: https://issues.apache.org/jira/browse/KAFKA-3922
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Florian Hussonnois
>Assignee: Florian Hussonnois
>Priority: Trivial
>  Labels: api
>
> It would be nice to have a copy constructor in the class AbstractStream.
> This constructor would allow decorating the KStreamImpl and KTableImpl APIs with 
> new methods without impacting the public API. 
> I think this can be especially useful for community projects which would like 
> to develop DSL extensions. This constructor will give access to the protected 
> members of AbstractStream.
> I can make a PR if you think this change is possible.
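
For illustration, the proposed constructor might look roughly like the following sketch. This is not the actual patch; the field types are simplified (e.g. {{Object}} instead of the real builder type) and may differ from the real AbstractStream.

{code}
import java.util.Set;

// Simplified, hypothetical rendering of the proposal; the real class lives in
// org.apache.kafka.streams.kstream.internals.
abstract class AbstractStream<K> {
    protected final Object topology;           // the real class holds the stream builder here
    protected final String name;
    protected final Set<String> sourceNodes;

    protected AbstractStream(Object topology, String name, Set<String> sourceNodes) {
        this.topology = topology;
        this.name = name;
        this.sourceNodes = sourceNodes;
    }

    // The proposed copy constructor: lets an extension class wrap an existing
    // stream and reuse its protected state when adding new DSL methods.
    protected AbstractStream(AbstractStream<K> stream) {
        this(stream.topology, stream.name, stream.sourceNodes);
    }
}
{code}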



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3922) Add a copy-constructor to AbstractStream

2016-07-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3922:
-
Assignee: Florian Hussonnois  (was: Guozhang Wang)

> Add a copy-constructor to AbstractStream
> 
>
> Key: KAFKA-3922
> URL: https://issues.apache.org/jira/browse/KAFKA-3922
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Florian Hussonnois
>Assignee: Florian Hussonnois
>Priority: Trivial
>  Labels: api
>
> It would be nice to have a copy constructor in the class AbstractStream.
> This constructor would allow decorating the KStreamImpl and KTableImpl APIs with 
> new methods without impacting the public API. 
> I think this can be especially useful for community projects which would like 
> to develop DSL extensions. This constructor will give access to the protected 
> members of AbstractStream.
> I can make a PR if you think this change is possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3922) Add a copy-constructor to AbstractStream

2016-07-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3922:
-
Labels: api  (was: )

> Add a copy-constructor to AbstractStream
> 
>
> Key: KAFKA-3922
> URL: https://issues.apache.org/jira/browse/KAFKA-3922
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Florian Hussonnois
>Assignee: Florian Hussonnois
>Priority: Trivial
>  Labels: api
>
> It would be nice to have a copy constructor in the class AbstractStream.
> This constructor would allow decorating the KStreamImpl and KTableImpl APIs with 
> new methods without impacting the public API. 
> I think this can be especially useful for community projects which would like 
> to develop DSL extensions. This constructor will give access to the protected 
> members of AbstractStream.
> I can make a PR if you think this change is possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk8 #728

2016-07-01 Thread Apache Jenkins Server
See 

Changes:

[me] MINOR: fix Bash shebang on vagrant/ scripts

--
[...truncated 3399 lines...]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
ERROR: Could not install JDK1_8_0_66_HOME
java.lang.NullPointerException
at 
hudson.plugins.toolenv.ToolEnvBuildWrapper$1.buildEnvVars(ToolEnvBuildWrapper.java:46)
at hudson.model.AbstractBuild.getEnvironment(AbstractBuild.java:947)
at hudson.plugins.git.GitSCM.getParamExpandedRepos(GitSCM.java:390)
at 
hudson.plugins.git.GitSCM.compareRemoteRevisionWithImpl(GitSCM.java:577)
at hudson.plugins.git.GitSCM.compareRemoteRevisionWith(GitSCM.java:527)
at hudson.scm.SCM.compareRemoteRevisionWith(SCM.java:381)
at hudson.scm.SCM.poll(SCM.java:398)
at hudson.model.AbstractProject._poll(AbstractProject.java:1453)
at hudson.model.AbstractProject.poll(AbstractProject.java:1356)
at hudson.triggers.SCMTrigger$Runner.runPolling(SCMTrigger.java:526)
at hudson.triggers.SCMTrigger$Runner.run(SCMTrigger.java:555)
at 
hudson.util.SequentialExecutionQueue$QueueEntry.run(SequentialExecutionQueue.java:119)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

kafka.producer.ProducerTest > testSendWithDeadBroker PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile STARTED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig PASSED

kafka.tools.ConsoleProducerTest > testParseKeyProp STARTED

kafka.tools.ConsoleProducerTest > testParseKeyProp PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer PASSED

kafka.tools.ConsoleProducerTest > testInvalidConfigs STARTED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler STARTED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes STARTED


[jira] [Work stopped] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations

2016-07-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3429 stopped by Matthias J. Sax.
--
> Remove Serdes needed for repartitioning in KTable stateful operations
> -
>
> Key: KAFKA-3429
> URL: https://issues.apache.org/jira/browse/KAFKA-3429
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>  Labels: api, newbie++
> Fix For: 0.10.1.0
>
>
> Currently in KTable aggregate operations where a repartition is possibly 
> needed since the aggregation key may not be the same as the original primary 
> key, we require the users to provide serdes (default to configured ones) for 
> read / write to the internally created re-partition topic. However, these are 
> not necessary since for all KTable instances either generated from the topics 
> directly:
> {code}table = builder.table(...){code}
> or from aggregation operations:
> {code}table = stream.aggregate(...){code}
> There is already a serde provided for materializing the data, and hence the 
> same serde can be re-used when the resulting KTable is involved in future 
> aggregation operations. For example:
> {code}
> table1 = stream.aggregateByKey(serde);
> table2 = table1.aggregate(aggregator, selector, originalSerde, 
> aggregateSerde);
> {code}
> We would not need to require users to specify the "originalSerde" in 
> table1.aggregate since it could always reuse the "serde" from 
> stream.aggregateByKey, which is used to materialize the table1 object.
> In order to get rid of it, implementation-wise we need to carry the serde 
> information along with the KTableImpl instance in order to re-use it in a 
> future operation that requires repartitioning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1580: Hotfix: auto-repartitioing for merge and code simp...

2016-07-01 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/1580

Hotfix: auto-repartitioing for merge and code simplication

follow-up to auto-through feature:
 - add sourceNode to transform()
 - enable auto-repartitioning in merge()
 - null check not required anymore (always join-able due to auto-through)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1580.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1580


commit 55af0bf95510a39580fc794a674a2f542e9740b0
Author: Matthias J. Sax 
Date:   2016-06-29T19:11:50Z

hotfix: check not required anymore; cannot be null anymore (we have 
auto-through)

commit 249623a112795b0fd3693ed0c0f175f250c2423b
Author: Matthias J. Sax 
Date:   2016-06-30T14:32:58Z

add sourceNodes to transform()
some more minor improvements

commit 564185e2cd3d42b45f2665c29f6ca31a70991123
Author: Matthias J. Sax 
Date:   2016-07-01T19:43:18Z

fixed repartitioing bug in merge()




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1567: MINOR: fix Bash shebang on vagrant/ scripts

2016-07-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1567


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: Kafka Connect Transformers

2016-07-01 Thread Nisarg Shah
Need to submit a KIP for https://issues.apache.org/jira/browse/KAFKA-3209. 
Please provide wiki write access to ‘snisarg’. 

Thanks,
Nisarg Shah.

> On Jun 28, 2016, at 6:27 PM, Nisarg Shah  wrote:
> 
> Need permissions to edit the wiki. Username is ‘snisarg’. 
> 
> Thanks,
> Nisarg.
> 
>> On Jun 28, 2016, at 09:08, Nisarg Shah wrote:
>> 
>> Hello,
>> 
>> I need to create a page so that I can write a Kafka Improvement Proposal for 
>> the below. My username is ‘snisarg’. 
>> 
>> Thanks,
>> Nisarg
>> 
>>> On Jun 19, 2016, at 10:43 PM, Nisarg Shah wrote:
>>> 
>>> Hello,
>>> 
>>> I am looking to do https://issues.apache.org/jira/browse/KAFKA-3209. I wanted feedback from 
>>> the devs for the design that I’m proposing to put in place. Thanks a lot 
>>> for all the discussions Ewen Cheslack-Postava.
>>> 
>>> A gist of how I plan to do it is by using ‘Transformers’ that can be 
>>> configurationally chained together and data will pass through them between 
>>> a source and destination for Kafka Connect.
>>> 
>>> To set up transformers, we propose using the properties to define 
>>> Transformer classes one after the other. 
>>> transformer=abc.Transformer1,xyz.Transformer2
>>> 
>>> Each transformer can get specific properties passed on from the same 
>>> properties file, as it is with the Connectors.
>>> 
>>> About the actual signature for the transformation function that does all 
>>> the work, how’s this interface? 
>>> public abstract class Transformer<T1, T2> {
>>> public abstract T2 transform(T1 t1);
>>> 
>>> public void initialize(Map props) {}
>>> }
>>> 
>>> Approach 1:
>>> Functionally, the complete data can be passed. 
>>> Just as the *Tasks get a complete List<*Record>, the transformer can get 
>>> the same. The whole list passing makes rearranging or merging data 
>>> possible. This can be helpful if transformations require looking up or down 
>>> the list of messages. Allowing custom datatypes between transformers will allow 
>>> custom objects to be passed around as intermediates. Casting could be an issue.
>>> 
>>> Approach 2: 
>>> Taking a simplistic approach and doing a message by message transformation. 
>>> The transformer could store data from the previous message, but not go down 
>>> the list of messages. From the comments by Michael Graff, both approaches 
>>> would work, but if looking down the list is required, we would have to go with 
>>> Approach 1. 
>>> 
>>> I will also have a working change ready for Approach 1 very soon but till 
>>> then, please give me your suggestions. 
>>> 
>>> Thanks,
>>> Nisarg.
>>> 
>>> 
>>> 
>>> 
>> 
> 



[jira] [Commented] (KAFKA-3854) Subsequent regex subscription calls fail

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359384#comment-15359384
 ] 

ASF GitHub Bot commented on KAFKA-3854:
---

GitHub user vahidhashemian reopened a pull request:

https://github.com/apache/kafka/pull/1572

KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) 
subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not immediately refresh metadata to 
change the subscription of the new consumer and trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches 
a consumer's subscription pattern would not be assigned to the consumer upon 
creation seems to be as designed. A repeat 
`subscribe()` to the same pattern or some wait time until the next 
automatic metadata refresh would handle that case.

An integration test was also added to verify these issues are fixed with 
this PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3854

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1572


commit 5fb472a48a0231aece798d601c3427bd058fc078
Author: Vahid Hashemian 
Date:   2016-06-16T21:19:32Z

KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) 
subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not actually refresh metadata and 
change the subscription of the new consumer, nor would they trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches 
a consumer's subscription pattern would not be assigned to the consumer upon 
creation seems to be as designed. A repeat subscribe() to the same pattern 
would be needed to handle that case.
Unit tests for regex subscriptions will be handled in KAFKA-3897.




> Subsequent regex subscription calls fail
> 
>
> Key: KAFKA-3854
> URL: https://issues.apache.org/jira/browse/KAFKA-3854
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> There are a couple of issues with regex subscription in the new consumer:
> h6. Subsequent Subscriptions Fail
> When consecutive calls are made to new consumer's [regex 
> subscription|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L850],
>  like below: 
> {code}
> consumer.subscribe(Pattern.compile("..."), listener);
> consumer.poll(0);
> consumer.subscribe(Pattern.compile("f.."), listener);
> consumer.poll(0);
> {code}
> the second call fails with the following error:
> {code}
> Exception in thread "main" java.lang.IllegalStateException: Subscription to 
> topics, partitions and pattern are mutually exclusive
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.subscribe(SubscriptionState.java:175)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:854)
>   ...
> {code}
> h6. Subsequent Subscriptions Fail to Trigger a Subscription Change and 
> Rebalance
> Even after the code is tweaked to get around the above issue, only the first 
> call to regex subscription triggers an actual subscription and the 
> subscription's rebalance listener. The reason is the regex {{subscription}} / 
> {{poll}} does not directly call {{changeSubscription(...)}} in which 
> {{needsPartitionAssignment}} is set and causes a rebalance. This method is 
> called only during the first regex {{subscription}} / {{poll}} when 
> coordinator is unknown and [a {{client.poll}} 
> call|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L179]
>  is made which eventually leads to [a {{changeSubscription}} 
> call|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L161].
>  The subsequent call does not reach this point because the coordinator is 
> already known.
> It seems that, due to the same reason, if a consumer is subscribed to a pattern, 
> and later on a new topic is created that matches that pattern, the consumer 
> does not become subscribed to it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1572: KAFKA-3854: Fix issues with new consumer's subsequ...

2016-07-01 Thread vahidhashemian
Github user vahidhashemian closed the pull request at:

https://github.com/apache/kafka/pull/1572


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1572: KAFKA-3854: Fix issues with new consumer's subsequ...

2016-07-01 Thread vahidhashemian
GitHub user vahidhashemian reopened a pull request:

https://github.com/apache/kafka/pull/1572

KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) 
subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not immediately refresh metadata to 
change the subscription of the new consumer and trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches 
a consumer's subscription pattern would not be assigned to the consumer upon 
creation seems to be as designed. A repeat 
`subscribe()` to the same pattern or some wait time until the next 
automatic metadata refresh would handle that case.

An integration test was also added to verify these issues are fixed with 
this PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-3854

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1572


commit 5fb472a48a0231aece798d601c3427bd058fc078
Author: Vahid Hashemian 
Date:   2016-06-16T21:19:32Z

KAFKA-3854: Fix issues with new consumer's subsequent regex (pattern) 
subscriptions

This patch fixes two issues:
1. Subsequent regex subscriptions fail with the new consumer.
2. Subsequent regex subscriptions would not actually refresh metadata and 
change the subscription of the new consumer, nor would they trigger a rebalance.

The final note on the JIRA stating that a later created topic that matches 
a consumer's subscription pattern would not be assigned to the consumer upon 
creation seems to be as designed. A repeat subscribe() to the same pattern 
would be needed to handle that case.
Unit tests for regex subscriptions will be handled in KAFKA-3897.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3877) Gradle compiler daemon exits with non-zero exit code while running LogOffsetTest

2016-07-01 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359346#comment-15359346
 ] 

Vahid Hashemian commented on KAFKA-3877:


I've seen a few recent builds fail in 
{{kafka.utils.TestUtils$.waitUntilTrue(...)}} for some tests. I modified the 
default 5000 ms to 1 ms for my PR and it passed. Wondering whether the 5000 
ms needs to be increased.

> Gradle compiler daemon exits with non-zero exit code while running 
> LogOffsetTest
> 
>
> Key: KAFKA-3877
> URL: https://issues.apache.org/jira/browse/KAFKA-3877
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>  Labels: transient-unit-test-failure
>
> This happened in a recent build:
> {code}
> kafka.server.LogOffsetTest > testGetOffsetsBeforeNow STARTED
> :kafka-trunk-jdk8:core:test FAILED
> :test_core_2_11 FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':test_core_2_11'.
> > Process 'Gradle Compiler Daemon 1' finished with non-zero exit value 137
> {code}
> https://builds.apache.org/job/kafka-trunk-jdk8/702



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3877) Gradle compiler daemon exits with non-zero exit code while running LogOffsetTest

2016-07-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359335#comment-15359335
 ] 

Guozhang Wang commented on KAFKA-3877:
--

Just saw another one of this test case failure: 
https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/4497/console

> Gradle compiler daemon exits with non-zero exit code while running 
> LogOffsetTest
> 
>
> Key: KAFKA-3877
> URL: https://issues.apache.org/jira/browse/KAFKA-3877
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>  Labels: transient-unit-test-failure
>
> This happened in a recent build:
> {code}
> kafka.server.LogOffsetTest > testGetOffsetsBeforeNow STARTED
> :kafka-trunk-jdk8:core:test FAILED
> :test_core_2_11 FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':test_core_2_11'.
> > Process 'Gradle Compiler Daemon 1' finished with non-zero exit value 137
> {code}
> https://builds.apache.org/job/kafka-trunk-jdk8/702



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3753) Add approximateNumEntries() to the StateStore interface for metrics reporting

2016-07-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3753.
--
Resolution: Fixed

This JIRA has been merged in as PR 1486.

> Add approximateNumEntries() to the StateStore interface for metrics reporting
> -
>
> Key: KAFKA-3753
> URL: https://issues.apache.org/jira/browse/KAFKA-3753
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Jeff Klukas
>Priority: Minor
>  Labels: api
> Fix For: 0.10.1.0
>
>
> As a developer building a Kafka Streams application, I'd like to have 
> visibility into what's happening with my state stores. How can I know if a 
> particular store is growing large? How can I know if a particular store is 
> frequently needing to hit disk?
> I'm interested to know if there are existing mechanisms for extracting this 
> information or if other people have thoughts on how we might approach this.
> I can't think of a way to provide metrics generically, so each state store 
> implementation would likely need to handle this separately. Given that the 
> default RocksDBStore will likely be the most-used, it would be a first target 
> for adding metrics.
> I'd be interested in knowing the total number of entries in the store, the 
> total size on disk and in memory, rates of gets and puts, and hit/miss ratio 
> for the MemoryLRUCache. Some of these numbers are likely calculable through 
> the RocksDB API, others may simply not be accessible.
> Would there be value to the wider community in having state stores register 
> metrics?
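
For illustration, the kind of hook this issue asks for might look roughly like the sketch below. It is a sketch only; the exact method placement and the surrounding Kafka Streams interfaces differ from this simplified form.

{code}
// Hypothetical sketch: a store exposes a best-effort entry count that a metrics
// layer can poll and report. Not the actual Kafka Streams StateStore interface.
public interface StateStoreWithMetrics {
    String name();

    // Best-effort number of entries currently held by the store; may be an
    // approximation for stores (such as RocksDB) that cannot count entries cheaply.
    long approximateNumEntries();
}
{code}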



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3538) Abstract the creation/retrieval of Producer for stream sinks for unit testing

2016-07-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3538.
--
   Resolution: Duplicate
Fix Version/s: (was: 0.10.1.0)
   0.10.0.1

> Abstract the creation/retrieval of Producer for stream sinks for unit testing
> -
>
> Key: KAFKA-3538
> URL: https://issues.apache.org/jira/browse/KAFKA-3538
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Michael Coon
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: semantics
> Fix For: 0.10.0.1
>
>
> The StreamThread creates producer/consumers directly as KafkaProducer and 
> KafkaConsumer, thus eliminating my ability to unit test my streams code 
> without having an active Kafka nearby. Could this be abstracted in a way that 
> it relies on an optional ProducerProvider or ConsumerProvider implementation 
> that could inject a mock producer/consumer for unit testing? We do this in 
> all our kafka code for unit testing, and if a provider is not offered at 
> runtime, we create the concrete KafkaProducer/Consumer components by default.
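
A minimal sketch of the kind of provider abstraction being asked for (the names below are hypothetical, not an existing Kafka API):

{code}
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

// Hypothetical provider: production code uses the real KafkaProducer, while unit
// tests can supply an implementation that returns a MockProducer instead.
interface ProducerProvider {
    <K, V> Producer<K, V> newProducer(Map<String, Object> config);
}

class DefaultProducerProvider implements ProducerProvider {
    @Override
    public <K, V> Producer<K, V> newProducer(Map<String, Object> config) {
        return new KafkaProducer<>(config);
    }
}
{code}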



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3538) Abstract the creation/retrieval of Producer for stream sinks for unit testing

2016-07-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359264#comment-15359264
 ] 

Guozhang Wang commented on KAFKA-3538:
--

[~mdcoon] I think this has been fixed as in 
https://issues.apache.org/jira/browse/KAFKA-3616, hence resolving this ticket.

> Abstract the creation/retrieval of Producer for stream sinks for unit testing
> -
>
> Key: KAFKA-3538
> URL: https://issues.apache.org/jira/browse/KAFKA-3538
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Michael Coon
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: semantics
> Fix For: 0.10.1.0
>
>
> The StreamThread creates producer/consumers directly as KafkaProducer and 
> KafkaConsumer, thus eliminating my ability to unit test my streams code 
> without having an active Kafka nearby. Could this be abstracted in a way that 
> it relies on an optional ProducerProvider or ConsumerProvider implementation 
> that could inject a mock producer/consumer for unit testing? We do this in 
> all our kafka code for unit testing, and if a provider is not offered at 
> runtime, we create the concrete KafkaProducer/Consumer components by default.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations

2016-07-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359231#comment-15359231
 ] 

Guozhang Wang commented on KAFKA-3429:
--

I think this ticket should be re-defined a bit for KTable.groupBy: 

Today, if the user does not provide a serde when calling this operator, the default 
serde from the configs is used. But for the special case where the KTable was 
created directly from an aggregate operator, we can use the key serde 
specified in the previous aggregate operator instead of the default from the 
configs, and if there is no {{mapValues}} in between, we can also use the 
value serde from the previous aggregate operator as well.

And it now seems to be quite a niche optimization.



> Remove Serdes needed for repartitioning in KTable stateful operations
> -
>
> Key: KAFKA-3429
> URL: https://issues.apache.org/jira/browse/KAFKA-3429
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>  Labels: api, newbie++
> Fix For: 0.10.1.0
>
>
> Currently in KTable aggregate operations where a repartition is possibly 
> needed since the aggregation key may not be the same as the original primary 
> key, we require the users to provide serdes (default to configured ones) for 
> read / write to the internally created re-partition topic. However, these are 
> not necessary since for all KTable instances either generated from the topics 
> directly:
> {code}table = builder.table(...){code}
> or from aggregation operations:
> {code}table = stream.aggregate(...){code}
> There is already a serde provided for materializing the data, and hence the 
> same serde can be re-used when the resulting KTable is involved in future 
> aggregation operations. For example:
> {code}
> table1 = stream.aggregateByKey(serde);
> table2 = table1.aggregate(aggregator, selector, originalSerde, 
> aggregateSerde);
> {code}
> We would not need to require users to specify the "originalSerde" in 
> table1.aggregate since it could always reuse the "serde" from 
> stream.aggregateByKey, which is used to materialize the table1 object.
> In order to get rid of it, implementation-wise we need to carry the serde 
> information along with the KTableImpl instance in order to re-use it in a 
> future operation that requires repartitioning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3923) MetricReporter interface depends on final class KafkaMetric instead of Metric interface

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359202#comment-15359202
 ] 

ASF GitHub Bot commented on KAFKA-3923:
---

GitHub user stepio opened a pull request:

https://github.com/apache/kafka/pull/1579

KAFKA-3923: Use Metric interface instead of final KafkaMetric in 
MetricsReporter and its implementations

This is a patch for the previously highlighted issue:
https://issues.apache.org/jira/browse/KAFKA-3923

I've minimised the scope of changes, so that the Metrics class is not 
affected and generics are introduced at the method level instead of the class level.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stepio/kafka metrics_reporter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1579.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1579


commit 06841765707f40d553ab9c7ae635bfc67b7bb05d
Author: Igor Stepanov 
Date:   2016-07-01T16:10:55Z

KAFKA-3923: Use Metric interface instead of final KafkaMetric in 
MetricsReporter and its implementations
 - simplifies mocking of metrics for unit tests




> MetricReporter interface depends on final class KafkaMetric instead of Metric 
> interface
> ---
>
> Key: KAFKA-3923
> URL: https://issues.apache.org/jira/browse/KAFKA-3923
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Igor Stepanov
>
> Hello,
> I'm working on exposing Kafka's consumer/producer metrics to Spring Actuator.
> To achieve this, I've implemented Kafka's MetricReporter interface to allow 
> injecting it into the appropriate consumer/producer. No issues with 
> implementation itself, fine for me.
> But now I've moved to writing unit tests for this implementation and decided 
> to use mocked KafkaMetric instances for this. But mocking of KafkaMetric 
> itself is not so plain - the class is final. The logical step is to use 
> Metric interface for mocking, but MetricReporter accepts only KafkaMetric.
> I know that technically I can use PowerMock and most probably it will work 
> fine in this case, but talking about Kafka itself, is it a good approach to 
> depend on the exact implementation when interface is available?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1579: KAFKA-3923: Use Metric interface instead of final ...

2016-07-01 Thread stepio
GitHub user stepio opened a pull request:

https://github.com/apache/kafka/pull/1579

KAFKA-3923: Use Metric interface instead of final KafkaMetric in 
MetricsReporter and its implementations

This is a patch for the previously highlighted issue:
https://issues.apache.org/jira/browse/KAFKA-3923

I've minimised the scope of changes, so that now the Metrics class is not 
affected and generics are introduced at the method level instead of the class level.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stepio/kafka metrics_reporter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1579.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1579


commit 06841765707f40d553ab9c7ae635bfc67b7bb05d
Author: Igor Stepanov 
Date:   2016-07-01T16:10:55Z

KAFKA-3923: Use Metric interface instead of final KafkaMetric in 
MetricsReporter and its implementations
 - simplifies mocking of metrics for unit tests




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-60: Make Java client class loading more flexible

2016-07-01 Thread Edoardo Comar
+1 (non binding)
--
Edoardo Comar
MQ Cloud Technologies
eco...@uk.ibm.com
+44 (0)1962 81 5576 
IBM UK Ltd, Hursley Park, SO21 2JN

IBM United Kingdom Limited Registered in England and Wales with number 
741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 
3AU



From:   Rajini Sivaram 
To: dev@kafka.apache.org
Date:   01/07/2016 12:27
Subject:[VOTE] KIP-60: Make Java client class loading more 
flexible



I would like to initiate the voting process for KIP-60 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-60+-+Make+Java+client+classloading+more+flexible
).
This is a simple set of changes that are fully compatible with the 
existing
class loading in Kafka and enables Java clients to be run in
multi-classloader environments including OSGi.

Thank you...


Regards,

Rajini



Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU


Re: [VOTE] KIP-60: Make Java client class loading more flexible

2016-07-01 Thread Jay Kreps
+1

-Jay

On Fri, Jul 1, 2016 at 4:27 AM, Rajini Sivaram  wrote:

> I would like to initiate the voting process for KIP-60 (
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-60+-+Make+Java+client+classloading+more+flexible
> ).
> This is a simple set of changes that are fully compatible with the existing
> class loading in Kafka and enables Java clients to be run in
> multi-classloader environments including OSGi.
>
> Thank you...
>
>
> Regards,
>
> Rajini
>


Re: [DISCUSS] KIP-67: Queryable state for Kafka Stream

2016-07-01 Thread Jay Kreps
We have not used the "get" prefix in methods, like getXyz(), elsewhere in
our Java code, instead sticking with Scala-style methods like xyz().
It'd be good to change those.
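For example (illustrative names only, not the actual KIP-67 API):

{code:java}
// Illustrative only: a hypothetical store interface, not the KIP-67 proposal.
public interface ReadOnlyStoreSketch<K, V> {

    // Accessor style used elsewhere in the Kafka code base (no "get" prefix):
    V value(K key);

    // rather than the JavaBean style:
    // V getValue(K key);
}
{code}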

-Jay

On Fri, Jul 1, 2016 at 4:09 AM, Damian Guy  wrote:

> Hi,
>
> We've made some modifications to the KIP. The "Discovery" API has been
> changed
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams#KIP-67:QueryablestateforKafkaStreams-Step2inproposal:globaldiscoveryofstatestores
>
> Please take a look.
>
> Many thanks,
> Damian
>
> On Tue, 28 Jun 2016 at 09:34 Damian Guy  wrote:
>
> > Hi,
> >
> > We have created KIP 67: Queryable state for Kafka Streams`
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> >
> > Please take a look. Feedback is appreciated.
> >
> > Thank you
> >
>


Brokers are crash due to __consumer_offsets folder are deleted

2016-07-01 Thread 黄杰斌
Hi All,

Have you encountered the issue below when using kafka_2.11-0.10.0.0?
All brokers crash because the __consumer_offsets folders are deleted.
 sample log:
[2016-06-30 12:46:32,579] FATAL [Replica Manager on Broker 2]: Halting due
to unrecoverable I/O error while handling produce request:
 (kafka.server.ReplicaManager)
kafka.common.KafkaStorageException: I/O exception in append to log
'__consumer_offsets-32'
at kafka.log.Log.append(Log.scala:329)
at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:443)
at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:429)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
at
kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
at
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:406)
at
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:392)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:392)
at
kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:328)
at
kafka.coordinator.GroupMetadataManager.store(GroupMetadataManager.scala:232)
at
kafka.coordinator.GroupCoordinator$$anonfun$handleCommitOffsets$9.apply(GroupCoordinator.scala:424)
at
kafka.coordinator.GroupCoordinator$$anonfun$handleCommitOffsets$9.apply(GroupCoordinator.scala:424)
at scala.Option.foreach(Option.scala:257)
at
kafka.coordinator.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:424)
at
kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:310)
at kafka.server.KafkaApis.handle(KafkaApis.scala:84)
at
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException:
/tmp/kafka2-logs/__consumer_offsets-32/.index (No such
file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.(RandomAccessFile.java:243)
at
kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:286)
at
kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:285)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at kafka.log.OffsetIndex.resize(OffsetIndex.scala:285)
at
kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:274)
at
kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:274)
at
kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:274)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:273)
at kafka.log.Log.roll(Log.scala:655)
at kafka.log.Log.maybeRoll(Log.scala:630)
at kafka.log.Log.append(Log.scala:383)
... 23 more

No one removed those folders, and the __consumer_offsets topic is managed by
the broker, so no one can remove this topic.
Do you know why this happened? And how to avoid it?

Best Regards,
Ben


[VOTE] KIP-60: Make Java client class loading more flexible

2016-07-01 Thread Rajini Sivaram
I would like to initiate the voting process for KIP-60 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-60+-+Make+Java+client+classloading+more+flexible).
This is a simple set of changes that are fully compatible with the existing
class loading in Kafka and enables Java clients to be run in
multi-classloader environments including OSGi.

Thank you...


Regards,

Rajini


Re: [DISCUSS] Client Side Auto Topic Creation

2016-07-01 Thread Ismael Juma
Hi all,

I think there are a few things being discussed and it would be good to make
that explicit:

1. If and how we expose auto-topic creation in the client (under the
assumption that the server auto-topic creation will be deprecated and
eventually removed)
2. The ability to create topics with the cluster defaults for replication
factor and partition counts
3. Support for topic "specs"
4. The fact that some exceptions are retriable in some cases, but not others

My thoughts on each:

1. I prefer the approach where we throw an exception and let the clients
create the topic via `AdminClient` if that's what they need.
2. Like Grant, I'm unsure that will generally be used in a positive way.
However, if this is what we need to be able to deprecate server auto-topic
creation, the benefits outweigh the costs in my opinion.
3. Something like this would be good to have and could potentially provide
a better solution than 2. However, it needs a separate KIP and may take a
while for the final design to be agreed. So, it should not prevent progress
from being made in my opinion.
4. This has come up before. Encoding whether an exception is retriable or
not via inheritance is a bit restrictive. Also, something that should be
discussed separately, probably.

Ismael
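For illustration of point 1, explicit client-side topic creation could look 
like the sketch below. Note it uses the Java AdminClient from a later Kafka 
release (the admin protocol was still being designed in KIP-4 at the time of 
this thread), so treat it as an assumption rather than anything the KIP 
specifies:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

// Sketch: create the topic explicitly from the application instead of relying
// on server-side auto-creation. Topic name, partition count and replication
// factor are illustrative values.
public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("my-topic", 3, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
{code}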

On Wed, Jun 29, 2016 at 10:37 PM, Grant Henke  wrote:

> Hi Roger and Constantine,
>
> Thanks for the feedback.
>
> I agree that configuration to maintain guarantees is commonly spread across
> enterprise teams, making it difficult to get right. That said, it's also hard
> to solve for every company structure. I think there is room for an open
> discussion about what configs should be able to be
> validated/enforced/overridden and where configurations should live. I think
> that's big enough for a whole new KIP and would like to push that discussion
> out until that KIP is opened. These discussions would also make sense in
> KIP-37
> - Add Namespaces to Kafka
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-37+-+Add+Namespaces+to+Kafka
> >.
> To ensure we allow validation and overrides at the namespace level.
>
> That said, KIP-4 will be introducing a config request/response protocol
> and adding calls to get/alter configs to the admin API. You could leverage
> that to do some of the client validation and defaulting based on your
> needs. Look for a discussion thread from me on that soon.
>
> As far as auto topic creation goes, it sounds like failing fast and
> allowing the client application to create the topic would provide the most
> flexibility to ensure the topic matches its needed specifications.
>
> Thanks,
> Grant
>
> On Wed, Jun 29, 2016 at 3:02 PM, Konstantin Zadorozhny <
> konstantin.zadoroz...@tubemogul.com> wrote:
>
> > Roger,
> >
> > I concur with everything you said.
> >
> > Couple more use cases to prove the point:
> >
> >1. Some topics should always have 1 and only one partition.
> >2. CDC applications based on Kafka Connect. Those types of applications
> >absolutely must know how to create properly configured topics: compacted,
> >1 partition, replication factor 3, min 2 in-sync replicas. In many cases
> >per-table or per-database configuration overrides will be useful too.
> >
> > If producers and consumers were able to verify topic configuration on
> > startup, that would be really useful. A spec would be a great way to
> > document the intent of the code. A lot of silly (but quite hard to pin
> > down) production issues could have been prevented by having the producer
> > fail fast on misconfigured topics.
> >
> > To add to the auto-creation configuration tally. We do have topic
> > auto-creation disabled on all our clusters.
> >
> > *Konstantin Zadorozhny*
> > www.tubemogul.com
> >
> > On Wed, Jun 29, 2016 at 11:17 AM, Roger Hoover 
> > wrote:
> >
> > > My comments go a bit beyond just topic creation but I'd like to see
> Kafka
> > > make it easier for application developers to specify their requirements
> > > declaratively in a single place.  Today, for example, if your
> application
> > > requires strong guarantees against data loss, you must set a mix of
> > > topic-level configs (replication factor, min.in.sync.replicas,
> > > retention.ms)
> > > and client configs (acks=all and
> > > possibly max.in.flight.requests.per.connection if you care about
> > > ordering).  This can be complicated by organizational structure where
> you
> > > have a different team (SREs) responsible for the cluster configs and
> > > perhaps topic creation and application teams responsible for the client
> > > settings.  Let's say that you get all the settings right up front.  How
> > > would you know if they later were changed incorrectly?  How do admins
> > know
> > > which topics are ok to add more partitions to and which are not?  How do
> > > downstream applications know how much retention they can rely on for
> > > re-processing in their upstream topics.
> > >
> > > I think it's 
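For concreteness, a sketch of how the durability-related settings described 
above end up split between producer configuration and topic configuration 
(illustrative values only):

{code:java}
import java.util.Properties;

// Illustrative values only. The point: the guarantees depend on settings that
// live in two different places, owned by potentially different teams.
public class DurabilityConfigSketch {
    public static void main(String[] args) {
        // Client-side (producer) settings:
        Properties producer = new Properties();
        producer.put("acks", "all");
        producer.put("max.in.flight.requests.per.connection", "1");

        // Topic-level settings, applied at topic creation or alteration time:
        Properties topic = new Properties();
        topic.put("min.insync.replicas", "2");
        topic.put("retention.ms", "604800000"); // 7 days
        // replication factor is chosen when the topic is created, e.g. 3

        System.out.println("producer: " + producer);
        System.out.println("topic:    " + topic);
    }
}
{code}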

Re: [DISCUSS] KIP-67: Queryable state for Kafka Stream

2016-07-01 Thread Damian Guy
Hi,

We've made some modifications to the KIP. The "Discovery" API has been
changed
https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams#KIP-67:QueryablestateforKafkaStreams-Step2inproposal:globaldiscoveryofstatestores

Please take a look.

Many thanks,
Damian

On Tue, 28 Jun 2016 at 09:34 Damian Guy  wrote:

> Hi,
>
> We have created KIP 67: Queryable state for Kafka Streams`
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
>
> Please take a look. Feedback is appreciated.
>
> Thank you
>


[jira] [Updated] (KAFKA-3923) MetricReporter interface depends on final class KafkaMetric instead of Metric interface

2016-07-01 Thread Igor Stepanov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Stepanov updated KAFKA-3923:
-
Description: 
Hello,

I'm working on exposing Kafka's consumer/producer metrics to Spring Actuator.

To achieve this, I've implemented Kafka's MetricReporter interface to allow 
injecting it into the appropriate consumer/producer. No issues with 
implementation itself, fine for me.

But now I've moved to writing unit tests for this implementation and decided to 
use mocked KafkaMetric instances for this. But mocking of KafkaMetric itself is 
not so plain - the class is final. The logical step is to use Metric interface 
for mocking, but MetricReporter accepts only KafkaMetric.

I know that technically I can use PowerMock and most probably it will work fine 
in this case, but talking about Kafka itself, is it a good approach to depend 
on the exact implementation when interface is available?

  was:
Hello,

I'm working on exposing Kafka's consumer/producer metrics to Spring Actuator.

To achieve this, I've implemented Kafka's MetricReporter interface to allow 
injecting it into the appropriate consumer/producer. No issues with 
implementation itself, fine for me.

But now I've moved to writing unit tests for this implementation and decided to 
use mocked KafkaMetric instances for this. But mocking of KafkaMetric itself is 
not so plain - the class is final. The logical step is to use Metric interface 
for mocking, but MetricReporter accepts only KafkaMetric.

I know that technically I can use PowerMock and most probably it will work fine 
in this case, but talking about Kafka itself, is it a good approach to depend 
on the exact implementation when interface fulfils most of the needs?


> MetricReporter interface depends on final class KafkaMetric instead of Metric 
> interface
> ---
>
> Key: KAFKA-3923
> URL: https://issues.apache.org/jira/browse/KAFKA-3923
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Igor Stepanov
>
> Hello,
> I'm working on exposing Kafka's consumer/producer metrics to Spring Actuator.
> To achieve this, I've implemented Kafka's MetricReporter interface to allow 
> injecting it into the appropriate consumer/producer. No issues with 
> implementation itself, fine for me.
> But now I've moved to writing unit tests for this implementation and decided 
> to use mocked KafkaMetric instances for this. But mocking of KafkaMetric 
> itself is not so plain - the class is final. The logical step is to use 
> Metric interface for mocking, but MetricReporter accepts only KafkaMetric.
> I know that technically I can use PowerMock and most probably it will work 
> fine in this case, but talking about Kafka itself, is it a good approach to 
> depend on the exact implementation when interface is available?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3923) MetricReporter interface depends on final class KafkaMetric instead of Metric interface

2016-07-01 Thread Igor Stepanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358761#comment-15358761
 ] 

Igor Stepanov commented on KAFKA-3923:
--

If you consider this worth improving, let me share what I've found already.
1. Tried to "generify" MetricsReporter:
{code:java}
public interface MetricsReporter<T extends Metric> extends Configurable {

public void init(List<T> metrics);

public void metricChange(T metric);

...

}
{code}
This approach affected only MetricsReporter & JmxReporter and looked very 
promising, but I've got an error in Scala code:
{quote}
/Users/igorstepanov/git/github/kafka/core/src/main/scala/kafka/server/KafkaConfig.scala:887:
 trait MetricsReporter takes type parameters
  val metricReporterClasses: java.util.List[MetricsReporter] = 
getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, 
classOf[MetricsReporter])
^
/Users/igorstepanov/git/github/kafka/core/src/main/scala/kafka/server/KafkaConfig.scala:887:
 trait MetricsReporter takes type parameters
  val metricReporterClasses: java.util.List[MetricsReporter] = 
getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, 
classOf[MetricsReporter])

 ^
/Users/igorstepanov/git/github/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:99:
 trait MetricsReporter takes type parameters
  private val reporters: java.util.List[MetricsReporter] = 
config.metricReporterClasses
{quote}
2. Tried just replacing KafkaMetric with Metric in most cases - this affected 
also Metrics class (in addition to MetricsReporter & JmxReporter), but again 
got error in Scala:
{quote}
/Users/igorstepanov/git/github/kafka/core/src/main/scala/kafka/server/ClientQuotaManager.scala:121:
 type mismatch;
 found   : org.apache.kafka.common.Metric
 required: org.apache.kafka.common.metrics.KafkaMetric
throttleTimeMs = throttleTime(clientMetric, 
getQuotaMetricConfig(quota(clientId)))
{quote}

As I don't have the required experience with Scala, I've decided to stop at 
this point and consult with you.

> MetricReporter interface depends on final class KafkaMetric instead of Metric 
> interface
> ---
>
> Key: KAFKA-3923
> URL: https://issues.apache.org/jira/browse/KAFKA-3923
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Igor Stepanov
>
> Hello,
> I'm working on exposing Kafka's consumer/producer metrics to Spring Actuator.
> To achieve this, I've implemented Kafka's MetricReporter interface to allow 
> injecting it into the appropriate consumer/producer. No issues with 
> implementation itself, fine for me.
> But now I've moved to writing unit tests for this implementation and decided 
> to use mocked KafkaMetric instances for this. But mocking of KafkaMetric 
> itself is not so plain - the class is final. The logical step is to use 
> Metric interface for mocking, but MetricReporter accepts only KafkaMetric.
> I know that technically I can use PowerMock and most probably it will work 
> fine in this case, but talking about Kafka itself, is it a good approach to 
> depend on the exact implementation when interface fulfils most of the needs?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3923) MetricReporter interface depends on final class KafkaMetric instead of Metric interface

2016-07-01 Thread Igor Stepanov (JIRA)
Igor Stepanov created KAFKA-3923:


 Summary: MetricReporter interface depends on final class 
KafkaMetric instead of Metric interface
 Key: KAFKA-3923
 URL: https://issues.apache.org/jira/browse/KAFKA-3923
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Igor Stepanov


Hello,

I'm working on exposing Kafka's consumer/producer metrics to Spring Actuator.

To achieve this, I've implemented Kafka's MetricReporter interface to allow 
injecting it into the appropriate consumer/producer. No issues with 
implementation itself, fine for me.

But now I've moved to writing unit tests for this implementation and decided to 
use mocked KafkaMetric instances for this. But mocking of KafkaMetric itself is 
not so plain - the class is final. The logical step is to use Metric interface 
for mocking, but MetricReporter accepts only KafkaMetric.

I know that technically I can use PowerMock and most probably it will work fine 
in this case, but talking about Kafka itself, is it a good approach to depend 
on the exact implementation when interface fulfils most of the needs?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1464) Add a throttling option to the Kafka replication tool

2016-07-01 Thread Ben Stopford (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Stopford reassigned KAFKA-1464:
---

Assignee: Ben Stopford  (was: Ismael Juma)

> Add a throttling option to the Kafka replication tool
> -
>
> Key: KAFKA-1464
> URL: https://issues.apache.org/jira/browse/KAFKA-1464
> Project: Kafka
>  Issue Type: New Feature
>  Components: replication
>Affects Versions: 0.8.0
>Reporter: mjuarez
>Assignee: Ben Stopford
>Priority: Minor
>  Labels: replication, replication-tools
> Fix For: 0.10.1.0
>
>
> When performing replication on new nodes of a Kafka cluster, the replication 
> process will use all available resources to replicate as fast as possible.  
> This causes performance issues (mostly disk IO and sometimes network 
> bandwidth) when doing this in a production environment, in which you're 
> trying to serve downstream applications, at the same time you're performing 
> maintenance on the Kafka cluster.
> An option to throttle the replication to a specific rate (in either MB/s or 
> activities/second) would help production systems to better handle maintenance 
> tasks while still serving downstream applications.
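As an illustration of the mechanism such an option implies (not Kafka code; all 
names here are hypothetical), a byte-rate throttle can be as simple as a 
token-bucket limiter that the replication copy loop consults before moving the 
next chunk:

{code:java}
// Hypothetical sketch of a byte-rate throttle. Callers invoke acquire(bytes)
// before transferring each chunk; the call sleeps just long enough to keep the
// long-run transfer rate at or below the configured bytes/second.
public class ByteRateThrottleSketch {
    private final long bytesPerSecond;
    private double availableBytes;
    private long lastRefillNanos;

    public ByteRateThrottleSketch(long bytesPerSecond) {
        this.bytesPerSecond = bytesPerSecond;
        this.availableBytes = bytesPerSecond;
        this.lastRefillNanos = System.nanoTime();
    }

    public synchronized void acquire(long bytes) throws InterruptedException {
        long now = System.nanoTime();
        // Refill the bucket for the time elapsed since the last call, capped
        // at one second's worth of budget.
        availableBytes = Math.min(bytesPerSecond,
                availableBytes + (now - lastRefillNanos) / 1e9 * bytesPerSecond);
        lastRefillNanos = now;
        availableBytes -= bytes;  // may go negative: we owe time
        if (availableBytes < 0) {
            long sleepMs = (long) (-availableBytes * 1000.0 / bytesPerSecond);
            Thread.sleep(sleepMs);  // pay back the deficit before returning
        }
    }
}
{code}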



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-01 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
---
Description: 
Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown”
kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.<init>(Log.scala:90)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non monotonically incrementing offsets.

I’ve spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue, (though this is based on some 
assumptions I've made about the way Kafka is working, which may be wrong).

Given:
* A topic that is produced to using acks = 1
* A topic that is produced to using gzip compression
* A topic that has min.isr set to less than the number of replicas, (i.e. 
min.isr=2, #replicas=3)
* Following ISRs are lagging behind the leader by some small number of 
messages, (which is normal with acks=1)
* brokers are configured with fairly large zk session timeout e.g. 30s.
* brokers are configured so that unclean leader elections are disabled.

Then:
When something like a power outage takes out all three replicas, it's possible to 
get into a state such that the indexes won’t rebuild on a restart and a broker 
fails to start. This can happen when:
* Enough brokers, but not the pre-outage leader, come on-line for the partition 
to be writeable
* Producers produce enough records to the partition that the head offset is now 
greater than the pre-outage leader head offset.
* The pre-outage leader comes back online.

At this point the logs on the pre-outage leader have diverged from the other 
replicas.  It has some messages that are not in the other replicas, and the 
other replicas have some records not in the pre-outage leader's log - at the 
same offsets.

I’m assuming that because the current leader has a higher offset than the 
pre-outage leader, the pre-outage leader just starts following the leader and 
requesting the records it thinks it’s missing.

I’m also assuming that because the producers were using gzip, so each record is 
actually a compressed message set, when the pre-outage leader requests 
records from the leader, the offset it requests could just happen to be in 
the middle of a compressed batch, but the leader returns the full batch.  When 
the pre-outage leader appends this batch to its own log it thinks all is OK. 
But what has happened is that the offsets in the log are no longer 
monotonically incrementing. Instead they actually dip by the number of records 
in the compressed batch that were before the requested offset.  If and when 
this broker restarts, this dip may be at the 4K boundary the indexer checks. If 
it is, the broker won’t start.

Several of our brokers were unlucky enough to hit that 4K boundary, causing a 
protracted outage.  We’ve written a little utility that shows several more 
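As an illustration only (this is not the utility referred to above), a minimal 
sketch that scans a DumpLogSegments dump for a non-monotonic offset dip; it 
assumes the dump contains lines of the form "offset: <n>", which may differ 
between Kafka versions:

{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Pipe the output of
//   kafka-run-class.sh kafka.tools.DumpLogSegments --files <segment>.log
// into this program; it prints every place where record offsets stop
// incrementing monotonically.
public class OffsetDipScanSketch {
    public static void main(String[] args) throws Exception {
        Pattern p = Pattern.compile("offset: (\\d+)");
        long last = -1L;
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            Matcher m = p.matcher(line);
            if (!m.find()) continue;
            long offset = Long.parseLong(m.group(1));
            if (offset < last) {
                System.out.println("offset dip: " + offset + " after " + last);
            }
            last = Math.max(last, offset);
        }
    }
}
{code}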

[jira] [Issue Comment Deleted] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-01 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
---
Comment: was deleted

(was: Hi [~junrao], thanks for taking the time to look at this.

Note: I've incorporated some of what I say below into the problem description 
above, so that it doesn't get lost in the comments.

First off, we have unclean leadership elections disable. (We did later enable 
them to help get around some other issues we were having, but this was several 
hours after this issue manifested).

We did look through data logs that were causing the brokers to not start. What 
we found before the incident was a monotonically increasing offset, where each 
compressed batch normally contained one or two records. Then the is a batch 
that contains many records, whose first records has an offset below the 
previous batch and whose last record has an offset above the previous batch. 
Following on from this there continues a period of large batches, with 
monotonically increasing offsets, and then the log returns to batches with one 
or two records.

Our working assumption here is that the period before the offset dip is 
pre-outage normal operation. The period of larger batches is from just after 
the outage, where producers have a back log to processes when the partition 
becomes available, and then things return to normal batch sizes again once the 
back log clears.

We did also look through the Kafka's application logs to try and piece together 
the series of events leading up to this:

Here’s what I know happened, with regards to one partition that has issues, 
from the logs:

Prior to outage:
Replicas for the partition are brokers 2011, 2012,  2024, with 2024 being the 
preferred leader.
Producers using acks=1, compression=gzip
Brokers configured with unclean.elections=false, zk.session-timeout=36s

Post outage:
2011 comes up first, (also as the Controller), recovers unflushed log set 
1239444214, completes load with offset 1239740602, and becomes leader of the 
partition.
2012 comes up next, recovers its log,  recovers unflushed log set 1239444214, 
truncates to offset 1239742830, (thats 2,228 records ahead of the recovered 
offset of the current leader), and starts following.
2024 comes up quickly after 2012.  recovers unflushed log set 1239444214, 
truncates to offset  1239742250, (thats 1,648 records ahead of the recovered 
offset of the current leader), and starts following.
The Controller adds 2024 to the replica set just before 2024 halts due to 
another partition having an offset greater than the leader.
The Controller adds 2012 to the replica set just before 2012 halts due to 
another partition having an offset greater than the leader.
When 2012 is next restarted, it fails to fully start as its complaining of 
invalid offsets in the log.

Our working hypothesis here is that the partition becomes writeable again, 
possibly as brokers 2012 & 2024 get added to the ISR set before halting, and 
maybe don’t remove themselves when they halt? - hence remain in the ISR set for 
36 seconds. Mean while our producers are happily sending large compressed 
batches, as they have a backlog, to broker 2011, which is accepting them, (as 
there are enough replicas in the ISR set), and appending them to its log - 
moving its offset beyond brokers 2012 and 2024.

Log entries:

(Interleaved log entries from the three brokers - the broker id is in the [id] 
brackets)

Just as the power was going out I see this in the broker that was the 
controller:

2016-04-11 12:01:42 - [2026] - "[Partition state machine on Controller 2026]: 
Invoking state change to OnlinePartition for partitions 
[mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:01:56 - [2026] - "[Replica state machine on controller 2026]: 
Invoking state change to OfflineReplica for replicas
[Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024] 

2016-04-11 12:01:56 - [2026] - "[Controller 2026]: Cannot remove replica 2024 
from ISR of partition [mt_xp_its_music_main_itsevent,20] since it is not in the 
ISR. Leader = 2011 ; ISR = List(2011, 2012)”

2016-04-11 12:01:56 - [2026] - "[Channel manager on controller 2026]: Not 
sending request 
{controller_id=2026,controller_epoch=111,delete_partitions=0,partitions=[{topic=mt_xp_its_music_main_itsevent,partition=20}]}
 to broker 2024, since it is offline.”

2016-04-11 12:04:46 - [2026] - [Replica state machine on controller 2026]: 
Invoking state change to OnlineReplica for replicas
[Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024]

2016-04-11 12:04:58 - [2026] - "[Controller 2026]: Starting preferred replica 
leader election for partitions [mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:04:58 - [2026] - "[Partition state machine on Controller 2026]: 
Invoking state change to OnlinePartition for partitions 
[mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:04:58 - [2026] 

[jira] [Commented] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-01 Thread Andy Coates (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358704#comment-15358704
 ] 

Andy Coates commented on KAFKA-3919:


Hi [~junrao], thanks for taking the time to look at this.

Note: I've incorporated some of what I say below into the problem description 
above, so that it doesn't get lost in the comments.

First off, we have unclean leadership elections disabled. (We did later enable 
them to help get around some other issues we were having, but this was several 
hours after this issue manifested).

We did look through the data logs that were causing the brokers to not start. 
What we found before the incident was a monotonically increasing offset, where 
each compressed batch normally contained one or two records. Then there is a 
batch that contains many records, whose first record has an offset below the 
previous batch and whose last record has an offset above the previous batch. 
Following on from this there continues a period of large batches, with 
monotonically increasing offsets, and then the log returns to batches with one 
or two records.

Our working assumption here is that the period before the offset dip is 
pre-outage normal operation. The period of larger batches is from just after 
the outage, where producers have a backlog to process when the partition 
becomes available, and then things return to normal batch sizes again once the 
backlog clears.

We did also look through Kafka's application logs to try and piece together 
the series of events leading up to this:

Here’s what I know happened, with regards to one partition that has issues, 
from the logs:

Prior to outage:
Replicas for the partition are brokers 2011, 2012,  2024, with 2024 being the 
preferred leader.
Producers using acks=1, compression=gzip
Brokers configured with unclean.elections=false, zk.session-timeout=36s

Post outage:
2011 comes up first, (also as the Controller), recovers unflushed log set 
1239444214, completes load with offset 1239740602, and becomes leader of the 
partition.
2012 comes up next, recovers its log, recovers unflushed log set 1239444214, 
truncates to offset 1239742830 (that's 2,228 records ahead of the recovered 
offset of the current leader), and starts following.
2024 comes up quickly after 2012, recovers unflushed log set 1239444214, 
truncates to offset 1239742250 (that's 1,648 records ahead of the recovered 
offset of the current leader), and starts following.
The Controller adds 2024 to the replica set just before 2024 halts due to 
another partition having an offset greater than the leader.
The Controller adds 2012 to the replica set just before 2012 halts due to 
another partition having an offset greater than the leader.
When 2012 is next restarted, it fails to fully start as its complaining of 
invalid offsets in the log.

Our working hypothesis here is that the partition becomes writeable again, 
possibly as brokers 2012 & 2024 get added to the ISR set before halting, and 
maybe don’t remove themselves when they halt? - hence remain in the ISR set for 
36 seconds. Meanwhile our producers are happily sending large compressed 
batches, as they have a backlog, to broker 2011, which is accepting them, (as 
there are enough replicas in the ISR set), and appending them to its log - 
moving its offset beyond brokers 2012 and 2024.

Log entries:

(Interleaved log entries from the three brokers - the broker id is in the [id] 
brackets)

Just as the power was going out I see this in the broker that was the 
controller:

2016-04-11 12:01:42 - [2026] - "[Partition state machine on Controller 2026]: 
Invoking state change to OnlinePartition for partitions 
[mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:01:56 - [2026] - "[Replica state machine on controller 2026]: 
Invoking state change to OfflineReplica for replicas
[Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024] 

2016-04-11 12:01:56 - [2026] - "[Controller 2026]: Cannot remove replica 2024 
from ISR of partition [mt_xp_its_music_main_itsevent,20] since it is not in the 
ISR. Leader = 2011 ; ISR = List(2011, 2012)”

2016-04-11 12:01:56 - [2026] - "[Channel manager on controller 2026]: Not 
sending request 
{controller_id=2026,controller_epoch=111,delete_partitions=0,partitions=[{topic=mt_xp_its_music_main_itsevent,partition=20}]}
 to broker 2024, since it is offline.”

2016-04-11 12:04:46 - [2026] - [Replica state machine on controller 2026]: 
Invoking state change to OnlineReplica for replicas
[Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024]

2016-04-11 12:04:58 - [2026] - "[Controller 2026]: Starting preferred replica 
leader election for partitions [mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:04:58 - [2026] - "[Partition state machine on Controller 2026]: 
Invoking state change to OnlinePartition for partitions 
[mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:04:58 

[jira] [Updated] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-01 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
---
Description: 
Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown”
kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.<init>(Log.scala:90)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non monotonically incrementing offsets.

I’ve spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue, (though this is based on some 
assumptions I've made about the way Kafka is working, which may be wrong):

Given:
* A topic that is produced to using acks = 1
* A topic that is produced to using gzip compression
* A topic that has min.isr set to less than the number of replicas, (i.e. 
min.isr=2, #replicas=3)
* Following ISRs are lagging behind the leader by some small number of 
messages, (which is normal with acks=1)
* brokers are configured with fairly large zk session timeout e.g. 30s.
* brokers are configured so that unclean leader elections are disabled.

Then:
When something like a power outage takes out all three replicas, it's possible to 
get into a state such that the indexes won’t rebuild on a restart and a broker 
fails to start. This can happen when:
* Enough brokers, but not the pre-outage leader, come on-line for the partition 
to be writeable
* Producers produce enough records to the partition that the head offset is now 
greater than the pre-outage leader head offset.
* The pre-outage leader comes back online.

At this point the logs on the pre-outage leader have diverged from the other 
replicas.  It has some messages that are not in the other replicas, and the 
other replicas have some records not in the pre-outage leader's log - at the 
same offsets.

I’m assuming that because the current leader has a higher offset than the 
pre-outage leader, the pre-outage leader just starts following the leader and 
requesting the records it thinks it’s missing.

I’m also assuming that because the producers were using gzip, so each record is 
actually a compressed message set, when the pre-outage leader requests 
records from the leader, the offset it requests could just happen to be in 
the middle of a compressed batch, but the leader returns the full batch.  When 
the pre-outage leader appends this batch to its own log it thinks all is OK. 
But what has happened is that the offsets in the log are no longer 
monotonically incrementing. Instead they actually dip by the number of records 
in the compressed batch that were before the requested offset.  If and when 
this broker restarts, this dip may be at the 4K boundary the indexer checks. If 
it is, the broker won’t start.

Several of our brokers were unlucky enough to hit that 4K boundary, causing a 
protracted outage.  We’ve written a little utility that shows several more 

Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-07-01 Thread Rajini Sivaram
Thank you, Jun.

Hi all,

Please let me know if you have any comments or suggestions on the updated
KIP. If there are no objections, I will initiate voting next week.

Thank you...


On Thu, Jun 30, 2016 at 10:37 PM, Jun Rao  wrote:

> Rajini,
>
> The latest wiki looks good to me. Perhaps you want to ask other people to
> also take a look and then we can start the voting.
>
> Thanks,
>
> Jun
>
> On Tue, Jun 28, 2016 at 6:27 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
> > Jun,
> >
> > Thank you for the review. I have changed all default property configs to
> > be stored with the node name <default>. So the defaults are
> > /config/clients/<default> for default client-id quota,
> > /config/users/<default> for default user quota and
> > /config/users/<default>/clients/<default> for default <user, client-id>
> > quota. Hope that makes sense.
> >
> > On Mon, Jun 27, 2016 at 10:25 PM, Jun Rao  wrote:
> >
> > > Rajini,
> > >
> > > Thanks for the update. Looks good to me. My only comment is that
> > > instead of /config/users//clients,
> > > would it be better to represent it as
> > > /config/users//clients/
> > > so that it's more consistent?
> > >
> > > Jun
> > >
> > >
> > > On Thu, Jun 23, 2016 at 2:16 PM, Rajini Sivaram <
> > > rajinisiva...@googlemail.com> wrote:
> > >
> > > > Jun,
> > > >
> > > > Yes, I agree that it makes sense to retain the existing semantics for
> > > > client-id quotas for compatibility. Especially if we can provide the
> > > option
> > > > to enable secure client-id quotas for multi-user clusters as well.
> > > >
> > > > I have updated the KIP - each of these levels can have defaults as
> well
> > > as
> > > > specific entries:
> > > >
> > > >- /config/clients : Insecure <client-id> quotas with the same
> > > >semantics as now
> > > >- /config/users: User quotas
> > > >- /config/users/userA/clients: <user, client-id> quotas for userA
> > > >- /config/users/<default>/clients: Default <user, client-id> quotas
> > > >
> > > > Now it is fully flexible as well as compatible with the current
> > > > implementation. I used /config/users//clients rather than
> > > > /config/users/clients since "clients" is a valid (unlikely, but still
> > > > possible) user principal. I used , but it could be anything
> > that
> > > > is a valid Zookeeper node name, but not a valid URL-encoded name.
> > > >
> > > > Thank you,
> > > >
> > > > Rajini
> > > >
> > > > On Thu, Jun 23, 2016 at 3:43 PM, Jun Rao  wrote:
> > > >
> > > > > Hi, Rajini,
> > > > >
> > > > > For the following statements, would it be better to allocate the
> > quota
> > > to
> > > > > all connections whose client-id is clientX? This way, existing
> > > client-id
> > > > > quotas are fully compatible in the new release whether the cluster
> is
> > > in
> > > > a
> > > > > single user or multi-user environment.
> > > > >
> > > > > 4. If client-id quota override is defined for clientX in
> > > > > /config/clients/clientX, this quota is allocated for the sole use
> > > > > of <user, clientX>
> > > > > 5. If dynamic client-id default is configured in /config/clients,
> > > > > this default quota is allocated for the sole use of <user, clientX>
> > > > > 6. If quota.producer.default is configured for the broker in
> > > > > server.properties, this default quota is allocated for the sole use
> > > > > of <user, clientX>
> > > > >
> > > > > We can potentially add a default quota for both user and client at
> > path
> > > > > /config/users/clients?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Jun 22, 2016 at 3:01 AM, Rajini Sivaram <
> > > > > rajinisiva...@googlemail.com> wrote:
> > > > >
> > > > > > Ismael, Jun,
> > > > > >
> > > > > > Thank you both for the feedback. Have updated the KIP to add
> > dynamic
> > > > > > default quotas for client-id with deprecation of existing static
> > > > default
> > > > > > properties.
> > > > > >
> > > > > >
> > > > > > On Wed, Jun 22, 2016 at 12:50 AM, Jun Rao 
> > wrote:
> > > > > >
> > > > > > > Yes, for consistency, perhaps we can allow client-id quota to
> be
> > > > > > configured
> > > > > > > dynamically too and mark the static config in the broker as
> > > > deprecated.
> > > > > > If
> > > > > > > both are set, the dynamic one wins.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Tue, Jun 21, 2016 at 3:56 AM, Ismael Juma <
> ism...@juma.me.uk>
> > > > > wrote:
> > > > > > >
> > > > > > > > On Tue, Jun 21, 2016 at 12:50 PM, Rajini Sivaram <
> > > > > > > > rajinisiva...@googlemail.com> wrote:
> > > > > > > >
> > > > > > > > > It is actually quite tempting to do the same for client-id
> > > quotas
> > > > > as
> > > > > > > > well,
> > > > > > > > > but I suppose we can't break existing users who have
> > configured
> > > > > > > defaults
> > > > > > > > in
> > > > > > > > > server.properties and providing two ways of setting
> client-id

[jira] [Comment Edited] (KAFKA-3705) Support non-key joining in KTable

2016-07-01 Thread Jan Filipiak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358566#comment-15358566
 ] 

Jan Filipiak edited comment on KAFKA-3705 at 7/1/16 7:28 AM:
-

Hi, yes that is kinda where I am coming from. I completely understand where you 
are. 
Doing the change log case ( logging Change<> objects) is just one 
implementation of this repartitioning and mine is another one. I am very 
familiar with my approach as I wrote some Samza apps using this approach. It 
has many benefits that may or may not be of interest. (repartition-topics can 
also be used to bootstrap, fewer copies of the data (no need to make state HA, 
see previous) etc.). What we are still missing here is a mutual understanding 
of what I think keywidening does and how to expose that to users in a non 
insane manner.

Maybe I'll try it with your JSON syntax. This is the very example we have and 
where this ticket's feature would allow me to build it at the DSL level of the 
API.

So let's say I have 3 tables: A, B, C. I want to reach a point where I have, 
for each C, the list of joined records; this will then be read by our 
application servers, serving them as a faster way to retrieve this than, let's 
say, the original MySQL. B has foreign keys in A and C.

 
All tables start off as one topic, keyed by that table's primary key:
Topic mysq__jadajadajada_A
A.PK => A
Topic mysq_B
B.PK => B
Topic mysq_C
C.PK => C

I am going to repartition B to A.PK now, in the first example without a widened 
key.
Then it stays B.PK => B but partitioned by A.PK accordingly.

Then I can do the join with A and get
B.PK => joined

as in your previous comment:
{quote}
Then a join result of
\{a="a1", joined = join("a1-pre", "c1")} 
{quote}
Note the key stays B.PK (unwidened).
Now I am going to repartition based on C.PK, still maintaining
B.PK => joined
as the topic layout. 
Now, shit hits the fan. As I am doing my aggregation to become 
C.PK => List<Joined>

How would this aggregator look now?

{code:java}
// pseudocode: rebuild a map keyed by B.PK from the current aggregate list
List<Joined> apply(B key, Joined value, List<Joined> current)
{
   Map<B.PK, Joined> m = listToMap(current, bKeyExtractorValueMapper);
   if (value == null)
   {
      m.remove(key);
   }
   else
   {
      m.put(key, value);
   }
   return m.entrySet.asList();
}
{code}

This wouldn't be much different with logged Change<> objects, only the 
remove and add would be two methods. The problem is that it doesn't 
look wrong. But this code now has race conditions. Think about an update to the 
A.PK field of a B record that forces it to switch partitions 
(the C.PK value remains): then we publish a delete to the old partition and the 
new value to the new partition. Then we do the join, then we repartition on the 
unchanged C.PK. This will make our code above see B.PK => null /remove and B.PK 
=> Joined /add in no particular order. Hence the output is undefined. If 
we had forcefully widened the key via the API to be the joined key, the error 
would not happen and users would be aware of what happens on repartitioning. I 
thought this through and it also happens with logging Change<>, as it is really 
just another implementation.

I hope this finally clarifies the key widening I am talking about. If not, 
maybe we should have a small Skype call or something. 
My further recommendation is to not implement these joins as logged Changes<>, 
as it is just more resource intensive and less efficient, and it also makes the 
API more complicated.

PS: Hive has seen all join types, with MapJoins, Skewed Joins, you name it. 
All of these are applicable to streams as well. Maybe keep them in the back of 
your head.








was (Author: jfilipiak):
Hi, yes that is kinda where I am coming from. I completely understand where you 
are. 
Doing the change log case ( logging Change<> objects) is just one 
implementation of this repartitioning and mine is another one. I am very 
familiar with my approach as I wrote some Samza apps using this approach. It 
has many benefits that may or may not be of interest. (repartition-topics can 
also be used to bootstrap, fewer copies of the data (no need to make state HA, 
see previous) etc.). What we are still missing here is a mutual understanding 
of what I think keywidening does and how to expose that to users in a non 
insane manner.

Maybe I try it with your Json syntax. This is the very example we have and 
where this tickets feature would allow me to build it in the dsl level of the 
api.

So lets say I have 3 tables. A, B, C, i want to reach a point where I have C => 
> this will then be read by our application servers and 
servers them as a faster way to retrieves this than lets say the original 
mysql. B has foreign keys in A and C.

 
All tables start of as one topic. keyed by this tables primary key
Topic mysq__jadajadajada_A
A.PK => A
Topic mysq_B
B.PK => B
Topic mysq_C
C.PK => C

I am going to 

[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-07-01 Thread Jan Filipiak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358566#comment-15358566
 ] 

Jan Filipiak commented on KAFKA-3705:
-

Hi, yes that is kinda where I am coming from. I completely understand where you 
are. 
Doing the change log case ( logging Change<> objects) is just one 
implementation of this repartitioning and mine is another one. I am very 
familiar with my approach as I wrote some Samza apps using this approach. It 
has many benefits that may or may not be of interest. (repartition-topics can 
also be used to bootstrap, fewer copies of the data (no need to make state HA, 
see previous) etc.). What we are still missing here is a mutual understanding 
of what I think keywidening does and how to expose that to users in a non 
insane manner.

Maybe I'll try it with your JSON syntax. This is the very example we have and 
where this ticket's feature would allow me to build it at the DSL level of the 
API.

So let's say I have 3 tables: A, B, C. I want to reach a point where I have, 
for each C, the list of joined records; this will then be read by our 
application servers, serving them as a faster way to retrieve this than, let's 
say, the original MySQL. B has foreign keys in A and C.

 
All tables start off as one topic, keyed by that table's primary key:
Topic mysq__jadajadajada_A
A.PK => A
Topic mysq_B
B.PK => B
Topic mysq_C
C.PK => C

I am going to repartition B to A.PK now, in the first example without a widened 
key.
Then it stays B.PK => B but partitioned by A.PK accordingly.

Then I can do the join with A and get
B.PK => joined

as in your previous comment:
{quote}
Then a join result of
{a="a1", joined = join("a1-pre", "c1")} 
{quote}
Note the key stays B.PK (unwidened).
Now I am going to repartition based on C.PK, still maintaining
B.PK => joined
as the topic layout. 
Now, shit hits the fan. As I am doing my aggregation to become 
C.PK => List<Joined>

How would this aggregator look now?

{code:java}
// pseudocode: rebuild a map keyed by B.PK from the current aggregate list
List<Joined> apply(B key, Joined value, List<Joined> current)
{
   Map<B.PK, Joined> m = listToMap(current, bKeyExtractorValueMapper);
   if (value == null)
   {
      m.remove(key);
   }
   else
   {
      m.put(key, value);
   }
   return m.entrySet.asList();
}
{code}

This wouldn't be much different with logged Change<> objects; only the 
remove and add would be two methods. The problem is that it doesn't
look wrong. But this code now has race conditions. Think about an update to the 
A.PK field of a B record that forces it to switch partitions
(the C.PK value remains). Then we publish a delete to the old partition and the 
new value to the new partition. Then we do the join. Then we repartition on the 
unchanged C.PK. This will make our code above see B.PK => null (remove) and B.PK 
=> Joined (add) in no particular order; hence the output is undefined. If 
we had forcefully widened the key via the API, the error 
would not happen and users would be aware of what happens on repartitioning. I 
thought this through and it also happens with logging Change<>, as it is really 
just another implementation.
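
Roughly, such a widened key could look like the sketch below. This is just one 
possible shape, picked for illustration; the point is only that the delete and 
the add carry different keys once A.PK changes:

{code:java}
// One possible widened key: B.PK together with the A.PK the record was
// joined under. The delete published to the old partition and the add
// published to the new partition then have different keys, so the
// aggregation on C.PK cannot confuse them, in whatever order they arrive.
final class WidenedKey {
    final long bPk;
    final long aPk;
    WidenedKey(long bPk, long aPk) { this.bPk = bPk; this.aPk = aPk; }
    @Override public boolean equals(Object o) {
        if (!(o instanceof WidenedKey)) return false;
        WidenedKey k = (WidenedKey) o;
        return k.bPk == bPk && k.aPk == aPk;
    }
    @Override public int hashCode() {
        return 31 * Long.hashCode(bPk) + Long.hashCode(aPk);
    }
}
{code}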

I hope this finally clarifies the key widening I am talking about. If not, 
maybe we should have a quick Skype call or something.
My further recommendation is not to implement these joins as logged Change<> 
objects, as that is more resource-intensive and less efficient, and it also 
makes the API more complicated.

PS: Hive has seen all the join types: MapJoins, Skewed Joins, you name it. 
All of these are applicable to streams as well. Maybe keep them in the back of 
your head.







> Support non-key joining in KTable
> ---------------------------------
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Liquan Pei
>  Labels: api
> Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users 
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but 
> with a "foreign key" {{a}}, and assuming they are read from two topics which 
> are partitioned on {{a}} and {{b}} respectively, they need to do the 
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already 
> partitioned on {{a}}, users still need to do the pre-aggregation in order to 
> get the two joining streams onto the same key. This is a drawback in 
> programmability and we should fix it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk8 #727

2016-07-01 Thread Apache Jenkins Server
See 

Changes:

[me] MINOR: bug fixes to ducktape services

--
[...truncated 6656 lines...]

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler PASSED

kafka.utils.SchedulerTest > testPeriodicTask STARTED

kafka.utils.SchedulerTest > testPeriodicTask PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArg STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArg PASSED

kafka.utils.CommandLineUtilsTest > testParseSingleArg STARTED

kafka.utils.CommandLineUtilsTest > testParseSingleArg PASSED

kafka.utils.CommandLineUtilsTest > testParseArgs STARTED

kafka.utils.CommandLineUtilsTest > testParseArgs PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid PASSED

kafka.utils.UtilsTest > testAbs STARTED

kafka.utils.UtilsTest > testAbs PASSED

kafka.utils.UtilsTest > testReplaceSuffix STARTED

kafka.utils.UtilsTest > testReplaceSuffix PASSED

kafka.utils.UtilsTest > testCircularIterator STARTED

kafka.utils.UtilsTest > testCircularIterator PASSED

kafka.utils.UtilsTest > testReadBytes STARTED

kafka.utils.UtilsTest > testReadBytes PASSED

kafka.utils.UtilsTest > testCsvList STARTED

kafka.utils.UtilsTest > testCsvList PASSED

kafka.utils.UtilsTest > testReadInt STARTED

kafka.utils.UtilsTest > testReadInt PASSED

kafka.utils.UtilsTest > testCsvMap STARTED

kafka.utils.UtilsTest > testCsvMap PASSED

kafka.utils.UtilsTest > testInLock STARTED

kafka.utils.UtilsTest > testInLock PASSED

kafka.utils.UtilsTest > testSwallow STARTED

kafka.utils.UtilsTest > testSwallow PASSED

kafka.utils.JsonTest > testJsonEncoding STARTED

kafka.utils.JsonTest > testJsonEncoding PASSED

kafka.utils.IteratorTemplateTest > testIterator STARTED

kafka.utils.IteratorTemplateTest > testIterator PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer STARTED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testMetricsLeak STARTED

kafka.metrics.MetricsTest > testMetricsLeak PASSED

kafka.consumer.TopicFilterTest > testWhitelists STARTED

kafka.consumer.TopicFilterTest > testWhitelists PASSED

kafka.consumer.TopicFilterTest > 
testWildcardTopicCountGetTopicCountMapEscapeJson STARTED

kafka.consumer.TopicFilterTest > 
testWildcardTopicCountGetTopicCountMapEscapeJson PASSED

kafka.consumer.TopicFilterTest > testBlacklists STARTED

kafka.consumer.TopicFilterTest > testBlacklists PASSED

kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor STARTED
ERROR: Could not install GRADLE_2_4_RC_2_HOME
java.lang.NullPointerException
at 
hudson.plugins.toolenv.ToolEnvBuildWrapper$1.buildEnvVars(ToolEnvBuildWrapper.java:46)
at hudson.model.AbstractBuild.getEnvironment(AbstractBuild.java:947)
at hudson.plugins.git.GitSCM.getParamExpandedRepos(GitSCM.java:390)
at 
hudson.plugins.git.GitSCM.compareRemoteRevisionWithImpl(GitSCM.java:577)
at hudson.plugins.git.GitSCM.compareRemoteRevisionWith(GitSCM.java:527)
at hudson.scm.SCM.compareRemoteRevisionWith(SCM.java:381)
at hudson.scm.SCM.poll(SCM.java:398)
at hudson.model.AbstractProject._poll(AbstractProject.java:1453)
at hudson.model.AbstractProject.poll(AbstractProject.java:1356)
at hudson.triggers.SCMTrigger$Runner.runPolling(SCMTrigger.java:526)
at hudson.triggers.SCMTrigger$Runner.run(SCMTrigger.java:555)
at 
hudson.util.SequentialExecutionQueue$QueueEntry.run(SequentialExecutionQueue.java:119)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
ERROR: Could not install JDK1_8_0_66_HOME
java.lang.NullPointerException
at 
hudson.plugins.toolenv.ToolEnvBuildWrapper$1.buildEnvVars(ToolEnvBuildWrapper.java:46)
at hudson.model.AbstractBuild.getEnvironment(AbstractBuild.java:947)
at hudson.plugins.git.GitSCM.getParamExpandedRepos(GitSCM.java:390)
at 
hudson.plugins.git.GitSCM.compareRemoteRevisionWithImpl(GitSCM.java:577)
at hudson.plugins.git.GitSCM.compareRemoteRevisionWith(GitSCM.java:527)
at hudson.scm.SCM.compareRemoteRevisionWith(SCM.java:381)
at hudson.scm.SCM.poll(SCM.java:398)
at hudson.model.AbstractProject._poll(AbstractProject.java:1453)

Build failed in Jenkins: kafka-trunk-jdk7 #1397

2016-07-01 Thread Apache Jenkins Server
See 

Changes:

[me] MINOR: bug fixes to ducktape services

--
[...truncated 6716 lines...]

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr STARTED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr PASSED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition STARTED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition PASSED

kafka.utils.timer.TimerTest > testAlreadyExpiredTask STARTED

kafka.utils.timer.TimerTest > testAlreadyExpiredTask PASSED

kafka.utils.timer.TimerTest > testTaskExpiration STARTED

kafka.utils.timer.TimerTest > testTaskExpiration PASSED

kafka.utils.timer.TimerTaskListTest > testAll STARTED

kafka.utils.timer.TimerTaskListTest > testAll PASSED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask PASSED

kafka.utils.SchedulerTest > testNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testRestart STARTED

kafka.utils.SchedulerTest > testRestart PASSED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler STARTED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler PASSED

kafka.utils.SchedulerTest > testPeriodicTask STARTED

kafka.utils.SchedulerTest > testPeriodicTask PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArg STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArg PASSED

kafka.utils.CommandLineUtilsTest > testParseSingleArg STARTED

kafka.utils.CommandLineUtilsTest > testParseSingleArg PASSED

kafka.utils.CommandLineUtilsTest > testParseArgs STARTED

kafka.utils.CommandLineUtilsTest > testParseArgs PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid PASSED

kafka.utils.UtilsTest > testAbs STARTED

kafka.utils.UtilsTest > testAbs PASSED

kafka.utils.UtilsTest > testReplaceSuffix STARTED

kafka.utils.UtilsTest > testReplaceSuffix PASSED

kafka.utils.UtilsTest > testCircularIterator STARTED

kafka.utils.UtilsTest > testCircularIterator PASSED

kafka.utils.UtilsTest > testReadBytes STARTED

kafka.utils.UtilsTest > testReadBytes PASSED

kafka.utils.UtilsTest > testCsvList STARTED

kafka.utils.UtilsTest > testCsvList PASSED

kafka.utils.UtilsTest > testReadInt STARTED

kafka.utils.UtilsTest > testReadInt PASSED

kafka.utils.UtilsTest > testCsvMap STARTED

kafka.utils.UtilsTest > testCsvMap PASSED

kafka.utils.UtilsTest > testInLock STARTED

kafka.utils.UtilsTest > testInLock PASSED

kafka.utils.UtilsTest > testSwallow STARTED

kafka.utils.UtilsTest > testSwallow PASSED

kafka.utils.JsonTest > testJsonEncoding STARTED

kafka.utils.JsonTest > testJsonEncoding PASSED

kafka.utils.IteratorTemplateTest > testIterator STARTED

kafka.utils.IteratorTemplateTest > testIterator PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer STARTED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testMetricsLeak STARTED

kafka.metrics.MetricsTest > testMetricsLeak PASSED

kafka.consumer.TopicFilterTest > testWhitelists STARTED

kafka.consumer.TopicFilterTest > testWhitelists PASSED

kafka.consumer.TopicFilterTest > 
testWildcardTopicCountGetTopicCountMapEscapeJson STARTED

kafka.consumer.TopicFilterTest > 
testWildcardTopicCountGetTopicCountMapEscapeJson PASSED

kafka.consumer.TopicFilterTest > testBlacklists STARTED

kafka.consumer.TopicFilterTest > testBlacklists PASSED

kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor STARTED

kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor PASSED

kafka.consumer.PartitionAssignorTest > testRangePartitionAssignor STARTED

kafka.consumer.PartitionAssignorTest > testRangePartitionAssignor PASSED

kafka.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.consumer.ZookeeperConsumerConnectorTest > testCompressionSetConsumption 
STARTED

kafka.consumer.ZookeeperConsumerConnectorTest > testCompressionSetConsumption 
PASSED

kafka.consumer.ZookeeperConsumerConnectorTest > testLeaderSelectionForPartition 
STARTED

kafka.consumer.ZookeeperConsumerConnectorTest > testLeaderSelectionForPartition 
PASSED

kafka.consumer.ZookeeperConsumerConnectorTest > testConsumerDecoder STARTED

kafka.consumer.ZookeeperConsumerConnectorTest > testConsumerDecoder PASSED


[GitHub] kafka pull request #1578: MINOR: remove "auto.commit.interval.ms" from "Manu...

2016-07-01 Thread xuwei-k
GitHub user xuwei-k opened a pull request:

https://github.com/apache/kafka/pull/1578

MINOR: remove "auto.commit.interval.ms" from "Manual Offset Control" example



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuwei-k/kafka Manual-Offset-Control-example

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1578.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1578


commit eb2c9e2602a94352e6b1e4c729512fa5dc554e62
Author: xuwei-k <6b656e6...@gmail.com>
Date:   2016-07-01T06:05:33Z

MINOR: remove "auto.commit.interval.ms" from "Manual Offset Control" example




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---