Re: [VOTE] KIP-376: Implement AutoClosable on appropriate classes that want to be used in a try-with-resource statement

2018-10-18 Thread Harsha Chintalapani
LGTM. +1 (binding)

--
Harsha


On Oct 18, 2018, 5:53 PM -0700, Matthias J. Sax , wrote:
> +1 (binding)
>
> Thanks for the KIP. And thanks for bumping the thread regularly. As
> 2.1.0 and 2.0.1 releases are running atm, it takes some time to get
> attention.
>
> -Matthias
>
> On 10/18/18 11:47 AM, Yishun Guan wrote:
> > Bumping this thread up again, thanks!
> > On Tue, Oct 16, 2018 at 11:24 AM Yishun Guan  wrote:
> > >
> > > Bumping this thread up again, thanks!
> > >
> > > On Fri, Oct 12, 2018, 4:53 PM Colin McCabe  wrote:
> > > >
> > > > On Fri, Oct 12, 2018, at 15:45, Yishun Guan wrote:
> > > > > Hi Colin,
> > > > >
> > > > > Thanks for your suggestions. I have modified the current KIP with your
> > > > > comments. However, I still think I should keep the entire list, 
> > > > > because it
> > > > > is a good way to keep track of which classes need to be changed, and 
> > > > > others
> > > > > can discuss if changes on these internal classes are necessary?
> > > >
> > > > Hi Yishun,
> > > >
> > > > I guess I don't feel that strongly about it. If you want to keep the 
> > > > internal classes in the list, that's fine. They don't really need to be 
> > > > in the KIP but it's OK if they're there.
> > > >
> > > > Thanks for working on this. +1 (binding).
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > >
> > > > > Thanks,
> > > > > Yishun
> > > > >
> > > > > On Fri, Oct 12, 2018 at 11:42 AM Colin McCabe  
> > > > > wrote:
> > > > >
> > > > > > Hi Yishun,
> > > > > >
> > > > > > Thanks for looking at this.
> > > > > >
> > > > > > Under "proposed changes," it's not necessary to add a section where 
> > > > > > you
> > > > > > demonstrate adding "implements AutoCloseable" to the code. We know 
> > > > > > what
> > > > > > adding that would look like.
> > > > > >
> > > > > > Can you create a full, single, list of all the classes that would be
> > > > > > affected? It's not necessary to write who suggested which classes 
> > > > > > in the
> > > > > > KIP. Also, I noticed some of the classes here are in "internals"
> > > > > > packages. Given that these are internal classes that aren't part of 
> > > > > > our
> > > > > > API, it's not necessary to add them to the KIP, I think. Since they 
> > > > > > are
> > > > > > implementation details, they can be changed at any time without a 
> > > > > > KIP.
> > > > > >
> > > > > > The "compatibility" section should have a discussion of the fact 
> > > > > > that we
> > > > > > can add the new interface without requiring any 
> > > > > > backwards-incompatible
> > > > > > changes at the source or binary level. In particular, it would be 
> > > > > > good to
> > > > > > highlight that we are not renaming or changing the existing "close" 
> > > > > > methods.
> > > > > >
> > > > > > Under "rejected alternatives" we could explain why we chose to 
> > > > > > implement
> > > > > > AutoCloseable rather than Closeable.
> > > > > >
> > > > > > cheers,
> > > > > > Colin
> > > > > >
> > > > > >
> > > > > > On Thu, Oct 11, 2018, at 13:48, Yishun Guan wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > Just to bump this voting thread up again. Thanks!
> > > > > > >
> > > > > > > Best,
> > > > > > > Yishun
> > > > > > > On Fri, Oct 5, 2018 at 12:58 PM Yishun Guan  
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I think we have discussed this well enough to put this into a 
> > > > > > > > vote.
> > > > > > > >
> > > > > > > > Suggestions are welcome!
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Yishun
> > > > > > > >
> > > > > > > > On Wed, Oct 3, 2018, 2:30 PM Yishun Guan  
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi All,
> > > > > > > > >
> > > > > > > > > I want to start a vote on this KIP:
> > > > > > > > >
> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=93325308
> > > > > > > > >
> > > > > > > > > Here is the discussion thread:
> > > > > > > > >
> > > > > > https://lists.apache.org/thread.html/9f6394c28d3d11a67600d5d7001e8aaa318f1ad497b50645654bbe3f@%3Cdev.kafka.apache.org%3E
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yishun
> > > > > >
>
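
For readers skimming the archive, a minimal illustration of what the KIP enables. The class name below is hypothetical; KIP-376 only adds "implements AutoCloseable" to classes that already expose a close() method, without renaming or changing close():

    // Hypothetical class of the kind KIP-376 targets: it already has close(),
    // and the KIP only adds "implements AutoCloseable" to its declaration.
    public class FooClient implements AutoCloseable {
        @Override
        public void close() {
            // release sockets, threads, buffers, ...
        }

        public static void main(String[] args) {
            // Callers can then use try-with-resources instead of a finally block;
            // close() runs automatically, even when an exception is thrown.
            try (FooClient client = new FooClient()) {
                System.out.println("using " + client);
            }
        }
    }

Since AutoCloseable.close() may throw any Exception while Closeable.close() is limited to IOException, AutoCloseable fits existing close() signatures more easily, which is presumably the trade-off behind the AutoCloseable-vs-Closeable point above.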


Re: Request for contributor permissions

2018-10-18 Thread Gwen Shapira
Done :)

Welcome on board.
On Thu, Oct 18, 2018 at 6:58 AM Kent  wrote:

> JIRA ID: cjfan2...@gmail.com
> Cwiki ID: cjfan2...@gmail.com
>
> --
> Best Regards,
>
> Kent Fan
>


-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



Request for contributor permissions

2018-10-18 Thread Stone Huang
 JIRA ID: godisren
Cwiki ID: Ren Jr Huang

Thanks in advance!


Re: [VOTE] KIP-376: Implement AutoClosable on appropriate classes that want to be used in a try-with-resource statement

2018-10-18 Thread Matthias J. Sax
+1 (binding)

Thanks for the KIP. And thanks for bumping the thread regularly. As
2.1.0 and 2.0.1 releases are running atm, it takes some time to get
attention.

-Matthias

On 10/18/18 11:47 AM, Yishun Guan wrote:
> Bumping this thread up again, thanks!
> On Tue, Oct 16, 2018 at 11:24 AM Yishun Guan  wrote:
>>
>> Bumping this thread up again, thanks!
>>
>> On Fri, Oct 12, 2018, 4:53 PM Colin McCabe  wrote:
>>>
>>> On Fri, Oct 12, 2018, at 15:45, Yishun Guan wrote:
 Hi Colin,

 Thanks for your suggestions. I have modified the current KIP with your
 comments. However, I still think I should keep the entire list, because it
 is a good way to keep track of which classes need to be changed, and others
 can discuss if changes on these internal classes are necessary?
>>>
>>> Hi Yishun,
>>>
>>> I guess I don't feel that strongly about it.  If you want to keep the 
>>> internal classes in the list, that's fine.  They don't really need to be in 
>>> the KIP but it's OK if they're there.
>>>
>>> Thanks for working on this.  +1 (binding).
>>>
>>> best,
>>> Colin
>>>

 Thanks,
 Yishun

 On Fri, Oct 12, 2018 at 11:42 AM Colin McCabe  wrote:

> Hi Yishun,
>
> Thanks for looking at this.
>
> Under "proposed changes," it's not necessary to add a section where you
> demonstrate adding "implements AutoCloseable" to the code.  We know what
> adding that would look like.
>
> Can you create a full, single, list of all the classes that would be
> affected?  It's not necessary to write who suggested which classes in the
> KIP.  Also, I noticed some of the classes here are in "internals"
> packages.  Given that these are internal classes that aren't part of our
> API, it's not necessary to add them to the KIP, I think.  Since they are
> implementation details, they can be changed at any time without a KIP.
>
> The "compatibility" section should have a discussion of the fact that we
> can add the new interface without requiring any backwards-incompatible
> changes at the source or binary level.  In particular, it would be good to
> highlight that we are not renaming or changing the existing "close" 
> methods.
>
> Under "rejected alternatives" we could explain why we chose to implement
> AutoCloseable rather than Closeable.
>
> cheers,
> Colin
>
>
> On Thu, Oct 11, 2018, at 13:48, Yishun Guan wrote:
>> Hi,
>>
>> Just to bump this voting thread up again. Thanks!
>>
>> Best,
>> Yishun
>> On Fri, Oct 5, 2018 at 12:58 PM Yishun Guan  wrote:
>>>
>>> Hi,
>>>
>>> I think we have discussed this well enough to put this into a vote.
>>>
>>> Suggestions are welcome!
>>>
>>> Best,
>>> Yishun
>>>
>>> On Wed, Oct 3, 2018, 2:30 PM Yishun Guan  wrote:

 Hi All,

 I want to start a vote on this KIP:

> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=93325308

 Here is the discussion thread:

> https://lists.apache.org/thread.html/9f6394c28d3d11a67600d5d7001e8aaa318f1ad497b50645654bbe3f@%3Cdev.kafka.apache.org%3E

 Thanks,
 Yishun
>



signature.asc
Description: OpenPGP digital signature


Build failed in Jenkins: kafka-trunk-jdk8 #3149

2018-10-18 Thread Apache Jenkins Server
See 


Changes:

[lindong28] MINOR: Add a note about Zstandard compression in the upgrade docs

[lindong28] KAFKA-7464; catch exceptions in "leaderEndpoint.close()" when 
shutting

--
[...truncated 2.51 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 

[jira] [Resolved] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown

2018-10-18 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7464.
-
Resolution: Fixed

> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --
>
> Key: KAFKA-7464
> URL: https://issues.apache.org/jira/browse/KAFKA-7464
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Critical
> Fix For: 2.1.0
>
>
> In 2.0 deployment, we saw the following log when shutting down the 
> ReplicaManager in broker cleaned shutdown:
> {noformat}
> 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
> java.lang.IllegalArgumentException: null
> at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
> at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) 
> ~[?:1.8.0_121]
> at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214)
>  ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164)
>  ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.Selector.doClose(Selector.java:751) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:739) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:701) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:315) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  ~[scala-library-2.11.12.jar:?]
> at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> After that, we noticed that some of the replica fetcher thread fail to 
> shutdown:
> {noformat}
> 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] 
> [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log 
> segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
> java.nio.channels.ClosedChannelException: null
> at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) 
> ~[?:1.8.0_121]
> at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) 
> ~[?:1.8.0_121]
> at 
> 

Build failed in Jenkins: kafka-trunk-jdk11 #43

2018-10-18 Thread Apache Jenkins Server
See 


Changes:

[lindong28] MINOR: Add a note about Zstandard compression in the upgrade docs

[lindong28] KAFKA-7464; catch exceptions in "leaderEndpoint.close()" when 
shutting

--
[...truncated 2.34 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldCloseProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldCloseProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED


Jenkins build is back to normal : kafka-2.1-jdk8 #31

2018-10-18 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-18 Thread Bridger Howell (JIRA)
Bridger Howell created KAFKA-7519:
-

 Summary: Transactional Ids Left in Pending State by 
TransactionStateManager During Transactional Id Expiration Are Unusable
 Key: KAFKA-7519
 URL: https://issues.apache.org/jira/browse/KAFKA-7519
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Reporter: Bridger Howell
 Attachments: image-2018-10-18-13-02-22-371.png

 

After digging into a case where an exactly-once streams process was bizarrely 
unable to process incoming data, we observed the following:
 * StreamThreads stalling while creating a producer, eventually resulting in no 
consumption by that streams process. Looking into those threads, we found they 
were stuck in a loop, sending InitProducerIdRequests and always receiving back 
the retriable error CONCURRENT_TRANSACTIONS and trying again. These requests 
always had the same transactional id.
 * After changing the streams process to not use exactly-once, it was able to 
process messages with no problems.
 * Alternatively, changing the applicationId for that streams process, it was 
able to process with no problems.
 * Every hour,  every broker would fail the task `transactionalId-expiration` 
with the following error:
 ** 
{code:java}
{"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
transaction state transition to Dead while it already a pending sta
te Dead
    at 
kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
    at kafka.coordinator
.transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
    at kafka.coordinator.transaction.TransactionStateManager$$a
nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
a:151)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at
 
kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
    at kafka.coordinator.transaction.TransactionSt
ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
ala:150)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
Like.scala:234)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Li
st.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.Li
st.map(List.scala:296)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
    at scala.collection.Traversabl
eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
scala:241)
    at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anon
fun$foreach$1.apply(HashMap.scala:130)
    at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collec
tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at scala.collecti
on.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    a
t 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
ansactionStateManager.scala:142)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
nonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enable
TransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
    at kafka.utils.CoreUtils$.inLock(CoreUtils
.scala:251)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
    at kafka.coordinator.transaction.TransactionStateManager$$anon
fun$enableTransactionalIdExpiration$1.apply$mcV$sp(TransactionStateManager.scala:140)
    at 

Re: [VOTE] KIP-376: Implement AutoClosable on appropriate classes that want to be used in a try-with-resource statement

2018-10-18 Thread Yishun Guan
Bumping this thread up again, thanks!
On Tue, Oct 16, 2018 at 11:24 AM Yishun Guan  wrote:
>
> Bumping this thread up again, thanks!
>
> On Fri, Oct 12, 2018, 4:53 PM Colin McCabe  wrote:
>>
>> On Fri, Oct 12, 2018, at 15:45, Yishun Guan wrote:
>> > Hi Colin,
>> >
>> > Thanks for your suggestions. I have modified the current KIP with your
>> > comments. However, I still think I should keep the entire list, because it
>> > is a good way to keep track of which classes need to be changed, and others
>> > can discuss if changes on these internal classes are necessary?
>>
>> Hi Yishun,
>>
>> I guess I don't feel that strongly about it.  If you want to keep the 
>> internal classes in the list, that's fine.  They don't really need to be in 
>> the KIP but it's OK if they're there.
>>
>> Thanks for working on this.  +1 (binding).
>>
>> best,
>> Colin
>>
>> >
>> > Thanks,
>> > Yishun
>> >
>> > On Fri, Oct 12, 2018 at 11:42 AM Colin McCabe  wrote:
>> >
>> > > Hi Yishun,
>> > >
>> > > Thanks for looking at this.
>> > >
>> > > Under "proposed changes," it's not necessary to add a section where you
>> > > demonstrate adding "implements AutoCloseable" to the code.  We know what
>> > > adding that would look like.
>> > >
>> > > Can you create a full, single, list of all the classes that would be
>> > > affected?  It's not necessary to write who suggested which classes in the
>> > > KIP.  Also, I noticed some of the classes here are in "internals"
>> > > packages.  Given that these are internal classes that aren't part of our
>> > > API, it's not necessary to add them to the KIP, I think.  Since they are
>> > > implementation details, they can be changed at any time without a KIP.
>> > >
>> > > The "compatibility" section should have a discussion of the fact that we
>> > > can add the new interface without requiring any backwards-incompatible
>> > > changes at the source or binary level.  In particular, it would be good 
>> > > to
>> > > highlight that we are not renaming or changing the existing "close" 
>> > > methods.
>> > >
>> > > Under "rejected alternatives" we could explain why we chose to implement
>> > > AutoCloseable rather than Closeable.
>> > >
>> > > cheers,
>> > > Colin
>> > >
>> > >
>> > > On Thu, Oct 11, 2018, at 13:48, Yishun Guan wrote:
>> > > > Hi,
>> > > >
>> > > > Just to bump this voting thread up again. Thanks!
>> > > >
>> > > > Best,
>> > > > Yishun
>> > > > On Fri, Oct 5, 2018 at 12:58 PM Yishun Guan  wrote:
>> > > > >
>> > > > > Hi,
>> > > > >
>> > > > > I think we have discussed this well enough to put this into a vote.
>> > > > >
>> > > > > Suggestions are welcome!
>> > > > >
>> > > > > Best,
>> > > > > Yishun
>> > > > >
>> > > > > On Wed, Oct 3, 2018, 2:30 PM Yishun Guan  wrote:
>> > > > >>
>> > > > >> Hi All,
>> > > > >>
>> > > > >> I want to start a vote on this KIP:
>> > > > >>
>> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=93325308
>> > > > >>
>> > > > >> Here is the discussion thread:
>> > > > >>
>> > > https://lists.apache.org/thread.html/9f6394c28d3d11a67600d5d7001e8aaa318f1ad497b50645654bbe3f@%3Cdev.kafka.apache.org%3E
>> > > > >>
>> > > > >> Thanks,
>> > > > >> Yishun
>> > >


Re: Throwing away prefetched records optimisation.

2018-10-18 Thread Jan Filipiak
The idea for you would be that MessageChooser could hang on to the 
prefetched messages.

ccing cmcc...@apache.org

@Colin
just for you to see that MessageChooser is a powerful abstraction.

:)

Best jan

On 18.10.2018 13:59, Zahari Dichev wrote:
> Jan,
>
> Quite insightful indeed. I think your propositions are valid.
>
> Ryanne,
>
> I understand that consumers are using a pull model... And yes, indeed if a
> consumer is not ready for more records it surely should not call poll.
> Except that it needs to do so periodically in order to indicate that its
> live. Forget about the "backpressure", I guess I was wrong with phrasing
> this so lets not get caught up on it.
>
> You say pause/resume can be used to prioritise certain topics/partitions
> over others. And indeed this is the case. So instead of thinking about it
> in terms of backpressure, lets put it in a different way. The Akka streams
> connector would like to prioritise certain topics over others, using once
> consumer instance. On top of that, add the detail that the priorities
> change quite frequently (which translates to calling pause/resume
> frequently). So all that being said, what would be a proper way to handle
> the situation without throwing the pre-fetched records away when calling
> poll on a consumer that happens to have a topic that was recently paused
> (and that might be un-paused soon )? Am I the only one who considers that
> an actual problem with the use of pause/resume? Not sure how to explain
> the situation in a better way..
>
> Zahari
>
>
> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev 
> wrote:
>
>> Thanks a lot Jan,
>>
>> I will read it.
>>
>> Zahari
>>
>> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak 
>> wrote:
>>
>>> especially my suggestions ;)
>>>
>>> On 18.10.2018 08:30, Jan Filipiak wrote:
 Hi Zahari,

 would you be willing to scan through the KIP-349 discussion a little?
 I think it has suggestions that could be interesting for you

 Best Jan

 On 16.10.2018 09:29, Zahari Dichev wrote:
> Hi there Kafka developers,
>
> I am currently trying to find a solution to an issue that has been
> manifesting itself in the Akka streams implementation of the Kafka
> connector. When it comes to consuming messages, the implementation
>>> relies
> heavily on the fact that we can pause and resume partitions. In some
> situations when a single consumer instance is shared among several
> streams,
> we might end up with frequently pausing and unpausing a set of topic
> partitions, which is the main facility that allows us to implement back
> pressure. This however has certain disadvantages, especially when
> there are
> two consumers that differ in terms of processing speed.
>
> To articulate the issue more clearly, imagine that a consumer maintains
> assignments for two topic partitions *TP1* and *TP2*. This consumer is
> shared by two streams - S1 and S2. So effectively when we have demand
> from
> only one of the streams - *S1*, we will pause one of the topic
>>> partitions
> *TP2* and call *poll()* on the consumer to only retrieve the records
>>> for
> the demanded topic partition - *TP1*. The result of that is all the
> records
> that have been prefetched for *TP2* are now thrown away by the fetcher
> ("*Not
> returning fetched records for assigned partition TP2 since it is no
> longer
> fetchable"*). If we extrapolate that to multiple streams sharing the
>>> same
> consumer, we might quickly end up in a situation where we throw
> prefetched
> data quite often. This does not seem like the most efficient approach
>>> and
> in fact produces quite a lot of overlapping fetch requests as
>>> illustrated
> in the following issue:
>
> https://github.com/akka/alpakka-kafka/issues/549
>
> I am writing this email to get some initial opinion on a KIP I was
> thinking
> about. What if we give the clients of the Consumer API a bit more
>>> control
> of what to do with this prefetched data. Two options I am wondering
> about:
>
> 1. Introduce a configuration setting, such as*
> "return-prefetched-data-for-paused-topic-partitions = false"* (have to
> think of a better name), which when set to true will return what is
> prefetched instead of throwing it away on calling *poll()*. Since this
>>> is
> amount of data that is bounded by the maximum size of the prefetch, we
> can
> control what is the most amount of records returned. The client of the
> consumer API can then be responsible for keeping that data around and
>>> use
> it when appropriate (i.e. when demand is present)
>
> 2. Introduce a facility to pass in a buffer into which the prefetched
> records are drained when poll is called and paused partitions have some
> prefetched records.
>
> Any opinions on the matter are welcome. Thanks a 
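
To make the behaviour being discussed concrete, here is a minimal sketch (topic names, group id and timings are illustrative only) of the pause/poll/resume cycle using the plain consumer API; with the current client, anything already prefetched for the paused partition is discarded on the next poll():

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PauseResumeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "pause-resume-sketch");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            TopicPartition tp1 = new TopicPartition("topic-a", 0);  // stands in for TP1
            TopicPartition tp2 = new TopicPartition("topic-b", 0);  // stands in for TP2

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Arrays.asList(tp1, tp2));

                // Only S1 has demand: pause TP2 before polling.
                consumer.pause(Collections.singleton(tp2));
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                // 'records' holds TP1 data only; records already fetched for TP2 are dropped.

                // Demand for S2 arrives shortly afterwards: resume and poll again,
                // which issues a fresh (overlapping) fetch for TP2.
                consumer.resume(Collections.singleton(tp2));
                consumer.poll(Duration.ofMillis(100));
            }
        }
    }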

Re: [VOTE] KIP-349 Priorities for Source Topics

2018-10-18 Thread nick


> On Oct 12, 2018, at 5:06 PM, Colin McCabe  wrote:
> 
> Maybe there's some really cool use-case that I haven't thought of.  But so 
> far I can't really think of any time I would need topic priorities if I was 
> muting topics and offloading blocking operations in a reasonable way.  It 
> would be good to identify use-cases 


Hi Colin,

How about the use-case where there are multiple streams/topics, and the intent 
is to have a single consumer interleave the messages so that higher priority 
messages are processed first ?
That seems to be what the reporter of the associated Jira ticket

   https://issues.apache.org/jira/browse/KAFKA-6690 


has identified as a use-case he frequently encounters.  I’ve asked him to 
elaborate on the dev list though he has not responded yet.

Best,
--
  Nick
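
For comparison with the interleaving use-case above, this is not the API proposed in KIP-349 but a rough sketch of how an application can approximate topic priority today with a single consumer: it only reorders processing within each fetched batch, and cannot prioritise at the fetch level the way a client/broker feature could (topic names below are illustrative):

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PrioritySketch {
        // Drain one poll() batch, handling the high-priority topic's records
        // before the low-priority topic's records.
        static void pollByPriority(KafkaConsumer<String, String> consumer,
                                   String highPriorityTopic, String lowPriorityTopic) {
            ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : batch.records(highPriorityTopic)) {
                process(record);
            }
            for (ConsumerRecord<String, String> record : batch.records(lowPriorityTopic)) {
                process(record);
            }
        }

        static void process(ConsumerRecord<String, String> record) {
            System.out.printf("%s-%d@%d%n", record.topic(), record.partition(), record.offset());
        }
    }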





Jenkins build is back to normal : kafka-trunk-jdk8 #3147

2018-10-18 Thread Apache Jenkins Server
See 




RE: [DISCUSS] KIP-383 Pluggable interface for SSL Factory

2018-10-18 Thread Pellerin, Clement
I guess it could but unless I'm mistaken, you have the same problem as before 
since it is not pluggable.
The idea is you should be able to replace the implementation through 
configuration, without rebuilding a custom Kafka distribution or resorting to 
classpath tricks to shadow Kafka classes.
I will make that goal explicit in the KIP.

If I could convince Kafka to create my subclass of SslChannelBuilder or 
SaslChannelBuilder when building a channel, that would get me pretty far for 
what I'm trying to do.
I would simply need to move the call to the SslFactory constructor to its own 
method so I could override that method to create my own subclass of SslFactory.
Please enlighten me if that's already possible while respecting the goal above.

I concentrated the solution towards SslFactory since that's what the Jira was 
asking for.
I like the smaller scope of the proposed solution because it solves a problem 
reported by many people.
I am not sure people are clamoring to have a pluggable ChannelBuilder interface.
Finally, the proposed solution gets reused twice in SslChannelBuilder and 
SaslChannelBuilder for that part of the protocol they share.

-Original Message-
From: Harsha [mailto:ka...@harsha.io] 
Sent: Thursday, October 18, 2018 10:58 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-383 Pluggable interface for SSL Factory

Hi,
  Thanks for the KIP. Curious to understand why the ChannelBuilder 
interface doesn't solve the stated reasons in the Motivation section.

Thanks,
Harsha


On Wed, Oct 17, 2018, at 12:10 PM, Pellerin, Clement wrote:
> I would like feedback on this proposal to make it possible to replace 
> SslFactory with a custom implementation.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-383%3A++Pluggable+interface+for+SSL+Factory
> 
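
To illustrate the kind of configuration-driven pluggability being asked for, a hypothetical sketch follows; the interface, the class names and the "ssl.factory.class" key are placeholders for discussion, not the API proposed in KIP-383:

    import java.util.Map;
    import javax.net.ssl.SSLEngine;

    // A custom implementation would be named in the config and loaded by the
    // channel builder, instead of the channel builder hard-coding SslFactory.
    interface PluggableSslEngineFactory {
        void configure(Map<String, ?> configs);
        SSLEngine createSslEngine(String peerHost, int peerPort);
    }

    class SslEngineFactoryLoader {
        static PluggableSslEngineFactory load(Map<String, ?> configs) throws Exception {
            String className = (String) configs.get("ssl.factory.class");
            PluggableSslEngineFactory factory = (PluggableSslEngineFactory)
                    Class.forName(className).getDeclaredConstructor().newInstance();
            factory.configure(configs);
            return factory;
        }
    }

Clement's narrower alternative above (a protected factory method on SslChannelBuilder or SaslChannelBuilder that a subclass overrides to supply its own SslFactory) would reach the same goal without introducing a new public interface.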


Re: [DISCUSS] KIP-382: MirrorMaker 2.0

2018-10-18 Thread Ryanne Dolan
Harsha, concerning uReplicator specifically, the project is a major
inspiration for MM2, but I don't think it is a good foundation for anything
included in Apache Kafka. uReplicator uses Helix to solve problems that
Connect also solves, e.g. REST API, live configuration changes, cluster
management, coordination etc. This also means that existing tooling,
dashboards etc that work with Connectors do not work with uReplicator, and
any future tooling would need to treat uReplicator as a special case.

Ryanne

On Wed, Oct 17, 2018 at 12:30 PM Ryanne Dolan  wrote:

> Harsha, yes I can do that. I'll update the KIP accordingly, thanks.
>
> Ryanne
>
> On Wed, Oct 17, 2018 at 12:18 PM Harsha  wrote:
>
>> Hi Ryanne,
>>Thanks for the KIP. I am also curious about why not use
>> the uReplicator design as the foundation, given it already resolves some of
>> the fundamental issues in the current MirrorMaker, updating the configs on the
>> fly and running the mirror maker agents in a worker model which can be
>> deployed in Mesos or container orchestrations.  If possible, can you
>> document in the rejected alternatives what missing parts made you
>> consider a new design from the ground up.
>>
>> Thanks,
>> Harsha
>>
>> On Wed, Oct 17, 2018, at 8:34 AM, Ryanne Dolan wrote:
>> > Jan, these are two separate issues.
>> >
>> > 1) consumer coordination should not, ideally, involve unreliable or slow
>> > connections. Naively, a KafkaSourceConnector would coordinate via the
>> > source cluster. We can do better than this, but I'm deferring this
>> > optimization for now.
>> >
>> > 2) exactly-once between two clusters is mind-bending. But keep in mind
>> that
>> > transactions are managed by the producer, not the consumer. In fact,
>> it's
>> > the producer that requests that offsets be committed for the current
>> > transaction. Obviously, these offsets are committed in whatever cluster
>> the
>> > producer is sending to.
>> >
>> > These two issues are closely related. They are both resolved by not
>> > coordinating or committing via the source cluster. And in fact, this is
>> the
>> > general model of SourceConnectors anyway, since most SourceConnectors
>> > _only_ have a destination cluster.
>> >
>> > If there is a lot of interest here, I can expound further on this
>> aspect of
>> > MM2, but again I think this is premature until this first KIP is
>> approved.
>> > I intend to address each of these in separate KIPs following this one.
>> >
>> > Ryanne
>> >
>> > On Wed, Oct 17, 2018 at 7:09 AM Jan Filipiak 
>> > wrote:
>> >
>> > > This is not a performance optimisation. It's a fundamental design
>> choice.
>> > >
>> > >
>> > > I never really took a look how streams does exactly once. (its a trap
>> > > anyways and you usually can deal with at least once downstream pretty
>> > > easy). But I am very certain its not gonna get somewhere if offset
>> > > commit and record produce cluster are not the same.
>> > >
>> > > Pretty sure without this _design choice_ you can skip on that exactly
>> > > once already
>> > >
>> > > Best Jan
>> > >
>> > > On 16.10.2018 18:16, Ryanne Dolan wrote:
>> > > >  >  But one big obstacle in this was
>> > > > always that group coordination happened on the source cluster.
>> > > >
>> > > > Jan, thank you for bringing up this issue with legacy MirrorMaker. I
>> > > > totally agree with you. This is one of several problems with
>> MirrorMaker
>> > > > I intend to solve in MM2, and I already have a design and prototype
>> that
>> > > > solves this and related issues. But as you pointed out, this KIP is
>> > > > already rather complex, and I want to focus on the core feature set
>> > > > rather than performance optimizations for now. If we can agree on
>> what
>> > > > MM2 looks like, it will be very easy to agree to improve its
>> performance
>> > > > and reliability.
>> > > >
>> > > > That said, I look forward to your support on a subsequent KIP that
>> > > > addresses consumer coordination and rebalance issues. Stay tuned!
>> > > >
>> > > > Ryanne
>> > > >
>> > > > On Tue, Oct 16, 2018 at 6:58 AM Jan Filipiak <
>> jan.filip...@trivago.com
>> > > > > wrote:
>> > > >
>> > > > Hi,
>> > > >
>> > > > Currently MirrorMaker is usually run collocated with the target
>> > > > cluster.
>> > > > This is all nice and good. But one big obstacle in this was
>> > > > always that group coordination happened on the source cluster.
>> So
>> > > when
>> > > > the network was congested, you sometimes lose group
>> membership and
>> > > > have to rebalance and all this.
>> > > >
>> > > > So one big request from us would be the support of having
>> > > coordination
>> > > > cluster != source cluster.
>> > > >
>> > > > I would generally say a LAN is better than a WAN for doing group
>> > > > coordination and there is no reason we couldn't have a group
>> consuming
>> > > > topics from a different cluster and committing offsets to
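
A minimal sketch of the point Ryanne makes above about exactly-once: with the standard transactional producer API, the consumed offsets are committed by the producer, inside its transaction, into whatever cluster that producer writes to. Topic, group id and the surrounding wiring below are illustrative, and the producer is assumed to have called initTransactions() once at startup:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class MirrorTxnSketch {
        static void mirrorOneBatch(KafkaConsumer<byte[], byte[]> sourceConsumer,
                                   KafkaProducer<byte[], byte[]> targetProducer,
                                   String targetTopic, String groupId) {
            ConsumerRecords<byte[], byte[]> records = sourceConsumer.poll(Duration.ofMillis(500));
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

            targetProducer.beginTransaction();
            for (ConsumerRecord<byte[], byte[]> rec : records) {
                targetProducer.send(new ProducerRecord<>(targetTopic, rec.key(), rec.value()));
                offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                            new OffsetAndMetadata(rec.offset() + 1));
            }
            // The producer, not the consumer, commits the source offsets as part of
            // its transaction, so they land in the cluster the producer writes to.
            targetProducer.sendOffsetsToTransaction(offsets, groupId);
            targetProducer.commitTransaction();
        }
    }

This is why committing and coordinating via the source cluster is what the thread above argues MM2 should avoid.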

Re: [DISCUSS] KIP-383 Pluggable interface for SSL Factory

2018-10-18 Thread Harsha
Hi,
  Thanks for the KIP. Curious to understand why the ChannelBuilder 
interface doesn't solve the stated reasons in the Motivation section.

Thanks,
Harsha


On Wed, Oct 17, 2018, at 12:10 PM, Pellerin, Clement wrote:
> I would like feedback on this proposal to make it possible to replace 
> SslFactory with a custom implementation.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-383%3A++Pluggable+interface+for+SSL+Factory
> 


Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-18 Thread Ryanne Dolan
Per Steffensen, getting sequence numbers correct is definitely difficult,
but this is not Connect's fault. I'd like to see Connect implement
exactly-once from end-to-end, but that requires coordination between
sources and sinks along the lines that you allude to, using sequence
numbers and transactions and whatnot.

The problem with commit() is knowing when it's okay to delete the files in
your example. I don't believe that issue has anything to do with avoiding
dupes or assigning unique sequence numbers. I believe it is safe to delete
a file if you know it has been delivered successfully, which the present
API exposes.

That said, I'm not opposed to your proposed callbacks, and I agree that
commit() and commitRecord() are poorly named. I just don't believe the
present API is incorrect.

Ryanne



On Thu, Oct 18, 2018 at 7:04 AM Per Steffensen  wrote:

> On 17/10/2018 18.17, Ryanne Dolan wrote:
>
> > this does not guarantee that the
> > offsets of R have been written/flushed at the next commit() call
>
> True, but does it matter? So long as you can guarantee the records are
> delivered to the downstream Kafka cluster, it shouldn't matter if they have
> been committed or not.
>
> The worst that can happen is that the worker gets bounced and asks for the
> same records a second time. Even if those records have since been dropped
> from the upstream data source, it doesn't matter cuz you know they were
> previously delivered successfully.
>
> You are kinda arguing that offsets are not usable at all. I think they
> are. Below I will explain a fairly simple source-connector, and how it
> would be mislead by the way source-connector-framework currently works, and
> how my fix would help it not be. The source-connector is picked out of blue
> air, but not too far from what I have had to deal with in real life
>
> Lets assume I write a fairly simple source-connector, that picks up data
> from files in a given folder. For simplicity lets just assume that each
> file fits in a Kafka-message. My source-connector just sorts the files by
> timestamp and sends out the data in the files, oldest file first. It is
> possible that the receiving side of the data my source-connector sends out,
> will get the same data twice, for one of the following reasons
> * There were actually two input-files that contained exactly the same data
> (in that case the receiving side should handle it twice)
> * The data from that same file may be sent twice in two Kafka-messages,
> due to global atomicy being impossible (in that case the receiving side
> should only handle the data once)
> In order to allow the receiving side to know when two consecutive messages
> are essentially the same, so that it will know only to handle one of them,
> I introduce a simple sequence-numbering system in my source-connector. I
> simply write a sequence-number in the Kafka-messages, and I use
> Kafka-connect offsets to keep track of the next sequence-number to be used,
> so that I can pick up with the correct sequence-number in case of a
> crash/restart. If there is no offsets when the source-connector starts
> (first start) it will just start with sequence-number 1.
>
> *Assume the following files are in the input-folder:*
> * 2018-01-01_10_00_00-.data
> * 2018-01-01_10_00_00-.data
> * 2018-01-01_10_00_01-.data
> * 2018-01-01_10_00_02-.data
> …
>
> *Now this sequence of events are possible*
> * mySourceConnector.poll() —> [
>   R1 = record({seq: 1, data= 2018-01-01_10_00_00-.data>},{ nextSeq=2 }},
>   R2 = record({seq: 2, data= 2018-01-01_10_00_00-.data>},{ nextSeq=3 }}
> ]
> * data of R1 was sent and acknowledged
> * mySourceConnector.commitRecord(R1)
> * data of R2 was sent and acknowledged
> * mySourceConnector.commitRecord(R2)
> * offsets-committer kicks in around here and picks up the offsets from R1
> and R2, resulting in the merged offsets to written and flushed to be {
> nextSeq=3 }
> * mySourceConnector.poll() —> [
>   R3 = record({seq: 3, data= 2018-01-01_10_00_01-.data>},{ nextSeq=4 }}
> ]
> * data of R3 was sent and acknowledged
> * mySourceConnector.commitRecord(R3)
> * offsets-committer finishes writing and flushing offsets { nextSeq=3 }
> * mySourceConnector.commit()
>
> In mySourceConnector.commit() implementation I believe that the data and
> offsets for R1, R2 and R3 has been sent/written/flushed/acknowledged, and
> therefore I delete the following files
> * 2018-01-01_10_00_00-.data
> * 2018-01-01_10_00_00-.data
> * 2018-01-01_10_00_01-.data
> But the truth is that data for R1, R2 and R3 has been sent with
> sequence-number 1, 2 and 3 respectively, but the flushed offsets says {
> nextSeq=3 }, and not { nextSeq=4 } which I would indirectly expect
> If the system crashes here, upon restart I will get { nextSeq=3 }, but
> file containing the data supposed to get sequence-number 3 has already been
> deleted. Therefore I will end up with this next poll
> * poll() —> [
>   R4 = record({seq: 3, data=.data},{
> nextSeq=4 }}
> ]
> If my system 
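
For readers less familiar with the Connect callbacks being debated, a stripped-down sketch of the hypothetical file source task from Per's example; the helper and the topic name are placeholders, and the comment in commit() marks where the ambiguity he describes bites:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    public class FileSeqSourceTask extends SourceTask {
        private long nextSeq = 1;

        @Override
        public void start(Map<String, String> props) {
            // Resume from the last flushed offset, if any.
            Map<String, Object> stored = context.offsetStorageReader()
                    .offset(Collections.singletonMap("input", "folder"));
            if (stored != null) {
                nextSeq = (Long) stored.get("nextSeq");
            }
        }

        @Override
        public List<SourceRecord> poll() {
            byte[] data = readOldestFile();  // hypothetical helper
            SourceRecord record = new SourceRecord(
                    Collections.singletonMap("input", "folder"),       // source partition
                    Collections.singletonMap("nextSeq", nextSeq + 1),  // offset, flushed later
                    "files-topic", Schema.BYTES_SCHEMA, data);
            nextSeq++;
            return Collections.singletonList(record);
        }

        @Override
        public void commit() {
            // Per's point: when this runs, the records returned above have been
            // acknowledged, but the offsets flushed so far may still lag behind,
            // so deleting the corresponding input files here is not safe.
        }

        @Override
        public void stop() { }

        @Override
        public String version() { return "sketch"; }

        private byte[] readOldestFile() { return new byte[0]; }  // placeholder
    }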

[jira] [Resolved] (KAFKA-7505) Flaky test: SslTransportLayerTest.testListenerConfigOverride

2018-10-18 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7505.
---
   Resolution: Fixed
 Reviewer: Ismael Juma
Fix Version/s: 2.1.0

> Flaky test: SslTransportLayerTest.testListenerConfigOverride
> 
>
> Key: KAFKA-7505
> URL: https://issues.apache.org/jira/browse/KAFKA-7505
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.1.0
>
>
> This test seems to fail quite a bit recently. I've seen it happen with Java 
> 11 quite a bit so it could be more likely to fail there.
> {code:java}
> java.lang.AssertionError: expected: but 
> was: at org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.failNotEquals(Assert.java:834) at 
> org.junit.Assert.assertEquals(Assert.java:118) at 
> org.junit.Assert.assertEquals(Assert.java:144) at 
> org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:104)
>  at 
> org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:109)
>  at 
> org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Request for contributor permissions

2018-10-18 Thread Kent
JIRA ID: cjfan2...@gmail.com
Cwiki ID: cjfan2...@gmail.com

-- 
Best Regards,

Kent Fan


Re: Throwing away prefetched records optimisation.

2018-10-18 Thread Zahari Dichev
Thanks Ryanne,
I am glad it makes sense. Should I put a KIP together and call for
discussion on it? It's my first KIP, so I have not quite locked in the
process yet.

Zahari

On Thu, Oct 18, 2018 at 3:26 PM Ryanne Dolan  wrote:

> Zahari, that makes sense, thanks for reframing your question. I suspect
> that pause/resume was not intended to be called at high frequency like
> that, but I agree with you that the current behavior is needlessly
> inefficient. I like your idea of making it configurable.
>
> Ryanne
>
> On Thu, Oct 18, 2018, 6:59 AM Zahari Dichev 
> wrote:
>
> > Jan,
> >
> > Quite insightful indeed. I think your propositions are valid.
> >
> > Ryanne,
> >
> > I understand that consumers are using a pull model... And yes, indeed if
> a
> > consumer is not ready for more records it surely should not call poll.
> > Except that it needs to do so periodically in order to indicate that it's
> > alive. Forget about the "backpressure", I guess I was wrong with phrasing
> > this so lets not get caught up on it.
> >
> > You say pause/resume can be used to prioritise certain topics/partitions
> > over others. And indeed this is the case. So instead of thinking about it
> > in terms of backpressure, lets put it in a different way. The Akka
> streams
> > connector would like to prioritise certain topics over others, using one
> > consumer instance. On top of that, add the detail that the priorities
> > change quite frequently (which translates to calling pause/resume
> > frequently). So all that being said, what would be a proper way to handle
> > the situation without throwing the pre-fetched records away when calling
> > poll on a consumer that happens to have a topic that was recently paused
> > (and that might be un-paused soon )? Am I the only one who considers that
> > an actual problem with the use of pause/resume? Not sure how to explain
> > the situation in a better way..
> >
> > Zahari
> >
> >
> > On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev 
> > wrote:
> >
> > > Thanks a lot Jan,
> > >
> > > I will read it.
> > >
> > > Zahari
> > >
> > > On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak  >
> > > wrote:
> > >
> > >> especially my suggestions ;)
> > >>
> > >> On 18.10.2018 08:30, Jan Filipiak wrote:
> > >> > Hi Zahari,
> > >> >
> > >> > would you be willing to scan through the KIP-349 discussion a
> little?
> > >> > I think it has suggestions that could be interesting for you
> > >> >
> > >> > Best Jan
> > >> >
> > >> > On 16.10.2018 09:29, Zahari Dichev wrote:
> > >> >> Hi there Kafka developers,
> > >> >>
> > >> >> I am currently trying to find a solution to an issue that has been
> > >> >> manifesting itself in the Akka streams implementation of the Kafka
> > >> >> connector. When it comes to consuming messages, the implementation
> > >> relies
> > >> >> heavily on the fact that we can pause and resume partitions. In
> some
> > >> >> situations when a single consumer instance is shared among several
> > >> >> streams,
> > >> >> we might end up with frequently pausing and unpausing a set of
> topic
> > >> >> partitions, which is the main facility that allows us to implement
> > back
> > >> >> pressure. This however has certain disadvantages, especially when
> > >> >> there are
> > >> >> two consumers that differ in terms of processing speed.
> > >> >>
> > >> >> To articulate the issue more clearly, imagine that a consumer
> > maintains
> > >> >> assignments for two topic partitions *TP1* and *TP2*. This consumer
> > is
> > >> >> shared by two streams - S1 and S2. So effectively when we have
> demand
> > >> >> from
> > >> >> only one of the streams - *S1*, we will pause one of the topic
> > >> partitions
> > >> >> *TP2* and call *poll()* on the consumer to only retrieve the
> records
> > >> for
> > >> >> the demanded topic partition - *TP1*. The result of that is all the
> > >> >> records
> > >> >> that have been prefetched for *TP2* are now thrown away by the
> > fetcher
> > >> >> ("*Not
> > >> >> returning fetched records for assigned partition TP2 since it is no
> > >> >> longer
> > >> >> fetchable"*). If we extrapolate that to multiple streams sharing
> the
> > >> same
> > >> >> consumer, we might quickly end up in a situation where we throw
> > >> >> prefetched
> > >> >> data quite often. This does not seem like the most efficient
> approach
> > >> and
> > >> >> in fact produces quite a lot of overlapping fetch requests as
> > >> illustrated
> > >> >> in the following issue:
> > >> >>
> > >> >> https://github.com/akka/alpakka-kafka/issues/549
> > >> >>
> > >> >> I am writing this email to get some initial opinion on a KIP I was
> > >> >> thinking
> > >> >> about. What if we give the clients of the Consumer API a bit more
> > >> control
> > >> >> of what to do with this prefetched data. Two options I am wondering
> > >> >> about:
> > >> >>
> > >> >> 1. Introduce a configuration setting, such as*
> > >> >> "return-prefetched-data-for-paused-topic-partitions = false"* (have
> > to
> > >> >> think of a 

Re: [DISCUSS] KIP-382: MirrorMaker 2.0

2018-10-18 Thread Ryanne Dolan
Jan, thanks for the share. Also similar are Pulsar's concepts of namespaces
and global topics. I don't think these need to be supported in Kafka
itself, but there are many benefits to adopting naming conventions along
these lines, esp for tooling, dashboards etc.

> use it to copy my messages from A to B

MM2 is a drop-in replacement for MirrorMaker and allows the new features to
be disabled if you just want to blindly copy records.

Ryanne

On Thu, Oct 18, 2018, 2:28 AM Jan Filipiak  wrote:

> then I just hope that in the midst of all these new features I can still
> at least use it to copy my messages from A to B later.
>
> Another hint you should be aware of:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics
>
> That was always a design I admired, with active/active replication
> and stuff. It feels like we are moving another step away from this.
>
> On 17.10.2018 17:34, Ryanne Dolan wrote:
> > Jan, these are two separate issues.
> >
> > 1) consumer coordination should not, ideally, involve unreliable or slow
> > connections. Naively, a KafkaSourceConnector would coordinate via the
> > source cluster. We can do better than this, but I'm deferring this
> > optimization for now.
> >
> > 2) exactly-once between two clusters is mind-bending. But keep in mind
> > that transactions are managed by the producer, not the consumer. In
> > fact, it's the producer that requests that offsets be committed for the
> > current transaction. Obviously, these offsets are committed in whatever
> > cluster the producer is sending to.
> >
> > These two issues are closely related. They are both resolved by not
> > coordinating or committing via the source cluster. And in fact, this is
> > the general model of SourceConnectors anyway, since most
> > SourceConnectors _only_ have a destination cluster.
> >
> > If there is a lot of interest here, I can expound further on this aspect
> > of MM2, but again I think this is premature until this first KIP is
> > approved. I intend to address each of these in separate KIPs following
> > this one.
> >
> > Ryanne
> >
> > On Wed, Oct 17, 2018 at 7:09 AM Jan Filipiak  > > wrote:
> >
> > This is not a performance optimisation. Its a fundamental design
> choice.
> >
> >
> > I never really took a look how streams does exactly once. (its a trap
> > anyways and you usually can deal with at least once donwstream pretty
> > easy). But I am very certain its not gonna get somewhere if offset
> > commit and record produce cluster are not the same.
> >
> > Pretty sure without this _design choice_ you can skip on that exactly
> > once already
> >
> > Best Jan
> >
> > On 16.10.2018 18:16, Ryanne Dolan wrote:
> >  >  >  But one big obstacle in this was
> >  > always that group coordination happened on the source cluster.
> >  >
> >  > Jan, thank you for bringing up this issue with legacy
> MirrorMaker. I
> >  > totally agree with you. This is one of several problems with
> > MirrorMaker
> >  > I intend to solve in MM2, and I already have a design and
> > prototype that
> >  > solves this and related issues. But as you pointed out, this KIP
> is
> >  > already rather complex, and I want to focus on the core feature
> set
> >  > rather than performance optimizations for now. If we can agree on
> > what
> >  > MM2 looks like, it will be very easy to agree to improve its
> > performance
> >  > and reliability.
> >  >
> >  > That said, I look forward to your support on a subsequent KIP that
> >  > addresses consumer coordination and rebalance issues. Stay tuned!
> >  >
> >  > Ryanne
> >  >
> >  > On Tue, Oct 16, 2018 at 6:58 AM Jan Filipiak
> > mailto:jan.filip...@trivago.com>
> >  >  > >> wrote:
> >  >
> >  > Hi,
> >  >
> >  > Currently MirrorMaker is usually run collocated with the
> target
> >  > cluster.
> >  > This is all nice and good. But one big obstacle in this was
> >  > always that group coordination happened on the source
> > cluster. So when
> >  > then network was congested, you sometimes loose group
> > membership and
> >  > have to rebalance and all this.
> >  >
> >  > So one big request from we would be the support of having
> > coordination
> >  > cluster != source cluster.
> >  >
> >  > I would generally say a LAN is better than a WAN for doing
> group
> >  > coordinaton and there is no reason we couldn't have a group
> > consuming
> >  > topics from a different cluster and committing offsets to
> another
> >  > one right?
> >  >
> >  > Other than that. It feels like the KIP has too much features
> > where many
> >  > of them are not really wanted and counter productive but 

Re: Throwing away prefetched records optimisation.

2018-10-18 Thread Ryanne Dolan
Zahari, that makes sense, thanks for reframing your question. I suspect
that pause/resume was not intended to be called at high frequency like
that, but I agree with you that the current behavior is needlessly
inefficient. I like your idea of making it configurable.

Ryanne

On Thu, Oct 18, 2018, 6:59 AM Zahari Dichev  wrote:

> Jan,
>
> Quite insightful indeed. I think your propositions are valid.
>
> Ryanne,
>
> I understand that consumers are using a pull model... And yes, indeed if a
> consumer is not ready for more records it surely should not call poll.
> Except that it needs to do so periodically in order to indicate that its
> live. Forget about the "backpressure", I guess I was wrong with phrasing
> this so lets not get caught up on it.
>
> You say pause/resume can be used to prioritise certain topics/partitions
> over others. And indeed this is the case. So instead of thinking about it
> in terms of backpressure, lets put it in a different way. The Akka streams
> connector would like to prioritise certain topics over others, using once
> consumer instance. On top of that, add the detail that the priorities
> change quite frequently (which translates to calling pause/resume
> frequently). So all that being said, what would be a proper way to handle
> the situation without throwing the pre-fetched records away when calling
> poll on a consumer that happens to have a topic that was recently paused
> (and that might be un-paused soon )? Am I the only one who considers that
> an actual problem with the use os pause/resume ? Not sure how to explain
> the situation in a better way..
>
> Zahari
>
>
> On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev 
> wrote:
>
> > Thanks a lot Jan,
> >
> > I will read it.
> >
> > Zahari
> >
> > On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak 
> > wrote:
> >
> >> especially my suggestions ;)
> >>
> >> On 18.10.2018 08:30, Jan Filipiak wrote:
> >> > Hi Zahari,
> >> >
> >> > would you be willing to scan through the KIP-349 discussion a little?
> >> > I think it has suggestions that could be interesting for you
> >> >
> >> > Best Jan
> >> >
> >> > On 16.10.2018 09:29, Zahari Dichev wrote:
> >> >> Hi there Kafka developers,
> >> >>
> >> >> I am currently trying to find a solution to an issue that has been
> >> >> manifesting itself in the Akka streams implementation of the Kafka
> >> >> connector. When it comes to consuming messages, the implementation
> >> relies
> >> >> heavily on the fact that we can pause and resume partitions. In some
> >> >> situations when a single consumer instance is shared among several
> >> >> streams,
> >> >> we might end up with frequently pausing and unpausing a set of topic
> >> >> partitions, which is the main facility that allows us to implement
> back
> >> >> pressure. This however has certain disadvantages, especially when
> >> >> there are
> >> >> two consumers that differ in terms of processing speed.
> >> >>
> >> >> To articulate the issue more clearly, imagine that a consumer
> maintains
> >> >> assignments for two topic partitions *TP1* and *TP2*. This consumer
> is
> >> >> shared by two streams - S1 and S2. So effectively when we have demand
> >> >> from
> >> >> only one of the streams - *S1*, we will pause one of the topic
> >> partitions
> >> >> *TP2* and call *poll()* on the consumer to only retrieve the records
> >> for
> >> >> the demanded topic partition - *TP1*. The result of that is all the
> >> >> records
> >> >> that have been prefetched for *TP2* are now thrown away by the
> fetcher
> >> >> ("*Not
> >> >> returning fetched records for assigned partition TP2 since it is no
> >> >> longer
> >> >> fetchable"*). If we extrapolate that to multiple streams sharing the
> >> same
> >> >> consumer, we might quickly end up in a situation where we throw
> >> >> prefetched
> >> >> data quite often. This does not seem like the most efficient approach
> >> and
> >> >> in fact produces quite a lot of overlapping fetch requests as
> >> illustrated
> >> >> in the following issue:
> >> >>
> >> >> https://github.com/akka/alpakka-kafka/issues/549
> >> >>
> >> >> I am writing this email to get some initial opinion on a KIP I was
> >> >> thinking
> >> >> about. What if we give the clients of the Consumer API a bit more
> >> control
> >> >> of what to do with this prefetched data. Two options I am wondering
> >> >> about:
> >> >>
> >> >> 1. Introduce a configuration setting, such as*
> >> >> "return-prefetched-data-for-paused-topic-partitions = false"* (have
> to
> >> >> think of a better name), which when set to true will return what is
> >> >> prefetched instead of throwing it away on calling *poll()*. Since
> this
> >> is
> >> >> amount of data that is bounded by the maximum size of the prefetch,
> we
> >> >> can
> >> >> control what is the most amount of records returned. The client of
> the
> >> >> consumer API can then be responsible for keeping that data around and
> >> use
> >> >> it when appropriate (i.e. when demand is present)
> >> >>
> >> 

Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-18 Thread Per Steffensen

On 17/10/2018 18.17, Ryanne Dolan wrote:

> this does not guarantee that the
> offsets of R have been written/flushed at the next commit() call

True, but does it matter? So long as you can guarantee the records are 
delivered to the downstream Kafka cluster, it shouldn't matter if they 
have been committed or not.


The worst that can happen is that the worker gets bounced and asks for 
the same records a second time. Even if those records have since been 
dropped from the upstream data source, it doesn't matter cuz you know 
they were previously delivered successfully.


You are kind of arguing that offsets are not usable at all. I think they 
are. Below I will explain a fairly simple source-connector, how it 
would be misled by the way the source-connector framework currently works, 
and how my fix would help it not be. The source-connector is picked out 
of thin air, but it is not too far from what I have had to deal with in real life.


Let's assume I write a fairly simple source-connector that picks up data 
from files in a given folder. For simplicity, let's assume that each 
file fits in a Kafka message. My source-connector just sorts the files 
by timestamp and sends out the data in the files, oldest file first. It 
is possible that the receiving side of the data my source-connector 
sends out will get the same data twice, for one of the following reasons:
* There were actually two input-files that contained exactly the same 
data (in that case the receiving side should handle it twice)
* The data from the same file may be sent twice in two Kafka messages, 
due to global atomicity being impossible (in that case the receiving side 
should only handle the data once)
In order to allow the receiving side to know when two consecutive 
messages are essentially the same, so that it will know to handle only 
one of them, I introduce a simple sequence-numbering system in my 
source-connector. I simply write a sequence-number in the 
Kafka messages, and I use Kafka Connect offsets to keep track of the 
next sequence-number to be used, so that I can pick up with the correct 
sequence-number in case of a crash/restart. If there are no offsets when 
the source-connector starts (first start), it will just start with 
sequence-number 1.
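
For illustration, here is a minimal sketch of what such a task could look 
like with the standard Connect SourceTask API. The class and helper names 
are made up for this example, and all real file handling is omitted:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class FileSequenceSourceTask extends SourceTask {
    // Made-up source partition identifying the input folder.
    private static final Map<String, String> SOURCE_PARTITION =
            Collections.singletonMap("input", "input-folder");

    private long nextSeq = 1;

    @Override
    public void start(Map<String, String> props) {
        // On (re)start, recover nextSeq from the last flushed offsets, if any.
        Map<String, Object> offset = context.offsetStorageReader().offset(SOURCE_PARTITION);
        if (offset != null && offset.get("nextSeq") != null)
            nextSeq = ((Number) offset.get("nextSeq")).longValue();
    }

    @Override
    public List<SourceRecord> poll() {
        // Contents of the oldest file, or null if the folder is empty (made-up helper).
        String data = readOldestFile();
        if (data == null)
            return Collections.emptyList();
        // The offset attached to this record says which sequence-number comes next.
        Map<String, Long> offset = Collections.singletonMap("nextSeq", nextSeq + 1);
        SourceRecord record = new SourceRecord(SOURCE_PARTITION, offset, "files-topic",
                Schema.STRING_SCHEMA, "seq=" + nextSeq + ";" + data);
        nextSeq++;
        return Collections.singletonList(record);
    }

    @Override
    public void stop() { }

    @Override
    public String version() { return "0.1"; }

    private String readOldestFile() {
        return null; // real file handling omitted in this sketch
    }
}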


*Assume the following files are in the input-folder:*
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_01-.data
* 2018-01-01_10_00_02-.data
…

*Now this sequence of events is possible:*
* mySourceConnector.poll() —> [
  R1 = record({seq: 1, data=2018-01-01_10_00_00-.data>},{ nextSeq=2 }},
  R2 = record({seq: 2, data=2018-01-01_10_00_00-.data>},{ nextSeq=3 }}

]
* data of R1 was sent and acknowledged
* mySourceConnector.commitRecord(R1)
* data of R2 was sent and acknowledged
* mySourceConnector.commitRecord(R2)
* offsets-committer kicks in around here and picks up the offsets from 
R1 and R2, resulting in the merged offsets to be written and flushed 
being { nextSeq=3 }

* mySourceConnector.poll() —> [
  R3 = record({seq: 3, data=2018-01-01_10_00_01-.data>},{ nextSeq=4 }}

]
* data of R3 was sent and acknowledged
* mySourceConnector.commitRecord(R3)
* offsets-committer finishes writing and flushing offsets { nextSeq=3 }
* mySourceConnector.commit()

In the mySourceConnector.commit() implementation I believe that the data 
and offsets for R1, R2 and R3 have been sent/written/flushed/acknowledged, 
and therefore I delete the following files:

* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_01-.data
But the truth is that the data for R1, R2 and R3 has been sent with 
sequence-numbers 1, 2 and 3 respectively, while the flushed offsets say 
{ nextSeq=3 } and not { nextSeq=4 }, which I would indirectly expect.
If the system crashes here, upon restart I will get { nextSeq=3 }, but the 
file containing the data that was supposed to get sequence-number 3 has 
already been deleted. Therefore I will end up with this on the next poll:

* poll() —> [
  R4 = record({seq: 3, data=2018-01-01_10_00_02-.data},{ nextSeq=4 }}

]
If my system had worked, I should have ended up with this on the next poll:
* poll() —> [
  R4 = record({seq: 4, data=2018-01-01_10_00_02-.data},{ nextSeq=5 }}

]
The receiving side of my data will get two messages containing the same 
sequence-number 3. It will therefore incorrectly ignore the second 
message. Even if it double-checks by looking at the actual data of the 
two messages, and if the content of 2018-01-01_10_00_01-.data and 
2018-01-01_10_00_02-.data was actually identical, it has no way 
of figuring out that the right thing to do is to actually handle both messages.


*With my fix to the problem*, the call to commit() would have been
mySourceConnector.commit([R1, R2])
and I would know to delete only the following files:
* 2018-01-01_10_00_00-.data
* 2018-01-01_10_00_00-.data
After a crash/restart I would then end up sending the correct next message:
mySourceConnector.poll() —> [
  R3 = record({seq: 3, data=2018-01-01_10_00_01-.data>},{ nextSeq=4 }}

]
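
To make the cleanup side concrete, here is a rough sketch of how such a 
callback could be used, assuming the framework ends up handing the flushed 
records to commit() more or less as written above (the exact signature is 
what this KIP is about and may well differ); deleteInputFileFor() is a 
made-up helper:

// Hypothetical callback shape following the proposal above: the framework passes the
// task only those records whose offsets have actually been written and flushed.
@Override
public void commit(List<SourceRecord> recordsWithFlushedOffsets) throws InterruptedException {
    for (SourceRecord record : recordsWithFlushedOffsets) {
        // Both the record's data and its offsets ({ nextSeq }) are durable now,
        // so it is safe to clean up the input file that produced it.
        deleteInputFileFor(record); // made-up helper
    }
}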



Re: Throwing away prefetched records optimisation.

2018-10-18 Thread Zahari Dichev
Jan,

Quite insightful indeed. I think your propositions are valid.

Ryanne,

I understand that consumers are using a pull model... And yes, indeed if a
consumer is not ready for more records it surely should not call poll.
Except that it needs to do so periodically in order to indicate that it is
alive. Forget about the "backpressure", I guess I was wrong with the
phrasing, so let's not get caught up on it.

You say pause/resume can be used to prioritise certain topics/partitions
over others. And indeed this is the case. So instead of thinking about it
in terms of backpressure, let's put it a different way. The Akka streams
connector would like to prioritise certain topics over others, using one
consumer instance. On top of that, add the detail that the priorities
change quite frequently (which translates to calling pause/resume
frequently). So all that being said, what would be a proper way to handle
the situation without throwing the pre-fetched records away when calling
poll on a consumer that happens to have a topic that was recently paused
(and that might be un-paused soon)? Am I the only one who considers this
an actual problem with the use of pause/resume? Not sure how to explain
the situation in a better way.
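
For illustration only, a bare-bones sketch of the pattern described above, 
assuming a single consumer shared by the streams; demand tracking and the 
stream wiring are omitted, and the class and method names are made up:

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public final class DemandDrivenPolling {
    // One scheduling cycle: resume the partitions that currently have downstream demand,
    // pause the rest, and poll once. With the current fetcher, anything already prefetched
    // for a partition that has just been paused is dropped and fetched again later.
    static ConsumerRecords<byte[], byte[]> pollDemanded(KafkaConsumer<byte[], byte[]> consumer,
                                                        Set<TopicPartition> demanded) {
        Set<TopicPartition> idle = new HashSet<>(consumer.assignment());
        idle.removeAll(demanded);
        consumer.resume(demanded);
        consumer.pause(idle);
        return consumer.poll(Duration.ofMillis(50));
    }
}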

Zahari


On Thu, Oct 18, 2018 at 9:46 AM Zahari Dichev 
wrote:

> Thanks a lot Jan,
>
> I will read it.
>
> Zahari
>
> On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak 
> wrote:
>
>> especially my suggestions ;)
>>
>> On 18.10.2018 08:30, Jan Filipiak wrote:
>> > Hi Zahari,
>> >
>> > would you be willing to scan through the KIP-349 discussion a little?
>> > I think it has suggestions that could be interesting for you
>> >
>> > Best Jan
>> >
>> > On 16.10.2018 09:29, Zahari Dichev wrote:
>> >> Hi there Kafka developers,
>> >>
>> >> I am currently trying to find a solution to an issue that has been
>> >> manifesting itself in the Akka streams implementation of the Kafka
>> >> connector. When it comes to consuming messages, the implementation
>> relies
>> >> heavily on the fact that we can pause and resume partitions. In some
>> >> situations when a single consumer instance is shared among several
>> >> streams,
>> >> we might end up with frequently pausing and unpausing a set of topic
>> >> partitions, which is the main facility that allows us to implement back
>> >> pressure. This however has certain disadvantages, especially when
>> >> there are
>> >> two consumers that differ in terms of processing speed.
>> >>
>> >> To articulate the issue more clearly, imagine that a consumer maintains
>> >> assignments for two topic partitions *TP1* and *TP2*. This consumer is
>> >> shared by two streams - S1 and S2. So effectively when we have demand
>> >> from
>> >> only one of the streams - *S1*, we will pause one of the topic
>> partitions
>> >> *TP2* and call *poll()* on the consumer to only retrieve the records
>> for
>> >> the demanded topic partition - *TP1*. The result of that is all the
>> >> records
>> >> that have been prefetched for *TP2* are now thrown away by the fetcher
>> >> ("*Not
>> >> returning fetched records for assigned partition TP2 since it is no
>> >> longer
>> >> fetchable"*). If we extrapolate that to multiple streams sharing the
>> same
>> >> consumer, we might quickly end up in a situation where we throw
>> >> prefetched
>> >> data quite often. This does not seem like the most efficient approach
>> and
>> >> in fact produces quite a lot of overlapping fetch requests as
>> illustrated
>> >> in the following issue:
>> >>
>> >> https://github.com/akka/alpakka-kafka/issues/549
>> >>
>> >> I am writing this email to get some initial opinion on a KIP I was
>> >> thinking
>> >> about. What if we give the clients of the Consumer API a bit more
>> control
>> >> of what to do with this prefetched data. Two options I am wondering
>> >> about:
>> >>
>> >> 1. Introduce a configuration setting, such as*
>> >> "return-prefetched-data-for-paused-topic-partitions = false"* (have to
>> >> think of a better name), which when set to true will return what is
>> >> prefetched instead of throwing it away on calling *poll()*. Since this
>> is
>> >> amount of data that is bounded by the maximum size of the prefetch, we
>> >> can
>> >> control what is the most amount of records returned. The client of the
>> >> consumer API can then be responsible for keeping that data around and
>> use
>> >> it when appropriate (i.e. when demand is present)
>> >>
>> >> 2. Introduce a facility to pass in a buffer into which the prefetched
>> >> records are drained when poll is called and paused partitions have some
>> >> prefetched records.
>> >>
>> >> Any opinions on the matter are welcome. Thanks a lot !
>> >>
>> >> Zahari Dichev
>> >>
>>
>


[jira] [Created] (KAFKA-7518) FutureRecordMetadata.get deadline calculation from timeout is not using timeunit

2018-10-18 Thread Andras Katona (JIRA)
Andras Katona created KAFKA-7518:


 Summary: FutureRecordMetadata.get deadline calculation from timeout is not using timeunit
 Key: KAFKA-7518
 URL: https://issues.apache.org/jira/browse/KAFKA-7518
 Project: Kafka
 Issue Type: Bug
 Components: clients
 Reporter: Andras Katona
 Assignee: Andras Katona


The code below assumes that the timeout is in milliseconds when calculating the deadline, ignoring the supplied TimeUnit.

{code}
@Override
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    // Handle overflow.
    long now = System.currentTimeMillis();
    long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
{code}

It was causing 
{{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}} 
to fail intermittently, and it led me to this code segment.
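
A possible fix is to convert the timeout with the supplied TimeUnit before 
computing the deadline, along these lines (a sketch only, not the final patch):

{code}
// Sketch of the corrected deadline computation: convert with the supplied unit first,
// then guard against overflow.
static long deadlineMillis(long timeout, TimeUnit unit) {
    long timeoutMillis = unit.toMillis(timeout);
    long now = System.currentTimeMillis();
    return Long.MAX_VALUE - timeoutMillis < now ? Long.MAX_VALUE : now + timeoutMillis;
}
{code}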



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-382: MirrorMaker 2.0

2018-10-18 Thread Jan Filipiak
Then I just hope that in the midst of all these new features I can still 
at least use it to copy my messages from A to B later.


Another hint you should be aware of:

https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics

That was always a design I admired, with active/active replication 
and so on. It feels like we are moving another step away from this.


On 17.10.2018 17:34, Ryanne Dolan wrote:

Jan, these are two separate issues.

1) consumer coordination should not, ideally, involve unreliable or slow
connections. Naively, a KafkaSourceConnector would coordinate via the
source cluster. We can do better than this, but I'm deferring this
optimization for now.

2) exactly-once between two clusters is mind-bending. But keep in mind
that transactions are managed by the producer, not the consumer. In
fact, it's the producer that requests that offsets be committed for the
current transaction. Obviously, these offsets are committed in whatever
cluster the producer is sending to.
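
For reference, a minimal sketch of that point using the existing 
transactional producer API; the producer here is assumed to be configured 
against the target cluster, initTransactions() is assumed to have been 
called once beforehand, and all names are placeholders:

import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

final class MirrorBatchSketch {
    // The producer, not the consumer, attaches the consumed offsets to the transaction,
    // so those offsets are committed in whatever cluster this producer writes to.
    static void mirrorBatch(KafkaProducer<byte[], byte[]> targetProducer,
                            Iterable<ProducerRecord<byte[], byte[]>> records,
                            Map<TopicPartition, OffsetAndMetadata> consumedOffsets,
                            String groupId) {
        targetProducer.beginTransaction();
        for (ProducerRecord<byte[], byte[]> record : records)
            targetProducer.send(record);
        targetProducer.sendOffsetsToTransaction(consumedOffsets, groupId);
        targetProducer.commitTransaction();
    }
}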

These two issues are closely related. They are both resolved by not
coordinating or committing via the source cluster. And in fact, this is
the general model of SourceConnectors anyway, since most
SourceConnectors _only_ have a destination cluster.

If there is a lot of interest here, I can expound further on this aspect
of MM2, but again I think this is premature until this first KIP is
approved. I intend to address each of these in separate KIPs following
this one.

Ryanne

On Wed, Oct 17, 2018 at 7:09 AM Jan Filipiak mailto:jan.filip...@trivago.com>> wrote:

This is not a performance optimisation. Its a fundamental design choice.


I never really took a look how streams does exactly once. (its a trap
anyways and you usually can deal with at least once donwstream pretty
easy). But I am very certain its not gonna get somewhere if offset
commit and record produce cluster are not the same.

Pretty sure without this _design choice_ you can skip on that exactly
once already

Best Jan

On 16.10.2018 18:16, Ryanne Dolan wrote:
 >  >  But one big obstacle in this was
 > always that group coordination happened on the source cluster.
 >
 > Jan, thank you for bringing up this issue with legacy MirrorMaker. I
 > totally agree with you. This is one of several problems with
MirrorMaker
 > I intend to solve in MM2, and I already have a design and
prototype that
 > solves this and related issues. But as you pointed out, this KIP is
 > already rather complex, and I want to focus on the core feature set
 > rather than performance optimizations for now. If we can agree on
what
 > MM2 looks like, it will be very easy to agree to improve its
performance
 > and reliability.
 >
 > That said, I look forward to your support on a subsequent KIP that
 > addresses consumer coordination and rebalance issues. Stay tuned!
 >
 > Ryanne
 >
 > On Tue, Oct 16, 2018 at 6:58 AM Jan Filipiak
mailto:jan.filip...@trivago.com>
 > >> wrote:
 >
 > Hi,
 >
 > Currently MirrorMaker is usually run collocated with the target
 > cluster.
 > This is all nice and good. But one big obstacle in this was
 > always that group coordination happened on the source
cluster. So when
 > then network was congested, you sometimes loose group
membership and
 > have to rebalance and all this.
 >
 > So one big request from we would be the support of having
coordination
 > cluster != source cluster.
 >
 > I would generally say a LAN is better than a WAN for doing group
 > coordinaton and there is no reason we couldn't have a group
consuming
 > topics from a different cluster and committing offsets to another
 > one right?
 >
 > Other than that. It feels like the KIP has too much features
where many
 > of them are not really wanted and counter productive but I
will just
 > wait and see how the discussion goes.
 >
 > Best Jan
 >
 >
 > On 15.10.2018 18:16, Ryanne Dolan wrote:
 >  > Hey y'all!
 >  >
 >  > Please take a look at KIP-382:
 >  >
 >  >
 >
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
 >  >
 >  > Thanks for your feedback and support.
 >  >
 >  > Ryanne
 >  >
 >



Re: Throwing away prefetched records optimisation.

2018-10-18 Thread Zahari Dichev
Thanks a lot Jan,

I will read it.

Zahari

On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak 
wrote:

> especially my suggestions ;)
>
> On 18.10.2018 08:30, Jan Filipiak wrote:
> > Hi Zahari,
> >
> > would you be willing to scan through the KIP-349 discussion a little?
> > I think it has suggestions that could be interesting for you
> >
> > Best Jan
> >
> > On 16.10.2018 09:29, Zahari Dichev wrote:
> >> Hi there Kafka developers,
> >>
> >> I am currently trying to find a solution to an issue that has been
> >> manifesting itself in the Akka streams implementation of the Kafka
> >> connector. When it comes to consuming messages, the implementation
> relies
> >> heavily on the fact that we can pause and resume partitions. In some
> >> situations when a single consumer instance is shared among several
> >> streams,
> >> we might end up with frequently pausing and unpausing a set of topic
> >> partitions, which is the main facility that allows us to implement back
> >> pressure. This however has certain disadvantages, especially when
> >> there are
> >> two consumers that differ in terms of processing speed.
> >>
> >> To articulate the issue more clearly, imagine that a consumer maintains
> >> assignments for two topic partitions *TP1* and *TP2*. This consumer is
> >> shared by two streams - S1 and S2. So effectively when we have demand
> >> from
> >> only one of the streams - *S1*, we will pause one of the topic
> partitions
> >> *TP2* and call *poll()* on the consumer to only retrieve the records for
> >> the demanded topic partition - *TP1*. The result of that is all the
> >> records
> >> that have been prefetched for *TP2* are now thrown away by the fetcher
> >> ("*Not
> >> returning fetched records for assigned partition TP2 since it is no
> >> longer
> >> fetchable"*). If we extrapolate that to multiple streams sharing the
> same
> >> consumer, we might quickly end up in a situation where we throw
> >> prefetched
> >> data quite often. This does not seem like the most efficient approach
> and
> >> in fact produces quite a lot of overlapping fetch requests as
> illustrated
> >> in the following issue:
> >>
> >> https://github.com/akka/alpakka-kafka/issues/549
> >>
> >> I am writing this email to get some initial opinion on a KIP I was
> >> thinking
> >> about. What if we give the clients of the Consumer API a bit more
> control
> >> of what to do with this prefetched data. Two options I am wondering
> >> about:
> >>
> >> 1. Introduce a configuration setting, such as*
> >> "return-prefetched-data-for-paused-topic-partitions = false"* (have to
> >> think of a better name), which when set to true will return what is
> >> prefetched instead of throwing it away on calling *poll()*. Since this
> is
> >> amount of data that is bounded by the maximum size of the prefetch, we
> >> can
> >> control what is the most amount of records returned. The client of the
> >> consumer API can then be responsible for keeping that data around and
> use
> >> it when appropriate (i.e. when demand is present)
> >>
> >> 2. Introduce a facility to pass in a buffer into which the prefetched
> >> records are drained when poll is called and paused partitions have some
> >> prefetched records.
> >>
> >> Any opinions on the matter are welcome. Thanks a lot !
> >>
> >> Zahari Dichev
> >>
>


Re: Throwing away prefetched records optimisation.

2018-10-18 Thread Jan Filipiak

especially my suggestions ;)

On 18.10.2018 08:30, Jan Filipiak wrote:

Hi Zahari,

would you be willing to scan through the KIP-349 discussion a little?
I think it has suggestions that could be interesting for you

Best Jan

On 16.10.2018 09:29, Zahari Dichev wrote:

Hi there Kafka developers,

I am currently trying to find a solution to an issue that has been
manifesting itself in the Akka streams implementation of the Kafka
connector. When it comes to consuming messages, the implementation relies
heavily on the fact that we can pause and resume partitions. In some
situations when a single consumer instance is shared among several
streams,
we might end up with frequently pausing and unpausing a set of topic
partitions, which is the main facility that allows us to implement back
pressure. This however has certain disadvantages, especially when
there are
two consumers that differ in terms of processing speed.

To articulate the issue more clearly, imagine that a consumer maintains
assignments for two topic partitions *TP1* and *TP2*. This consumer is
shared by two streams - S1 and S2. So effectively when we have demand
from
only one of the streams - *S1*, we will pause one of the topic partitions
*TP2* and call *poll()* on the consumer to only retrieve the records for
the demanded topic partition - *TP1*. The result of that is all the
records
that have been prefetched for *TP2* are now thrown away by the fetcher
("*Not
returning fetched records for assigned partition TP2 since it is no
longer
fetchable"*). If we extrapolate that to multiple streams sharing the same
consumer, we might quickly end up in a situation where we throw
prefetched
data quite often. This does not seem like the most efficient approach and
in fact produces quite a lot of overlapping fetch requests as illustrated
in the following issue:

https://github.com/akka/alpakka-kafka/issues/549

I am writing this email to get some initial opinion on a KIP I was
thinking
about. What if we give the clients of the Consumer API a bit more control
of what to do with this prefetched data. Two options I am wondering
about:

1. Introduce a configuration setting, such as*
"return-prefetched-data-for-paused-topic-partitions = false"* (have to
think of a better name), which when set to true will return what is
prefetched instead of throwing it away on calling *poll()*. Since this is
amount of data that is bounded by the maximum size of the prefetch, we
can
control what is the most amount of records returned. The client of the
consumer API can then be responsible for keeping that data around and use
it when appropriate (i.e. when demand is present)

2. Introduce a facility to pass in a buffer into which the prefetched
records are drained when poll is called and paused partitions have some
prefetched records.

Any opinions on the matter are welcome. Thanks a lot !

Zahari Dichev



Re: Throwing away prefetched records optimisation.

2018-10-18 Thread Jan Filipiak

Hi Zahari,

would you be willing to scan through the KIP-349 discussion a little?
I think it has suggestions that could be interesting to you.

Best Jan

On 16.10.2018 09:29, Zahari Dichev wrote:

Hi there Kafka developers,

I am currently trying to find a solution to an issue that has been
manifesting itself in the Akka streams implementation of the Kafka
connector. When it comes to consuming messages, the implementation relies
heavily on the fact that we can pause and resume partitions. In some
situations when a single consumer instance is shared among several streams,
we might end up with frequently pausing and unpausing a set of topic
partitions, which is the main facility that allows us to implement back
pressure. This however has certain disadvantages, especially when there are
two consumers that differ in terms of processing speed.

To articulate the issue more clearly, imagine that a consumer maintains
assignments for two topic partitions *TP1* and *TP2*. This consumer is
shared by two streams - S1 and S2. So effectively when we have demand from
only one of the streams - *S1*, we will pause one of the topic partitions
*TP2* and call *poll()* on the consumer to only retrieve the records for
the demanded topic partition - *TP1*. The result of that is all the records
that have been prefetched for *TP2* are now thrown away by the fetcher ("*Not
returning fetched records for assigned partition TP2 since it is no longer
fetchable"*). If we extrapolate that to multiple streams sharing the same
consumer, we might quickly end up in a situation where we throw prefetched
data quite often. This does not seem like the most efficient approach and
in fact produces quite a lot of overlapping fetch requests as illustrated
in the following issue:

https://github.com/akka/alpakka-kafka/issues/549

I am writing this email to get some initial opinion on a KIP I was thinking
about. What if we give the clients of the Consumer API a bit more control
of what to do with this prefetched data. Two options I am wondering about:

1. Introduce a configuration setting, such as*
"return-prefetched-data-for-paused-topic-partitions = false"* (have to
think of a better name), which when set to true will return what is
prefetched instead of throwing it away on calling *poll()*. Since this is
amount of data that is bounded by the maximum size of the prefetch, we can
control what is the most amount of records returned. The client of the
consumer API can then be responsible for keeping that data around and use
it when appropriate (i.e. when demand is present)

2. Introduce a facility to pass in a buffer into which the prefetched
records are drained when poll is called and paused partitions have some
prefetched records.

Any opinions on the matter are welcome. Thanks a lot !

Zahari Dichev
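
For illustration, a rough sketch of what option 1 could look like from the 
client's side; the property name below is the hypothetical one from the 
message above and is not a real consumer configuration today. Option 2 
would instead add a way to drain the prefetched records into a 
caller-supplied buffer rather than a configuration switch.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

final class Option1Sketch {
    static KafkaConsumer<byte[], byte[]> newConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "shared-streams-consumer");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        // Hypothetical setting from option 1 above, not a real consumer config:
        // keep (rather than drop) records already prefetched for partitions that
        // are paused at the time poll() is called.
        props.put("return-prefetched-data-for-paused-topic-partitions", "true");
        return new KafkaConsumer<>(props);
    }
}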