Re: Throwing away prefetched records optimisation.

2018-10-17 Thread Zahari Dichev
Thanks a lot Jan,

I will read it.

Zahari

On Thu, Oct 18, 2018 at 9:31 AM Jan Filipiak 
wrote:

> especially my suggestions ;)
>
> On 18.10.2018 08:30, Jan Filipiak wrote:
> > Hi Zahari,
> >
> > would you be willing to scan through the KIP-349 discussion a little?
> > I think it has suggestions that could be interesting for you
> >
> > Best Jan
> >
> > On 16.10.2018 09:29, Zahari Dichev wrote:
> >> Hi there Kafka developers,
> >>
> >> I am currently trying to find a solution to an issue that has been
> >> manifesting itself in the Akka streams implementation of the Kafka
> >> connector. When it comes to consuming messages, the implementation
> relies
> >> heavily on the fact that we can pause and resume partitions. In some
> >> situations when a single consumer instance is shared among several
> >> streams,
> >> we might end up with frequently pausing and unpausing a set of topic
> >> partitions, which is the main facility that allows us to implement back
> >> pressure. This, however, has certain disadvantages, especially when
> >> there are two streams that differ in terms of processing speed.
> >>
> >> To articulate the issue more clearly, imagine that a consumer maintains
> >> assignments for two topic partitions *TP1* and *TP2*. This consumer is
> >> shared by two streams - S1 and S2. So effectively when we have demand
> >> from
> >> only one of the streams - *S1*, we will pause one of the topic
> partitions
> >> *TP2* and call *poll()* on the consumer to only retrieve the records for
> >> the demanded topic partition - *TP1*. The result is that all the records
> >> that have been prefetched for *TP2* are now thrown away by the fetcher
> >> ("*Not
> >> returning fetched records for assigned partition TP2 since it is no
> >> longer
> >> fetchable"*). If we extrapolate that to multiple streams sharing the
> same
> >> consumer, we might quickly end up in a situation where we throw away
> >> prefetched data quite often. This does not seem like the most efficient approach
> and
> >> in fact produces quite a lot of overlapping fetch requests as
> illustrated
> >> in the following issue:
> >>
> >> https://github.com/akka/alpakka-kafka/issues/549
> >>
> >> I am writing this email to get some initial opinion on a KIP I was
> >> thinking
> >> about. What if we give the clients of the Consumer API a bit more
> control
> >> of what to do with this prefetched data. Two options I am wondering
> >> about:
> >>
> >> 1. Introduce a configuration setting, such as*
> >> "return-prefetched-data-for-paused-topic-partitions = false"* (have to
> >> think of a better name), which when set to true will return what is
> >> prefetched instead of throwing it away on calling *poll()*. Since this
> >> amount of data is bounded by the maximum size of the prefetch, we can
> >> control the maximum number of records returned. The client of the
> >> consumer API can then be responsible for keeping that data around and
> use
> >> it when appropriate (i.e. when demand is present)
> >>
> >> 2. Introduce a facility to pass in a buffer into which the prefetched
> >> records are drained when poll is called and paused partitions have some
> >> prefetched records.
> >>
> >> Any opinions on the matter are welcome. Thanks a lot !
> >>
> >> Zahari Dichev
> >>
>
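For readers following along, the pause/poll pattern under discussion looks roughly like the sketch below. It is illustrative only: the topic, partitions, group id and local bootstrap address are assumptions, not code from the Alpakka connector.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PausedPrefetchSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
        props.put("group.id", "demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp1 = new TopicPartition("topic", 0);  // stands in for TP1
            TopicPartition tp2 = new TopicPartition("topic", 1);  // stands in for TP2
            consumer.assign(Arrays.asList(tp1, tp2));

            // A first poll may prefetch records for both partitions.
            ConsumerRecords<String, String> first = consumer.poll(Duration.ofMillis(500));
            System.out.println("first poll: " + first.count());

            // Only S1 has demand, so TP2 is paused before the next poll.
            consumer.pause(Arrays.asList(tp2));

            // Records already prefetched for TP2 are discarded inside this poll; this
            // is where the fetcher logs the "Not returning fetched records ... no
            // longer fetchable" message quoted above.
            ConsumerRecords<String, String> onlyTp1 = consumer.poll(Duration.ofMillis(500));
            System.out.println("second poll: " + onlyTp1.count());

            // Later, when S2 has demand again, TP2 is resumed and must be fetched anew.
            consumer.resume(Arrays.asList(tp2));
        }
    }
}
```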


Re: Throwing away prefetched records optimisation.

2018-10-17 Thread Jan Filipiak

especially my suggestions ;)

On 18.10.2018 08:30, Jan Filipiak wrote:

Hi Zahari,

would you be willing to scan through the KIP-349 discussion a little?
I think it has suggestions that could be interesting for you

Best Jan

On 16.10.2018 09:29, Zahari Dichev wrote:

Hi there Kafka developers,

I am currently trying to find a solution to an issue that has been
manifesting itself in the Akka streams implementation of the Kafka
connector. When it comes to consuming messages, the implementation relies
heavily on the fact that we can pause and resume partitions. In some
situations when a single consumer instance is shared among several
streams,
we might end up with frequently pausing and unpausing a set of topic
partitions, which is the main facility that allows us to implement back
pressure. This, however, has certain disadvantages, especially when
there are two streams that differ in terms of processing speed.

To articulate the issue more clearly, imagine that a consumer maintains
assignments for two topic partitions *TP1* and *TP2*. This consumer is
shared by two streams - S1 and S2. So effectively when we have demand
from
only one of the streams - *S1*, we will pause one of the topic partitions
*TP2* and call *poll()* on the consumer to only retrieve the records for
the demanded topic partition - *TP1*. The result is that all the records
that have been prefetched for *TP2* are now thrown away by the fetcher
("*Not
returning fetched records for assigned partition TP2 since it is no
longer
fetchable"*). If we extrapolate that to multiple streams sharing the same
consumer, we might quickly end up in a situation where we throw away
prefetched data quite often. This does not seem like the most efficient approach and
in fact produces quite a lot of overlapping fetch requests as illustrated
in the following issue:

https://github.com/akka/alpakka-kafka/issues/549

I am writing this email to get some initial opinion on a KIP I was
thinking
about. What if we give the clients of the Consumer API a bit more control
of what to do with this prefetched data. Two options I am wondering
about:

1. Introduce a configuration setting, such as*
"return-prefetched-data-for-paused-topic-partitions = false"* (have to
think of a better name), which when set to true will return what is
prefetched instead of throwing it away on calling *poll()*. Since this
amount of data is bounded by the maximum size of the prefetch, we can
control the maximum number of records returned. The client of the
consumer API can then be responsible for keeping that data around and use
it when appropriate (i.e. when demand is present)

2. Introduce a facility to pass in a buffer into which the prefetched
records are drained when poll is called and paused partitions have some
prefetched records.

Any opinions on the matter are welcome. Thanks a lot !

Zahari Dichev



Re: Throwing away prefetched records optimisation.

2018-10-17 Thread Jan Filipiak

Hi Zahari,

would you be willing to scan through the KIP-349 discussion a little?
I think it has suggestions that could be interesting for you

Best Jan

On 16.10.2018 09:29, Zahari Dichev wrote:

Hi there Kafka developers,

I am currently trying to find a solution to an issue that has been
manifesting itself in the Akka streams implementation of the Kafka
connector. When it comes to consuming messages, the implementation relies
heavily on the fact that we can pause and resume partitions. In some
situations when a single consumer instance is shared among several streams,
we might end up with frequently pausing and unpausing a set of topic
partitions, which is the main facility that allows us to implement back
pressure. This, however, has certain disadvantages, especially when there are
two streams that differ in terms of processing speed.

To articulate the issue more clearly, imagine that a consumer maintains
assignments for two topic partitions *TP1* and *TP2*. This consumer is
shared by two streams - S1 and S2. So effectively when we have demand from
only one of the streams - *S1*, we will pause one of the topic partitions
*TP2* and call *poll()* on the consumer to only retrieve the records for
the demanded topic partition - *TP1*. The result is that all the records
that have been prefetched for *TP2* are now thrown away by the fetcher ("*Not
returning fetched records for assigned partition TP2 since it is no longer
fetchable"*). If we extrapolate that to multiple streams sharing the same
consumer, we might quickly end up in a situation where we throw away prefetched
data quite often. This does not seem like the most efficient approach and
in fact produces quite a lot of overlapping fetch requests as illustrated
in the following issue:

https://github.com/akka/alpakka-kafka/issues/549

I am writing this email to get some initial opinion on a KIP I was thinking
about. What if we give the clients of the Consumer API a bit more control
of what to do with this prefetched data. Two options I am wondering about:

1. Introduce a configuration setting, such as*
"return-prefetched-data-for-paused-topic-partitions = false"* (have to
think of a better name), which when set to true will return what is
prefetched instead of throwing it away on calling *poll()*. Since this
amount of data is bounded by the maximum size of the prefetch, we can
control the maximum number of records returned. The client of the
consumer API can then be responsible for keeping that data around and use
it when appropriate (i.e. when demand is present)

2. Introduce a facility to pass in a buffer into which the prefetched
records are drained when poll is called and paused partitions have some
prefetched records.

Any opinions on the matter are welcome. Thanks a lot !

Zahari Dichev
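Option 1 essentially asks the consumer to hand back data that the client would otherwise have to buffer itself. As a point of comparison, a hypothetical client-side workaround (not part of the proposal; class and method names are invented for illustration) could look like this:

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/**
 * Hypothetical client-side buffer: instead of pausing a partition (and losing
 * its prefetched records on the next poll), keep polling everything and park
 * records for partitions that currently have no demand.
 */
public class BufferingConsumerFacade {
    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, Deque<ConsumerRecord<String, String>>> parked = new HashMap<>();

    public BufferingConsumerFacade(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    /** Poll the consumer and return records only for partitions with demand. */
    public List<ConsumerRecord<String, String>> pollWithDemand(Set<TopicPartition> demanded) {
        List<ConsumerRecord<String, String>> result = new ArrayList<>();
        // Serve previously parked records first.
        for (TopicPartition tp : demanded) {
            Deque<ConsumerRecord<String, String>> queue = parked.get(tp);
            while (queue != null && !queue.isEmpty()) {
                result.add(queue.poll());
            }
        }
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            if (demanded.contains(tp)) {
                result.add(record);
            } else {
                // Park records for partitions without demand.
                parked.computeIfAbsent(tp, k -> new ArrayDeque<>()).add(record);
            }
        }
        return result;
    }
}
```

The obvious drawback is that this buffer is unbounded unless the caller still pauses the partition at some point, which is roughly why the proposal above bounds the returned data by the size of the prefetch instead.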



Re: [DISCUSS]KIP-216: IQ should throw different exceptions for different errors

2018-10-17 Thread vito jeng
Just open a PR for further discussion:
https://github.com/apache/kafka/pull/5814

Any suggestion is welcome.
Thanks!

---
Vito


On Thu, Oct 11, 2018 at 12:14 AM vito jeng  wrote:

> Hi John,
>
> Thanks for reviewing the KIP.
>
> > I didn't follow the addition of a new method to the QueryableStoreType
> > interface. Can you elaborate why this is necessary to support the new
> > exception types?
>
> To support the new exception types, I would check stream state in the
> following classes:
>   - CompositeReadOnlyKeyValueStore class
>   - CompositeReadOnlySessionStore class
>   - CompositeReadOnlyWindowStore class
>   - DelegatingPeekingKeyValueIterator class
>
> It is also necessary to keep backward compatibility. So I plan to pass the
> stream instance to the QueryableStoreType instance when KafkaStreams#store()
> is invoked. It looks like the simplest way, I think.
>
> That is why I added a new method to the QueryableStoreType interface. I can
> understand that we should try to avoid adding public API methods. However, at
> the moment I have no better ideas.
>
> Any thoughts?
>
>
> > Also, looking over your KIP again, it seems valuable to introduce
> > "retriable store exception" and "fatal store exception" marker interfaces
> > that the various exceptions can mix in. It would be nice from a usability
> > perspective to be able to just log and retry on any "retriable" exception
> > and log and shutdown on any fatal exception.
>
> I agree that this is valuable to the user.
> I'll update the KIP.
>
>
> Thanks
>
>
> ---
> Vito
>
>
> On Tue, Oct 9, 2018 at 2:30 AM John Roesler  wrote:
>
>> Hi Vito,
>>
>> I'm glad to hear you're well again!
>>
>> I didn't follow the addition of a new method to the QueryableStoreType
>> interface. Can you elaborate why this is necessary to support the new
>> exception types?
>>
>> Also, looking over your KIP again, it seems valuable to introduce
>> "retriable store exception" and "fatal store exception" marker interfaces
>> that the various exceptions can mix in. It would be nice from a usability
>> perspective to be able to just log and retry on any "retriable" exception
>> and log and shutdown on any fatal exception.
>>
>> Thanks,
>> -John
>>
>> On Fri, Oct 5, 2018 at 11:47 AM Guozhang Wang  wrote:
>>
>> > Thanks for the explanation, that makes sense.
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Mon, Jun 25, 2018 at 2:28 PM, Matthias J. Sax > >
>> > wrote:
>> >
>> > > The scenario I had in mind was that KS is started in one thread while
>> a
>> > > second thread has a reference to the object to issue queries.
>> > >
>> > > If a query is issued before the "main thread" started KS, and the
>> "query
>> > > thread" knows that it will eventually get started, it can retry. On
>> the
>> > > other hand, if KS is in state PENDING_SHUTDOWN or DEAD, it is
>> impossible
>> > > to issue any query against it now or in the future and thus the error
>> is
>> > > not retryable.
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 6/25/18 10:15 AM, Guozhang Wang wrote:
>> > > > I'm wondering if StreamThreadNotStarted could be merged into
>> > > > StreamThreadNotRunning, because I think users' handling logic for
>> the
>> > > third
>> > > > case would likely be the same as the second. Do you have some
>> scenarios
>> > > > where users may want to handle them differently?
>> > > >
>> > > > Guozhang
>> > > >
>> > > > On Sun, Jun 24, 2018 at 5:25 PM, Matthias J. Sax <
>> > matth...@confluent.io>
>> > > > wrote:
>> > > >
>> > > >> Sorry to hear! Get well soon!
>> > > >>
>> > > >> It's not a big deal if the KIP stalls a little bit. Feel free to
>> pick
>> > it
>> > > >> up again when you find time.
>> > > >>
>> > > > Is `StreamThreadNotRunningException` really a retryable error?
>> > > 
>> > >  When KafkaStream state is REBALANCING, I think it is a retryable
>> > > error.
>> > > 
>> > >  StreamThreadStateStoreProvider#stores() will throw
>> > >  StreamThreadNotRunningException when StreamThread state is not
>> > > >> RUNNING. The
>> > >  user can retry until KafkaStream state is RUNNING.
>> > > >>
>> > > >> I see. If this is the intention, then I would suggest to have two
>> (or
>> > > >> maybe three) different exceptions:
>> > > >>
>> > > >>  - StreamThreadRebalancingException (retryable)
>> > > >>  - StreamThreadNotRunning (not retryable -- thrown if in state
>> > > >> PENDING_SHUTDOWN or DEAD)
>> > > >>  - maybe StreamThreadNotStarted (for state CREATED)
>> > > >>
>> > > >> The last one is tricky and could also be merged into one of the
>> first
>> > > >> two, depending on whether you want to argue that it's retryable or not.
>> > > >> (Just food for thought -- not sure what others think.)
>> > > >>
>> > > >>
>> > > >>
>> > > >> -Matthias
>> > > >>
>> > > >> On 6/22/18 8:06 AM, vito jeng wrote:
>> > > >>> Matthias,
>> > > >>>
>> > > >>> Thank you for your assistance.
>> > > >>>
>> > >  what is the status of this KIP?
>> > > >>>
>> > > >>> Unfortunately, there is no further progress.
>>
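To make the exception discussion above more concrete, here is a rough sketch that combines the marker-interface idea with the state-based exceptions proposed in this thread. The class names follow the discussion, but the shapes, the retry loop and the idea that KafkaStreams#store() would throw these types are illustrative assumptions, not the KIP-216 API.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Marker interfaces, as suggested above:
interface RetriableStateStoreException {}
interface FatalStateStoreException {}

// State-based exceptions along the lines Matthias proposed:
class StreamThreadRebalancingException extends RuntimeException
        implements RetriableStateStoreException {
    StreamThreadRebalancingException(String msg) { super(msg); }
}

class StreamThreadNotRunningException extends RuntimeException
        implements FatalStateStoreException {          // state PENDING_SHUTDOWN or DEAD
    StreamThreadNotRunningException(String msg) { super(msg); }
}

public class QueryRetrySketch {
    // A caller can then branch on the marker interface rather than on concrete classes.
    static ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams)
            throws InterruptedException {
        while (true) {
            try {
                return streams.store("counts", QueryableStoreTypes.<String, Long>keyValueStore());
            } catch (RuntimeException e) {
                if (e instanceof RetriableStateStoreException) {
                    Thread.sleep(100);   // transient, e.g. REBALANCING: log and retry
                } else {
                    throw e;             // fatal, e.g. PENDING_SHUTDOWN or DEAD: log and shut down
                }
            }
        }
    }
}
```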

Build failed in Jenkins: kafka-2.1-jdk8 #30

2018-10-17 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes Part 2

--
[...truncated 232.08 KB...]

kafka.admin.ConfigCommandTest > testDynamicBrokerConfigUpdateUsingZooKeeper 
STARTED

kafka.admin.ConfigCommandTest > testDynamicBrokerConfigUpdateUsingZooKeeper 
PASSED

kafka.admin.ConfigCommandTest > testQuotaDescribeEntities STARTED

kafka.admin.ConfigCommandTest > testQuotaDescribeEntities PASSED

kafka.admin.ConfigCommandTest > shouldParseArgumentsForClientsEntityType STARTED

kafka.admin.ConfigCommandTest > shouldParseArgumentsForClientsEntityType PASSED

kafka.admin.ConfigCommandTest > shouldExitWithNonZeroStatusOnArgError STARTED

kafka.admin.ConfigCommandTest > shouldExitWithNonZeroStatusOnArgError PASSED

kafka.admin.AddPartitionsTest > testReplicaPlacementAllServers STARTED

kafka.admin.AddPartitionsTest > testReplicaPlacementAllServers PASSED

kafka.admin.AddPartitionsTest > testMissingPartition0 STARTED

kafka.admin.AddPartitionsTest > testMissingPartition0 PASSED

kafka.admin.AddPartitionsTest > testWrongReplicaCount STARTED

kafka.admin.AddPartitionsTest > testWrongReplicaCount PASSED

kafka.admin.AddPartitionsTest > testReplicaPlacementPartialServers STARTED

kafka.admin.AddPartitionsTest > testReplicaPlacementPartialServers PASSED

kafka.admin.AddPartitionsTest > testIncrementPartitions STARTED

kafka.admin.AddPartitionsTest > testIncrementPartitions PASSED

kafka.admin.AddPartitionsTest > testManualAssignmentOfReplicas STARTED

kafka.admin.AddPartitionsTest > testManualAssignmentOfReplicas PASSED

kafka.admin.DeleteTopicTest > testDeleteTopicWithCleaner STARTED

kafka.admin.DeleteTopicTest > testDeleteTopicWithCleaner PASSED

kafka.admin.DeleteTopicTest > testResumeDeleteTopicOnControllerFailover STARTED

kafka.admin.DeleteTopicTest > testResumeDeleteTopicOnControllerFailover PASSED

kafka.admin.DeleteTopicTest > testResumeDeleteTopicWithRecoveredFollower STARTED

kafka.admin.DeleteTopicTest > testResumeDeleteTopicWithRecoveredFollower PASSED

kafka.admin.DeleteTopicTest > testDeleteTopicAlreadyMarkedAsDeleted STARTED

kafka.admin.DeleteTopicTest > testDeleteTopicAlreadyMarkedAsDeleted PASSED

kafka.admin.DeleteTopicTest > testIncreasePartitionCountDuringDeleteTopic 
STARTED

kafka.admin.DeleteTopicTest > testIncreasePartitionCountDuringDeleteTopic PASSED

kafka.admin.DeleteTopicTest > testPartitionReassignmentDuringDeleteTopic STARTED

kafka.admin.DeleteTopicTest > testPartitionReassignmentDuringDeleteTopic PASSED

kafka.admin.DeleteTopicTest > testDeleteNonExistingTopic STARTED

kafka.admin.DeleteTopicTest > testDeleteNonExistingTopic PASSED

kafka.admin.DeleteTopicTest > testRecreateTopicAfterDeletion STARTED

kafka.admin.DeleteTopicTest > testRecreateTopicAfterDeletion PASSED

kafka.admin.DeleteTopicTest > testDisableDeleteTopic STARTED

kafka.admin.DeleteTopicTest > testDisableDeleteTopic PASSED

kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic STARTED

kafka.admin.DeleteTopicTest > testAddPartitionDuringDeleteTopic PASSED

kafka.admin.DeleteTopicTest > testDeleteTopicWithAllAliveReplicas STARTED

kafka.admin.DeleteTopicTest > testDeleteTopicWithAllAliveReplicas PASSED

kafka.admin.DeleteTopicTest > testDeleteTopicDuringAddPartition STARTED

kafka.admin.DeleteTopicTest > testDeleteTopicDuringAddPartition PASSED

kafka.admin.DeleteTopicTest > testDeletingPartiallyDeletedTopic STARTED

kafka.admin.DeleteTopicTest > testDeletingPartiallyDeletedTopic PASSED

kafka.admin.AdminTest > testGetBrokerMetadatas STARTED

kafka.admin.AdminTest > testGetBrokerMetadatas PASSED

kafka.admin.AdminTest > testBootstrapClientIdConfig STARTED

kafka.admin.AdminTest > testBootstrapClientIdConfig PASSED

kafka.admin.AdminTest > testManualReplicaAssignment STARTED

kafka.admin.AdminTest > testManualReplicaAssignment PASSED

kafka.admin.AdminTest > testConcurrentTopicCreation STARTED

kafka.admin.AdminTest > testConcurrentTopicCreation PASSED

kafka.admin.AdminTest > testTopicCreationWithCollision STARTED

kafka.admin.AdminTest > testTopicCreationWithCollision PASSED

kafka.admin.AdminTest > testTopicCreationInZK STARTED

kafka.admin.AdminTest > testTopicCreationInZK PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfBlankArg STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > shouldFailIfBlankArg PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowVerifyWithoutReassignmentOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowVerifyWithoutReassignmentOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersOption STARTED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowGenerateWithoutBrokersOption PASSED

kafka.admin.ReassignPartitionsCommandArgsTest > 
shouldNotAllowTopicsOptionWithVerify STARTED

kafka.admin.ReassignPartitionsC

Re: Throwing away prefetched records optimisation.

2018-10-17 Thread Ryanne Dolan
Zahari,

Kafka consumers use a pull model. I'm not sure what backpressure means in
this context. If a consumer isn't ready for more records, it just doesn't
poll() for more.

The documentation talks about "flow control" but doesn't mention
"backpressure". I think these are related but different concepts.
Pause/resume lets you prioritize some topics/partitions over others ("flow
control"), but that isn't a signal to a sender to stop sending
("backpressure").

Ryanne
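As a loose, assumed illustration of that distinction (not taken from the thread): in a pull model the application throttles itself simply by not polling, while pause()/resume() only changes which assigned partitions the next poll() may return records for.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class FlowControlVsBackpressureSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
        props.put("group.id", "sketch");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        BlockingQueue<ConsumerRecord<String, String>> downstream = new ArrayBlockingQueue<>(1000);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition hot = new TopicPartition("topic", 0);   // assumed partitions
            TopicPartition cold = new TopicPartition("topic", 1);
            consumer.assign(Arrays.asList(hot, cold));

            while (true) {
                // "Backpressure" in a pull model: when the application is full,
                // it simply does not pull any more records.
                if (downstream.remainingCapacity() == 0) {
                    Thread.sleep(50);
                    continue;
                }
                // "Flow control": prefer the hot partition for this poll; the cold
                // one stays assigned but is temporarily not fetched from.
                consumer.pause(Collections.singleton(cold));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> r : records) {
                    downstream.put(r);
                }
                consumer.resume(Collections.singleton(cold));
            }
        }
    }
}
```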

On Wed, Oct 17, 2018 at 1:55 PM Zahari Dichev 
wrote:

> Hi there Ryanne,
>
> Thanks for the response ! There is most likely quite a lot that I am
> missing here, but after I read the docs, it seems to me that the
> pause/resume API has been provided with the very purpose of implementing
> bespoke flow control. That being said, I see it as quite natural to be able
> to pause and resume as needed without facing the problems outlined in
> my previous email. So if we are going totally wrong about this and using
> the pause/resume the wrong way, feel free to elaborate. I am really not
> trying to argue my case here, just genuinely attempting to understand what
> can be done on our end to improve the Akka streams integration. Thanks in
> advance :)
>
> Zahari
>
> On Wed, Oct 17, 2018 at 5:49 PM Ryanne Dolan 
> wrote:
>
> > Zahari,
> >
> > It sounds to me like this problem is due to Akka attempting to implement
> > additional backpressure on top of the Consumer API. I'd suggest they not
> do
> > that, and then this problem goes away.
> >
> > Ryanne
> >
> > On Wed, Oct 17, 2018 at 7:35 AM Zahari Dichev 
> > wrote:
> >
> > > Hi there,
> > >
> > > Are there any opinions on the matter described in my previous email? I
> > > think this is quite important when it comes to implementing any non
> > trivial
> > > functionality that relies on pause/resume. Of course if I am mistaken,
> > feel
> > > free to elaborate.
> > >
> > > Thanks,
> > > Zahari
> > >
> > > On Tue, Oct 16, 2018 at 10:29 AM Zahari Dichev  >
> > > wrote:
> > >
> > > > Hi there Kafka developers,
> > > >
> > > > I am currently trying to find a solution to an issue that has been
> > > > manifesting itself in the Akka streams implementation of the Kafka
> > > > connector. When it comes to consuming messages, the implementation
> > relies
> > > > heavily on the fact that we can pause and resume partitions. In some
> > > > situations when a single consumer instance is shared among several
> > > streams,
> > > > we might end up with frequently pausing and unpausing a set of topic
> > > > partitions, which is the main facility that allows us to implement
> back
> > > > pressure. This, however, has certain disadvantages, especially when
> > > > there are two streams that differ in terms of processing speed.
> > > >
> > > > To articulate the issue more clearly, imagine that a consumer
> maintains
> > > > assignments for two topic partitions *TP1* and *TP2*. This consumer
> is
> > > > shared by two streams - S1 and S2. So effectively when we have demand
> > > from
> > > > only one of the streams - *S1*, we will pause one of the topic
> > partitions
> > > > *TP2* and call *poll()* on the consumer to only retrieve the records
> > for
> > > > the demanded topic partition - *TP1*. The result is that all the
> > > > records that have been prefetched for *TP2* are now thrown away by
> the
> > > > fetcher ("*Not returning fetched records for assigned partition TP2
> > since
> > > > it is no longer fetchable"*). If we extrapolate that to multiple
> > streams
> > > > sharing the same consumer, we might quickly end up in a situation
> where
> > > we
> > > > throw away prefetched data quite often. This does not seem like the most
> > > > efficient approach and in fact produces quite a lot of overlapping
> > fetch
> > > > requests as illustrated in the following issue:
> > > >
> > > > https://github.com/akka/alpakka-kafka/issues/549
> > > >
> > > > I am writing this email to get some initial opinion on a KIP I was
> > > > thinking about. What if we give the clients of the Consumer API a bit
> > > more
> > > > control of what to do with this prefetched data. Two options I am
> > > wondering
> > > > about:
> > > >
> > > > 1. Introduce a configuration setting, such as*
> > > > "return-prefetched-data-for-paused-topic-partitions = false"* (have
> to
> > > > think of a better name), which when set to true will return what is
> > > > prefetched instead of throwing it away on calling *poll()*. Since this
> > > > amount of data is bounded by the maximum size of the prefetch, we can
> > > > control the maximum number of records returned. The client of
> the
> > > > consumer API can then be responsible for keeping that data around and
> > use
> > > > it when appropriate (i.e. when demand is present)
> > > >
> > > > 2. Introduce a facility to pass in a buffer into which the prefetched
> > > > records are drained when poll is called and paused partitions have
> some
> > > > prefetched records

Build failed in Jenkins: kafka-trunk-jdk8 #3146

2018-10-17 Thread Apache Jenkins Server
See 


Changes:

[me] KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes Part 2

--
[...truncated 2.83 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompare

Jenkins build is back to normal : kafka-trunk-jdk11 #40

2018-10-17 Thread Apache Jenkins Server
See 




[DISCUSS] KIP-383 Pluggable interface for SSL Factory

2018-10-17 Thread Pellerin, Clement
I would like feedback on this proposal to make it possible to replace 
SslFactory with a custom implementation.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-383%3A++Pluggable+interface+for+SSL+Factory



Re: Throwing away prefetched records optimisation.

2018-10-17 Thread Zahari Dichev
Hi there Ryanne,

Thanks for the response ! There is most likely quite a lot that I am
missing here, but after I read the docs, it seems to me that the
pause/resume API has been provided with the very purpose of implementing
bespoke flow control. That being said, I see it as quite natural to be able
to pause and resume as needed without facing the problems outlined in
my previous email. So if we are going totally wrong about this and using
the pause/resume the wrong way, feel free to elaborate. I am really not
trying to argue my case here, just genuinely attempting to understand what
can be done on our end to improve the Akka streams integration. Thanks in
advance :)

Zahari

On Wed, Oct 17, 2018 at 5:49 PM Ryanne Dolan  wrote:

> Zahari,
>
> It sounds to me like this problem is due to Akka attempting to implement
> additional backpressure on top of the Consumer API. I'd suggest they not do
> that, and then this problem goes away.
>
> Ryanne
>
> On Wed, Oct 17, 2018 at 7:35 AM Zahari Dichev 
> wrote:
>
> > Hi there,
> >
> > Are there any opinions on the matter described in my previous email? I
> > think this is quite important when it comes to implementing any non
> trivial
> > functionality that relies on pause/resume. Of course if I am mistaken,
> feel
> > free to elaborate.
> >
> > Thanks,
> > Zahari
> >
> > On Tue, Oct 16, 2018 at 10:29 AM Zahari Dichev 
> > wrote:
> >
> > > Hi there Kafka developers,
> > >
> > > I am currently trying to find a solution to an issue that has been
> > > manifesting itself in the Akka streams implementation of the Kafka
> > > connector. When it comes to consuming messages, the implementation
> relies
> > > heavily on the fact that we can pause and resume partitions. In some
> > > situations when a single consumer instance is shared among several
> > streams,
> > > we might end up with frequently pausing and unpausing a set of topic
> > > partitions, which is the main facility that allows us to implement back
> > > pressure. This, however, has certain disadvantages, especially when there
> > > are two streams that differ in terms of processing speed.
> > >
> > > To articulate the issue more clearly, imagine that a consumer maintains
> > > assignments for two topic partitions *TP1* and *TP2*. This consumer is
> > > shared by two streams - S1 and S2. So effectively when we have demand
> > from
> > > only one of the streams - *S1*, we will pause one of the topic
> partitions
> > > *TP2* and call *poll()* on the consumer to only retrieve the records
> for
> > > the demanded topic partition - *TP1*. The result is that all the
> > > records that have been prefetched for *TP2* are now thrown away by the
> > > fetcher ("*Not returning fetched records for assigned partition TP2
> since
> > > it is no longer fetchable"*). If we extrapolate that to multiple
> streams
> > > sharing the same consumer, we might quickly end up in a situation where
> > we
> > > throw away prefetched data quite often. This does not seem like the most
> > > efficient approach and in fact produces quite a lot of overlapping
> fetch
> > > requests as illustrated in the following issue:
> > >
> > > https://github.com/akka/alpakka-kafka/issues/549
> > >
> > > I am writing this email to get some initial opinion on a KIP I was
> > > thinking about. What if we give the clients of the Consumer API a bit
> > more
> > > control of what to do with this prefetched data. Two options I am
> > wondering
> > > about:
> > >
> > > 1. Introduce a configuration setting, such as*
> > > "return-prefetched-data-for-paused-topic-partitions = false"* (have to
> > > think of a better name), which when set to true will return what is
> > > prefetched instead of throwing it away on calling *poll()*. Since this
> > > amount of data is bounded by the maximum size of the prefetch, we can
> > > control the maximum number of records returned. The client of the
> > > consumer API can then be responsible for keeping that data around and
> use
> > > it when appropriate (i.e. when demand is present)
> > >
> > > 2. Introduce a facility to pass in a buffer into which the prefetched
> > > records are drained when poll is called and paused partitions have some
> > > prefetched records.
> > >
> > > Any opinions on the matter are welcome. Thanks a lot !
> > >
> > > Zahari Dichev
> > >
> >
>


Re: [ANNOUNCE] New Committer: Manikumar Reddy

2018-10-17 Thread Ray Chiang

Congrats Mani.

-Ray

On 10/17/18 10:19 AM, Harsha wrote:

Congrats Mani!! Very well deserved.

--Harsha
On Tue, Oct 16, 2018, at 5:20 PM, Attila Sasvari wrote:

Congratulations Manikumar! Keep up the good work.

On Tue, Oct 16, 2018 at 12:30 AM Jungtaek Lim  wrote:


Congrats Mani!
On Tue, 16 Oct 2018 at 1:45 PM Abhimanyu Nagrath <abhimanyunagr...@gmail.com> wrote:


Congratulations Manikumar

On Tue, Oct 16, 2018 at 10:09 AM Satish Duggana <satish.dugg...@gmail.com> wrote:

Congratulations Mani!

On Fri, Oct 12, 2018 at 9:41 PM Colin McCabe wrote:

Congratulations, Manikumar!  Well done.

best,
Colin


On Fri, Oct 12, 2018, at 01:25, Edoardo Comar wrote:

Well done Manikumar !
--

Edoardo Comar

IBM Event Streams
IBM UK Ltd, Hursley Park, SO21 2JN




From:   "Matthias J. Sax" 
To: dev 
Cc: users 
Date:   11/10/2018 23:41
Subject:Re: [ANNOUNCE] New Committer: Manikumar Reddy



Congrats!


On 10/11/18 2:31 PM, Yishun Guan wrote:

Congrats Manikumar!
On Thu, Oct 11, 2018 at 1:20 PM Sönke Liebau
 wrote:

Great news, congratulations Manikumar!!

On Thu, Oct 11, 2018 at 9:08 PM Vahid Hashemian wrote:

Congrats Manikumar!

On Thu, Oct 11, 2018 at 11:49 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Bravo!

On Thu, Oct 11, 2018 at 1:48 PM Ismael Juma <ism...@juma.me.uk> wrote:

Congratulations Manikumar! Thanks for your continued contributions.

Ismael

On Thu, Oct 11, 2018 at 10:39 AM Jason Gustafson wrote:

Hi all,

The PMC for Apache Kafka has invited Manikumar Reddy as a committer and we are
pleased to announce that he has accepted!

Manikumar has contributed 134 commits including significant work to add
support for delegation tokens in Kafka:

KIP-48:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka

KIP-249:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-249%3A+Add+Delegation+Token+Operations+to+KafkaAdminClient

He has broad experience working with many of the core components in Kafka
and he has reviewed over 80 PRs. He has also made huge progress addressing
some of our technical debt.

We appreciate the contributions and we are looking forward to more.
Congrats Manikumar!

Jason, on behalf of the Apache Kafka PMC

--
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany

[attachment "signature.asc" deleted by Edoardo Comar/UK/IBM]

Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 741598.
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU

--
--
Attila Sasvari
Software Engineer





Re: [DISCUSS] KIP-382: MirrorMaker 2.0

2018-10-17 Thread Ryanne Dolan
Harsha, yes I can do that. I'll update the KIP accordingly, thanks.

Ryanne

On Wed, Oct 17, 2018 at 12:18 PM Harsha  wrote:

> Hi Ryanne,
>Thanks for the KIP. I am also curious about why not use the
> uReplicator design as the foundation given it alreadys resolves some of the
> fundamental issues in current MIrrorMaker, updating the confifgs on the fly
> and running the mirror maker agents in a worker model which can deployed in
> mesos or container orchestrations.  If possible can you document in the
> rejected alternatives what are missing parts that made you to consider a
> new design from ground up.
>
> Thanks,
> Harsha
>
> On Wed, Oct 17, 2018, at 8:34 AM, Ryanne Dolan wrote:
> > Jan, these are two separate issues.
> >
> > 1) consumer coordination should not, ideally, involve unreliable or slow
> > connections. Naively, a KafkaSourceConnector would coordinate via the
> > source cluster. We can do better than this, but I'm deferring this
> > optimization for now.
> >
> > 2) exactly-once between two clusters is mind-bending. But keep in mind
> that
> > transactions are managed by the producer, not the consumer. In fact, it's
> > the producer that requests that offsets be committed for the current
> > transaction. Obviously, these offsets are committed in whatever cluster
> the
> > producer is sending to.
> >
> > These two issues are closely related. They are both resolved by not
> > coordinating or committing via the source cluster. And in fact, this is
> the
> > general model of SourceConnectors anyway, since most SourceConnectors
> > _only_ have a destination cluster.
> >
> > If there is a lot of interest here, I can expound further on this aspect
> of
> > MM2, but again I think this is premature until this first KIP is
> approved.
> > I intend to address each of these in separate KIPs following this one.
> >
> > Ryanne
> >
> > On Wed, Oct 17, 2018 at 7:09 AM Jan Filipiak 
> > wrote:
> >
> > > This is not a performance optimisation. It's a fundamental design
> choice.
> > >
> > >
> > > I never really took a look at how streams does exactly once. (it's a trap
> > > anyways and you usually can deal with at least once downstream pretty
> > > easy). But I am very certain it's not gonna get somewhere if offset
> > > commit and record produce cluster are not the same.
> > >
> > > Pretty sure without this _design choice_ you can skip on that exactly
> > > once already
> > >
> > > Best Jan
> > >
> > > On 16.10.2018 18:16, Ryanne Dolan wrote:
> > > >  >  But one big obstacle in this was
> > > > always that group coordination happened on the source cluster.
> > > >
> > > > Jan, thank you for bringing up this issue with legacy MirrorMaker. I
> > > > totally agree with you. This is one of several problems with
> MirrorMaker
> > > > I intend to solve in MM2, and I already have a design and prototype
> that
> > > > solves this and related issues. But as you pointed out, this KIP is
> > > > already rather complex, and I want to focus on the core feature set
> > > > rather than performance optimizations for now. If we can agree on
> what
> > > > MM2 looks like, it will be very easy to agree to improve its
> performance
> > > > and reliability.
> > > >
> > > > That said, I look forward to your support on a subsequent KIP that
> > > > addresses consumer coordination and rebalance issues. Stay tuned!
> > > >
> > > > Ryanne
> > > >
> > > > On Tue, Oct 16, 2018 at 6:58 AM Jan Filipiak <
> jan.filip...@trivago.com
> > > > > wrote:
> > > >
> > > > Hi,
> > > >
> > > > Currently MirrorMaker is usually run collocated with the target
> > > > cluster.
> > > > This is all nice and good. But one big obstacle in this was
> > > > always that group coordination happened on the source cluster. So when
> > > > the network was congested, you sometimes lose group membership and
> > > > have to rebalance and all this.
> > > >
> > > > So one big request from us would be the support of having
> > > coordination
> > > > cluster != source cluster.
> > > >
> > > > I would generally say a LAN is better than a WAN for doing group
> > > > coordination and there is no reason we couldn't have a group
> consuming
> > > > topics from a different cluster and committing offsets to another
> > > > one right?
> > > >
> > > > Other than that, it feels like the KIP has too many features, where
> > > > many of them are not really wanted and counterproductive, but I will
> > > > just wait and see how the discussion goes.
> > > >
> > > > Best Jan
> > > >
> > > >
> > > > On 15.10.2018 18:16, Ryanne Dolan wrote:
> > > >  > Hey y'all!
> > > >  >
> > > >  > Please take a look at KIP-382:
> > > >  >
> > > >  >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
> > > >  >
> > > >  > Thanks for your feedback and support.
> > > >  >
> > > >  > Ryanne
> > > > 

Re: [ANNOUNCE] New Committer: Manikumar Reddy

2018-10-17 Thread Harsha
Congrats Mani!! Very well deserved.

--Harsha
On Tue, Oct 16, 2018, at 5:20 PM, Attila Sasvari wrote:
> Congratulations Manikumar! Keep up the good work.
> 
> On Tue, Oct 16, 2018 at 12:30 AM Jungtaek Lim  wrote:
> 
> > Congrats Mani!
> > On Tue, 16 Oct 2018 at 1:45 PM Abhimanyu Nagrath <
> > abhimanyunagr...@gmail.com>
> > wrote:
> >
> > > Congratulations Manikumar
> > >
> > > On Tue, Oct 16, 2018 at 10:09 AM Satish Duggana <
> > satish.dugg...@gmail.com>
> > > wrote:
> > >
> > > > Congratulations Mani!
> > > >
> > > >
> > > > On Fri, Oct 12, 2018 at 9:41 PM Colin McCabe 
> > wrote:
> > > > >
> > > > > Congratulations, Manikumar!  Well done.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >
> > > > > On Fri, Oct 12, 2018, at 01:25, Edoardo Comar wrote:
> > > > > > Well done Manikumar !
> > > > > > --
> > > > > >
> > > > > > Edoardo Comar
> > > > > >
> > > > > > IBM Event Streams
> > > > > > IBM UK Ltd, Hursley Park, SO21 2JN
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > From:   "Matthias J. Sax" 
> > > > > > To: dev 
> > > > > > Cc: users 
> > > > > > Date:   11/10/2018 23:41
> > > > > > Subject:Re: [ANNOUNCE] New Committer: Manikumar Reddy
> > > > > >
> > > > > >
> > > > > >
> > > > > > Congrats!
> > > > > >
> > > > > >
> > > > > > On 10/11/18 2:31 PM, Yishun Guan wrote:
> > > > > > > Congrats Manikumar!
> > > > > > > On Thu, Oct 11, 2018 at 1:20 PM Sönke Liebau
> > > > > > >  wrote:
> > > > > > >>
> > > > > > >> Great news, congratulations Manikumar!!
> > > > > > >>
> > > > > > >> On Thu, Oct 11, 2018 at 9:08 PM Vahid Hashemian
> > > > > > 
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >>> Congrats Manikumar!
> > > > > > >>>
> > > > > > >>> On Thu, Oct 11, 2018 at 11:49 AM Ryanne Dolan <
> > > > ryannedo...@gmail.com>
> > > > > > >>> wrote:
> > > > > > >>>
> > > > > >  Bravo!
> > > > > > 
> > > > > >  On Thu, Oct 11, 2018 at 1:48 PM Ismael Juma <
> > ism...@juma.me.uk>
> > > > > > wrote:
> > > > > > 
> > > > > > > Congratulations Manikumar! Thanks for your continued
> > > > contributions.
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Thu, Oct 11, 2018 at 10:39 AM Jason Gustafson
> > > > > > 
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi all,
> > > > > > >>
> > > > > > >> The PMC for Apache Kafka has invited Manikumar Reddy as a
> > > > committer
> > > > > > >>> and
> > > > > > > we
> > > > > > >> are
> > > > > > >> pleased to announce that he has accepted!
> > > > > > >>
> > > > > > >> Manikumar has contributed 134 commits including significant
> > > > work to
> > > > > > >>> add
> > > > > > >> support for delegation tokens in Kafka:
> > > > > > >>
> > > > > > >> KIP-48:
> > > > > > >>
> > > > > > >>
> > > > > > >
> > > > > > 
> > > > > > >>>
> > > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka
> > > > > >
> > > > > > >> KIP-249
> > > > > > >> <
> > > > > > >
> > > > > > 
> > > > > > >>>
> > > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+KafkaKIP-249
> > > > > >
> > > > > > >>
> > > > > > >> :
> > > > > > >>
> > > > > > >>
> > > > > > >
> > > > > > 
> > > > > > >>>
> > > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-249%3A+Add+Delegation+Token+Operations+to+KafkaAdminClient
> > > > > >
> > > > > > >>
> > > > > > >> He has broad experience working with many of the core
> > > > components in
> > > > > >  Kafka
> > > > > > >> and he has reviewed over 80 PRs. He has also made huge
> > > progress
> > > > > > > addressing
> > > > > > >> some of our technical debt.
> > > > > > >>
> > > > > > >> We appreciate the contributions and we are looking forward
> > to
> > > > more.
> > > > > > >> Congrats Manikumar!
> > > > > > >>
> > > > > > >> Jason, on behalf of the Apache Kafka PMC
> > > > > > >>
> > > > > > >
> > > > > > 
> > > > > > >>>
> > > > > > >>
> > > > > > >>
> > > > > > >> --
> > > > > > >> Sönke Liebau
> > > > > > >> Partner
> > > > > > >> Tel. +49 179 7940878
> > > > > > >> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel -
> > > > Germany
> > > > > >
> > > > > > [attachment "signature.asc" deleted by Edoardo Comar/UK/IBM]
> > > > > >
> > > > > >
> > > > > > Unless stated otherwise above:
> > > > > > IBM United Kingdom Limited - Registered in England and Wales with
> > > > number
> > > > > > 741598.
> > > > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> > > PO6
> > > > 3AU
> > > >
> > >
> >
> 
> 
> -- 
> -- 
> Attila Sasvari
> Software Engineer
> 


Re: [DISCUSS] KIP-382: MirrorMaker 2.0

2018-10-17 Thread Harsha
Hi Ryanne,
   Thanks for the KIP. I am also curious about why not use the 
uReplicator design as the foundation, given it already resolves some of the 
fundamental issues in current MirrorMaker, updating the configs on the fly and 
running the mirror maker agents in a worker model which can be deployed in Mesos 
or container orchestrations. If possible, can you document in the rejected 
alternatives what missing parts made you consider a new design from 
the ground up.

Thanks,
Harsha

On Wed, Oct 17, 2018, at 8:34 AM, Ryanne Dolan wrote:
> Jan, these are two separate issues.
> 
> 1) consumer coordination should not, ideally, involve unreliable or slow
> connections. Naively, a KafkaSourceConnector would coordinate via the
> source cluster. We can do better than this, but I'm deferring this
> optimization for now.
> 
> 2) exactly-once between two clusters is mind-bending. But keep in mind that
> transactions are managed by the producer, not the consumer. In fact, it's
> the producer that requests that offsets be committed for the current
> transaction. Obviously, these offsets are committed in whatever cluster the
> producer is sending to.
> 
> These two issues are closely related. They are both resolved by not
> coordinating or committing via the source cluster. And in fact, this is the
> general model of SourceConnectors anyway, since most SourceConnectors
> _only_ have a destination cluster.
> 
> If there is a lot of interest here, I can expound further on this aspect of
> MM2, but again I think this is premature until this first KIP is approved.
> I intend to address each of these in separate KIPs following this one.
> 
> Ryanne
> 
> On Wed, Oct 17, 2018 at 7:09 AM Jan Filipiak 
> wrote:
> 
> > This is not a performance optimisation. It's a fundamental design choice.
> >
> >
> > I never really took a look at how streams does exactly once. (it's a trap
> > anyways and you usually can deal with at least once downstream pretty
> > easy). But I am very certain it's not gonna get somewhere if offset
> > commit and record produce cluster are not the same.
> >
> > Pretty sure without this _design choice_ you can skip on that exactly
> > once already
> >
> > Best Jan
> >
> > On 16.10.2018 18:16, Ryanne Dolan wrote:
> > >  >  But one big obstacle in this was
> > > always that group coordination happened on the source cluster.
> > >
> > > Jan, thank you for bringing up this issue with legacy MirrorMaker. I
> > > totally agree with you. This is one of several problems with MirrorMaker
> > > I intend to solve in MM2, and I already have a design and prototype that
> > > solves this and related issues. But as you pointed out, this KIP is
> > > already rather complex, and I want to focus on the core feature set
> > > rather than performance optimizations for now. If we can agree on what
> > > MM2 looks like, it will be very easy to agree to improve its performance
> > > and reliability.
> > >
> > > That said, I look forward to your support on a subsequent KIP that
> > > addresses consumer coordination and rebalance issues. Stay tuned!
> > >
> > > Ryanne
> > >
> > > On Tue, Oct 16, 2018 at 6:58 AM Jan Filipiak  > > > wrote:
> > >
> > > Hi,
> > >
> > > Currently MirrorMaker is usually run collocated with the target
> > > cluster.
> > > This is all nice and good. But one big obstacle in this was
> > > always that group coordination happened on the source cluster. So when
> > > the network was congested, you sometimes lose group membership and
> > > have to rebalance and all this.
> > >
> > > So one big request from us would be the support of having
> > coordination
> > > cluster != source cluster.
> > >
> > > I would generally say a LAN is better than a WAN for doing group
> > > coordination and there is no reason we couldn't have a group consuming
> > > topics from a different cluster and committing offsets to another
> > > one right?
> > >
> > > Other than that, it feels like the KIP has too many features, where many
> > > of them are not really wanted and counterproductive, but I will just
> > > wait and see how the discussion goes.
> > >
> > > Best Jan
> > >
> > >
> > > On 15.10.2018 18:16, Ryanne Dolan wrote:
> > >  > Hey y'all!
> > >  >
> > >  > Please take a look at KIP-382:
> > >  >
> > >  >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
> > >  >
> > >  > Thanks for your feedback and support.
> > >  >
> > >  > Ryanne
> > >  >
> > >
> >


Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-17 Thread Konstantine Karantasis
Indeed, implying that the flushing and acknowledgement of records happens
in order reveals an implementation detail that is not required by the
interface. Strictly speaking, if that was required then you'd only need a
single record as an argument to offsetsFlushedAndAcked to indicate up to
which record from the list of polled records the task managed to flush and
ack. But this wouldn't be elegant, nor is it certain that the task is aware, at
the point when offsetsFlushedAndAcked is called, of the order of the
records that it returned with poll.

I don't feel strongly about it and I acknowledge the symmetry with poll. I
made this comment since we are passing a complete list of flushed records
and retaining the order seemed to restrict future implementations. But
probably makes things simpler for source connectors too.

Regarding the text sections I was mainly referring to importing information
from the jira discussion to the KIP. And my intention is the same: that
people will understand the improvement and the motivation just by reading
the KIP, without having to go over the code changes or the jira comments.
I'll leave you to it and let's see where we are with implementation later in
the current KIP cycle. I find it a useful improvement, I hope it makes it
in.

-Konstantine
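For context, the callbacks being discussed would look roughly like the following. The method names come from this thread; the signatures, the List element type and the default no-op bodies are assumptions, not the actual KIP-381 interface.

```java
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

/**
 * Rough sketch of the callbacks under discussion (names from the thread,
 * signatures assumed) -- not the actual KIP-381 interface.
 */
public abstract class SketchedSourceTask extends SourceTask {

    /** Called once a single produced record has been acknowledged by the Kafka cluster. */
    public void recordSentAndAcked(SourceRecord record) {
        // default: no-op
    }

    /**
     * Called after an offset flush, with the records (in poll order, per the
     * discussion above) whose offsets have now been both flushed and acknowledged.
     */
    public void offsetsFlushedAndAcked(List<SourceRecord> records) {
        // default: no-op
    }
}
```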



On Wed, Oct 17, 2018 at 5:55 AM Per Steffensen  wrote:

> The fix will definitely "facilitate" the source-connectors I have
> written. It will make them work 100% correctly. Today they don't.
>
> Fine for me to change "Acknowledged" to "Acked" in the method-naming.
>
> Not sure I would like to give a Collection instead of a List as the
> argument to offsetsFlushedAndAck(nowledg)ed. poll() returns a List
> (ordered records), the records are handled in that order and I would
> like to hand the records back in that order as well. Handling back a
> Collection may indicate that order does not matter. Besides that it is
> likely to help the implementation of offstesFlushedAndAck(nowledg)ed
> that you get records back in order.
>
> Regarding adding stuff to the "rejected approaches" and "motivation"
> sections of the KIP, I am not sure I will get the time anytime soon.
> Please feel free to help adding this to the KIP. This way we also have
> at least two persons who really understands what this is about. Some
> times you only really understand what something is about, when you are
> forced to write about it (at least that is my excuse ).
>
> Regards, Per Steffensen
>
> On 16/10/2018 05.57, Konstantine Karantasis wrote:
> > This is a significant improvement to the semantics of source connectors.
> > I'm expecting that it will facilitate source connector implementations
> and
> > even enrich the application uses cases that we see. I only have a few
> minor
> > suggestions at the moment.
> >
> > I believe that Acked is a common abbreviation for Acknowledged and that
> we
> > could use it in this context. And this suggestion is coming from a big
> > proponent of complete words in variables and method names. Thus, feel
> free
> > to consider 'offsetsFlushedAndAcked' as well as 'recordSentAndAcked'.
> Since
> > this is a public interface, I'd also make the implementation specific
> > comment that a Collection might be more versatile than
> > List as argument in offsetsFlushedAndAcknowledged.
> >
> > The rejected approaches section could use some of the material in the
> > original jira ticket, which is pretty insightful in order to understand
> how
> > we arrived to this KIP. For example, I think it'd be useful to state
> > briefly why the 'commit' method is not getting removed completely but
> it's
> > substituted with 'offsetsFlushedAndAcked'. Also useful I believe it would
> > be to include in the motivation section some info related to why and how
> a
> > source system could use these method calls to safely recycle data that
> have
> > been surely imported to Kafka. I see this type of use cases having an
> > increased importance as Kafka is used more and more as the source of
> truth
> > and persistence layer for an increasing number of applications.
> >
> > These suggestions, although they are not strictly required in order to
> move
> > forward with this improvement, I believe can help a lot to understand the
> > context of this proposed changed, without having to read the complete
> > history in the jira ticket.
> >
> > Thanks for the KIP Per!
> >
> > -Konstantine
> >
> >
> > On Wed, Oct 10, 2018 at 6:50 AM Per Steffensen 
> wrote:
> >
> >> Please help make the proposed changes in KIP-381 become reality. Please
> >> comment.
> >>
> >> KIP:
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback
> >>
> >> JIRA: https://issues.apache.org/jira/browse/KAFKA-5716
> >>
> >> PR: https://github.com/apache/kafka/pull/3872
> >>
> >> Thanks!
> >>
> >>
> >>
>
>


Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-17 Thread Ryanne Dolan
> this does not guarantee that the
> offsets of R have been written/flushed at the next commit() call

True, but does it matter? So long as you can guarantee the records are
delivered to the downstream Kafka cluster, it shouldn't matter if they have
been committed or not.

The worst that can happen is that the worker gets bounced and asks for the
same records a second time. Even if those records have since been dropped
from the upstream data source, it doesn't matter cuz you know they were
previously delivered successfully.

I do agree that commit() and commitRecord() are poorly named, but I don't
think there's anything fundamentally missing from the API.

Ryanne

On Wed, Oct 17, 2018 at 10:24 AM Per Steffensen  wrote:

> On 17/10/2018 16.43, Ryanne Dolan wrote:
> > I see, thanks.
> > On the other hand, the commitRecord() callback provides the functionality
> > you require in this case. In commitRecord() your SourceTask can track the
> > offsets of records that have been ack'd by the producer client, and then
> in
> > commit() you can be sure that those offsets have been flushed.
> That is the trick I am currently using - more or less.
> But unfortunately it does not work 100% either. It is possible that
> commitRecord() is called with a record R, and then commit() is called
> after that, without the offsets of R having been written/flushed. The
> call to commitRecord() means that the "actual data" of R has been
> send/acknowledged, but unfortunately this does not guarantee that the
> offsets of R have been written/flushed at the next commit() call
>
>


[jira] [Created] (KAFKA-7517) Add a minimum retention.bytes config value

2018-10-17 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7517:
--

 Summary: Add a minimum retention.bytes config value
 Key: KAFKA-7517
 URL: https://issues.apache.org/jira/browse/KAFKA-7517
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


For some configs, like `log.retention.bytes`, a value of 0 makes no sense - every 
log has a size of 0 upon creation, so with such a setting every log would 
immediately qualify for deletion.

It would be useful to have some sort of guard, as limited as it may be, to help 
users not shoot themselves in the foot so easily (whether through manual 
misconfiguration or an external tool, e.g. a k8s ConfigMap).
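
For illustration, a guard of this kind could be expressed as a ConfigDef.Validator 
along the following lines (a rough sketch only, not the actual broker code; the 
class name and the chosen minimum are made up):

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigException;

    // Sketch of a validator that keeps -1 ("unlimited") legal but rejects 0
    // and other implausibly small values for retention.bytes-style configs.
    public class MinimumRetentionBytesValidator implements ConfigDef.Validator {
        private final long minimumBytes;

        public MinimumRetentionBytesValidator(long minimumBytes) {
            this.minimumBytes = minimumBytes;
        }

        @Override
        public void ensureValid(String name, Object value) {
            if (value == null)
                return; // let the default apply
            long bytes = ((Number) value).longValue();
            if (bytes != -1L && bytes < minimumBytes)
                throw new ConfigException(name, value,
                        "must be -1 (unlimited) or at least " + minimumBytes + " bytes");
        }
    }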





Re: [DISCUSS] KIP-382: MirrorMaker 2.0

2018-10-17 Thread Ryanne Dolan
Jan, these are two separate issues.

1) consumer coordination should not, ideally, involve unreliable or slow
connections. Naively, a KafkaSourceConnector would coordinate via the
source cluster. We can do better than this, but I'm deferring this
optimization for now.

2) exactly-once between two clusters is mind-bending. But keep in mind that
transactions are managed by the producer, not the consumer. In fact, it's
the producer that requests that offsets be committed for the current
transaction. Obviously, these offsets are committed in whatever cluster the
producer is sending to.
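
For context, a bare-bones sketch of that read-process-write loop using the plain 
client APIs (cluster addresses, topic, and group name are illustrative, and this 
is not MM2 code): the consumer reads from the source cluster, while the 
transactional producer writes the records and commits the group's offsets to 
whichever cluster it is connected to.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class MirrorTransactionSketch {
        public static void main(String[] args) {
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "source-cluster:9092");  // illustrative
            consumerProps.put("group.id", "mirror-group");                  // illustrative
            consumerProps.put("enable.auto.commit", "false");
            consumerProps.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumerProps.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "target-cluster:9092");  // illustrative
            producerProps.put("transactional.id", "mirror-1");
            producerProps.put("key.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            producerProps.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
                 KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {

                consumer.subscribe(Collections.singletonList("topic1"));
                producer.initTransactions();

                while (true) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                    if (records.isEmpty())
                        continue;

                    producer.beginTransaction();
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        producer.send(new ProducerRecord<>("topic1", record.key(), record.value()));
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));
                    }
                    // The producer asks for these offsets to be committed as part of the
                    // transaction -- they land in the cluster the producer writes to (the
                    // target), not in the source cluster the consumer reads from.
                    producer.sendOffsetsToTransaction(offsets, "mirror-group");
                    producer.commitTransaction();
                }
            }
        }
    }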

These two issues are closely related. They are both resolved by not
coordinating or committing via the source cluster. And in fact, this is the
general model of SourceConnectors anyway, since most SourceConnectors
_only_ have a destination cluster.

If there is a lot of interest here, I can expound further on this aspect of
MM2, but again I think this is premature until this first KIP is approved.
I intend to address each of these in separate KIPs following this one.

Ryanne

On Wed, Oct 17, 2018 at 7:09 AM Jan Filipiak 
wrote:

> This is not a performance optimisation. Its a fundamental design choice.
>
>
> I never really took a look how streams does exactly once. (its a trap
> anyways and you usually can deal with at least once donwstream pretty
> easy). But I am very certain its not gonna get somewhere if offset
> commit and record produce cluster are not the same.
>
> Pretty sure without this _design choice_ you can skip on that exactly
> once already
>
> Best Jan
>
> On 16.10.2018 18:16, Ryanne Dolan wrote:
> >  >  But one big obstacle in this was
> > always that group coordination happened on the source cluster.
> >
> > Jan, thank you for bringing up this issue with legacy MirrorMaker. I
> > totally agree with you. This is one of several problems with MirrorMaker
> > I intend to solve in MM2, and I already have a design and prototype that
> > solves this and related issues. But as you pointed out, this KIP is
> > already rather complex, and I want to focus on the core feature set
> > rather than performance optimizations for now. If we can agree on what
> > MM2 looks like, it will be very easy to agree to improve its performance
> > and reliability.
> >
> > That said, I look forward to your support on a subsequent KIP that
> > addresses consumer coordination and rebalance issues. Stay tuned!
> >
> > Ryanne
> >
> > On Tue, Oct 16, 2018 at 6:58 AM Jan Filipiak  > > wrote:
> >
> > Hi,
> >
> > Currently MirrorMaker is usually run collocated with the target
> > cluster.
> > This is all nice and good. But one big obstacle in this was
> > always that group coordination happened on the source cluster. So
> when
> > then network was congested, you sometimes loose group membership and
> > have to rebalance and all this.
> >
> > So one big request from we would be the support of having
> coordination
> > cluster != source cluster.
> >
> > I would generally say a LAN is better than a WAN for doing group
> > coordinaton and there is no reason we couldn't have a group consuming
> > topics from a different cluster and committing offsets to another
> > one right?
> >
> > Other than that. It feels like the KIP has too much features where
> many
> > of them are not really wanted and counter productive but I will just
> > wait and see how the discussion goes.
> >
> > Best Jan
> >
> >
> > On 15.10.2018 18:16, Ryanne Dolan wrote:
> >  > Hey y'all!
> >  >
> >  > Please take a look at KIP-382:
> >  >
> >  >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
> >  >
> >  > Thanks for your feedback and support.
> >  >
> >  > Ryanne
> >  >
> >
>


Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-17 Thread Per Steffensen

On 17/10/2018 16.43, Ryanne Dolan wrote:

I see, thanks.
On the other hand, the commitRecord() callback provides the functionality
you require in this case. In commitRecord() your SourceTask can track the
offsets of records that have been ack'd by the producer client, and then in
commit() you can be sure that those offsets have been flushed.

That is the trick I am currently using - more or less.
But unfortunately it does not work 100% either. It is possible that 
commitRecord() is called with a record R, and then commit() is called 
after that, without the offsets of R having been written/flushed. The 
call to commitRecord() means that the "actual data" of R has been 
sent/acknowledged, but unfortunately this does not guarantee that the 
offsets of R have been written/flushed at the next commit() call.




Re: Throwing away prefetched records optimisation.

2018-10-17 Thread Ryanne Dolan
Zahari,

It sounds to me like this problem is due to Akka attempting to implement
additional backpressure on top of the Consumer API. I'd suggest they not do
that, and then this problem goes away.

Ryanne

On Wed, Oct 17, 2018 at 7:35 AM Zahari Dichev 
wrote:

> Hi there,
>
> Are there any opinions on the matter described in my previous email? I
> think this is quite important when it comes to implementing any non trivial
> functionality that relies on pause/resume. Of course if I am mistaken, feel
> free to elaborate.
>
> Thanks,
> Zahari
>
> On Tue, Oct 16, 2018 at 10:29 AM Zahari Dichev 
> wrote:
>
> > Hi there Kafka developers,
> >
> > I am currently trying to find a solution to an issue that has been
> > manifesting itself in the Akka streams implementation of the Kafka
> > connector. When it comes to consuming messages, the implementation relies
> > heavily on the fact that we can pause and resume partitions. In some
> > situations when a single consumer instance is shared among several
> streams,
> > we might end up with frequently pausing and unpausing a set of topic
> > partitions, which is the main facility that allows us to implement back
> > pressure. This however has certain disadvantages, especially when there
> are
> > two consumers that differ in terms of processing speed.
> >
> > To articulate the issue more clearly, imagine that a consumer maintains
> > assignments for two topic partitions *TP1* and *TP2*. This consumer is
> > shared by two streams - S1 and S2. So effectively when we have demand
> from
> > only one of the streams - *S1*, we will pause one of the topic partitions
> > *TP2* and call *poll()* on the consumer to only retrieve the records for
> > the demanded topic partition - *TP1*. The result of that is all the
> > records that have been prefetched for *TP2* are now thrown away by the
> > fetcher ("*Not returning fetched records for assigned partition TP2 since
> > it is no longer fetchable"*). If we extrapolate that to multiple streams
> > sharing the same consumer, we might quickly end up in a situation where
> we
> > throw prefetched data quite often. This does not seem like the most
> > efficient approach and in fact produces quite a lot of overlapping fetch
> > requests as illustrated in the following issue:
> >
> > https://github.com/akka/alpakka-kafka/issues/549
> >
> > I am writing this email to get some initial opinion on a KIP I was
> > thinking about. What if we give the clients of the Consumer API a bit
> more
> > control of what to do with this prefetched data. Two options I am
> wondering
> > about:
> >
> > 1. Introduce a configuration setting, such as*
> > "return-prefetched-data-for-paused-topic-partitions = false"* (have to
> > think of a better name), which when set to true will return what is
> > prefetched instead of throwing it away on calling *poll()*. Since this is
> > amount of data that is bounded by the maximum size of the prefetch, we
> can
> > control what is the most amount of records returned. The client of the
> > consumer API can then be responsible for keeping that data around and use
> > it when appropriate (i.e. when demand is present)
> >
> > 2. Introduce a facility to pass in a buffer into which the prefetched
> > records are drained when poll is called and paused partitions have some
> > prefetched records.
> >
> > Any opinions on the matter are welcome. Thanks a lot !
> >
> > Zahari Dichev
> >
>


Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-17 Thread Ryanne Dolan
> There is no guarantee that the data in R has been sent/acknowledged
> to/by Kafka, nor that the offsets in R has been flushed to offset-store
(it
> is likely, though).

I see, thanks.

On the other hand, the commitRecord() callback provides the functionality
you require in this case. In commitRecord() your SourceTask can track the
offsets of records that have been ack'd by the producer client, and then in
commit() you can be sure that those offsets have been flushed.
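
For what it's worth, a minimal sketch of that bookkeeping (the class and the 
releaseUpstream() hook are made up for illustration; they are not part of the 
Connect API):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    // Sketch of the workaround discussed above (not the API change proposed by the
    // KIP): remember which records the producer has acked, and only release them
    // upstream once a later commit() has fired.
    public abstract class AckTrackingSourceTask extends SourceTask {

        private final Queue<SourceRecord> ackedSinceLastCommit = new ConcurrentLinkedQueue<>();

        @Override
        public void commitRecord(SourceRecord record) {
            // Called once the producer has acknowledged this record.
            ackedSinceLastCommit.add(record);
        }

        @Override
        public void commit() {
            // By the time commit() fires, offsets for records acked earlier have
            // typically been flushed -- though, as discussed in this thread, that is
            // not a hard guarantee for records acked between the flush and this call.
            SourceRecord record;
            while ((record = ackedSinceLastCommit.poll()) != null) {
                releaseUpstream(record);
            }
        }

        // Hypothetical hook: delete/ack the corresponding data in the source system.
        protected abstract void releaseUpstream(SourceRecord record);
    }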

I'm not opposed, however, to baking this into the framework and exposing a
new callback. Otherwise every correct SourceConnector would need to
implement similar logic.

Ryanne

On Wed, Oct 17, 2018 at 7:25 AM Per Steffensen  wrote:

> Lets use X for the the point in time where commit() is called. Lets use
> Rs(X) for the recorders returned by poll()s at time X.
> At time X, it is not necessarily true that all records in Rs(X) have been
> sent to Kafka (and acknowledged) and had their offsets flushed to
> offset-store.
>
> Example
> * Time X-1: poll() is called and one records R is returned
> * Time X: commit() is called. There is no guarantee that the data in R has
> been sent/acknowledged to/by Kafka, nor that the offsets in R has been
> flushed to offset-store (it is likely, though).
>
> Due to synchronization necessary, it is probably hard to make that
> guarantee, without reducing throughput significantly. But it is feasible to
> make the change that commit() is given (via argument) a list/collection of
> the records for which it is a guarantee. Thats what my current fix does
> (see PR).
>
>
> On 16/10/2018 19.33, Ryanne Dolan wrote:
>
> Steff,
>
> > Guess people have used it, assuming that all records that have been polled
> > at the time of callback to "commit", have also had their offsets committed.
> > But that is not true.
>
> (excerpt from KIP)
>
> The documentation for SourceTask.commit() reads:
>
> > Commit the offsets, up to the offsets that have been returned by {@link #poll()}.
> > This method should block until the commit is complete.
>
> I'm confused by these seemingly contradictory statements. My assumption
> (as you say) is that all records returned by poll() will have been
> committed before commit() is invoked by the framework. Is that not the case?
>
> Ryanne
>
> On Wed, Oct 10, 2018 at 8:50 AM Per Steffensen  wrote:
>
>> Please help make the proposed changes in KIP-381 become reality. Please
>> comment.
>>
>> KIP:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback
>>
>> JIRA: https://issues.apache.org/jira/browse/KAFKA-5716
>>
>> PR: https://github.com/apache/kafka/pull/3872
>>
>> Thanks!
>>
>>
>>
>


Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-17 Thread Per Steffensen
The fix will definitely "facilitate" the source connectors I have 
written. It will make them work 100% correctly. Today they don't.


Fine for me to change "Acknowledged" to "Acked" in the method-naming.

Not sure I would like to give a Collection instead of a List as the 
argument to offsetsFlushedAndAck(nowledg)ed. poll() returns a List 
(ordered records), the records are handled in that order, and I would 
like to hand the records back in that order as well. Handing back a 
Collection may suggest that order does not matter. Besides that, getting 
the records back in order is likely to help implementations of 
offsetsFlushedAndAck(nowledg)ed.


Regarding adding stuff to the "rejected approaches" and "motivation" 
sections of the KIP, I am not sure I will get the time anytime soon. 
Please feel free to help add this to the KIP. This way we also have 
at least two people who really understand what this is about. Sometimes 
you only really understand what something is about when you are 
forced to write about it (at least that is my excuse).


Regards, Per Steffensen

On 16/10/2018 05.57, Konstantine Karantasis wrote:

This is a significant improvement to the semantics of source connectors.
I'm expecting that it will facilitate source connector implementations and
even enrich the application use cases that we see. I only have a few minor
suggestions at the moment.

I believe that Acked is a common abbreviation for Acknowledged and that we
could use it in this context. And this suggestion is coming from a big
proponent of complete words in variables and method names. Thus, feel free
to consider 'offsetsFlushedAndAcked' as well as 'recordSentAndAcked'. Since
this is a public interface, I'd also make the implementation specific
comment that a Collection might be more versatile than
List as argument in offsetsFlushedAndAcknowledged.

The rejected approaches section could use some of the material in the
original jira ticket, which is pretty insightful in order to understand how
we arrived at this KIP. For example, I think it'd be useful to state
briefly why the 'commit' method is not getting removed completely but is
instead substituted with 'offsetsFlushedAndAcked'. I also believe it would be
useful to include in the motivation section some info related to why and how a
source system could use these method calls to safely recycle data that have
surely been imported to Kafka. I see this type of use case gaining importance
as Kafka is used more and more as the source of truth
and persistence layer for an increasing number of applications.

These suggestions, although they are not strictly required in order to move
forward with this improvement, can, I believe, help a lot in understanding the
context of this proposed change, without having to read the complete
history in the jira ticket.

Thanks for the KIP Per!

-Konstantine


On Wed, Oct 10, 2018 at 6:50 AM Per Steffensen  wrote:


Please help make the proposed changes in KIP-381 become reality. Please
comment.

KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback

JIRA: https://issues.apache.org/jira/browse/KAFKA-5716

PR: https://github.com/apache/kafka/pull/3872

Thanks!







[jira] [Created] (KAFKA-7516) Client (Producer and/or Consumer) crashes during initialization on Android

2018-10-17 Thread alex kamenetsky (JIRA)
alex kamenetsky created KAFKA-7516:
--

 Summary: Client (Producer and/or Consumer) crashes during 
initialization on Android
 Key: KAFKA-7516
 URL: https://issues.apache.org/jira/browse/KAFKA-7516
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.0.0
Reporter: alex kamenetsky
 Fix For: 2.0.1


An attempt to incorporate the Kafka client (both Producer and Consumer) on Android 
Dalvik fails during the initialization stage: Dalvik doesn't support 
javax.management (JMX).





Re: Throwing away prefetched records optimisation.

2018-10-17 Thread Zahari Dichev
Hi there,

Are there any opinions on the matter described in my previous email? I
think this is quite important when it comes to implementing any non-trivial
functionality that relies on pause/resume. Of course if I am mistaken, feel
free to elaborate.

Thanks,
Zahari

On Tue, Oct 16, 2018 at 10:29 AM Zahari Dichev 
wrote:

> Hi there Kafka developers,
>
> I am currently trying to find a solution to an issue that has been
> manifesting itself in the Akka streams implementation of the Kafka
> connector. When it comes to consuming messages, the implementation relies
> heavily on the fact that we can pause and resume partitions. In some
> situations when a single consumer instance is shared among several streams,
> we might end up with frequently pausing and unpausing a set of topic
> partitions, which is the main facility that allows us to implement back
> pressure. This however has certain disadvantages, especially when there are
> two consumers that differ in terms of processing speed.
>
> To articulate the issue more clearly, imagine that a consumer maintains
> assignments for two topic partitions *TP1* and *TP2*. This consumer is
> shared by two streams - S1 and S2. So effectively when we have demand from
> only one of the streams - *S1*, we will pause one of the topic partitions
> *TP2* and call *poll()* on the consumer to only retrieve the records for
> the demanded topic partition - *TP1*. The result of that is all the
> records that have been prefetched for *TP2* are now thrown away by the
> fetcher ("*Not returning fetched records for assigned partition TP2 since
> it is no longer fetchable"*). If we extrapolate that to multiple streams
> sharing the same consumer, we might quickly end up in a situation where we
> throw prefetched data quite often. This does not seem like the most
> efficient approach and in fact produces quite a lot of overlapping fetch
> requests as illustrated in the following issue:
>
> https://github.com/akka/alpakka-kafka/issues/549
>
> I am writing this email to get some initial opinion on a KIP I was
> thinking about. What if we give the clients of the Consumer API a bit more
> control of what to do with this prefetched data. Two options I am wondering
> about:
>
> 1. Introduce a configuration setting, such as*
> "return-prefetched-data-for-paused-topic-partitions = false"* (have to
> think of a better name), which when set to true will return what is
> prefetched instead of throwing it away on calling *poll()*. Since this is
> amount of data that is bounded by the maximum size of the prefetch, we can
> control what is the most amount of records returned. The client of the
> consumer API can then be responsible for keeping that data around and use
> it when appropriate (i.e. when demand is present)
>
> 2. Introduce a facility to pass in a buffer into which the prefetched
> records are drained when poll is called and paused partitions have some
> prefetched records.
>
> Any opinions on the matter are welcome. Thanks a lot !
>
> Zahari Dichev
>
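
For reference, a minimal sketch of the pause/resume pattern described above, 
using only the public KafkaConsumer API (the bootstrap address, topic, group, 
and the demand signal are illustrative):

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PauseResumeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // illustrative
            props.put("group.id", "shared-consumer");           // illustrative
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            TopicPartition tp1 = new TopicPartition("topic", 0);
            TopicPartition tp2 = new TopicPartition("topic", 1);

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Arrays.asList(tp1, tp2));

                while (true) {
                    if (stream2HasDemand()) {
                        consumer.resume(Collections.singleton(tp2));
                    } else {
                        // Back pressure for the slow stream: TP2 is excluded from fetches,
                        // and records already prefetched for it are dropped by the fetcher
                        // ("no longer fetchable"), to be fetched again after resume().
                        consumer.pause(Collections.singleton(tp2));
                    }
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    // dispatch records (only TP1's while TP2 is paused) to the streams ...
                }
            }
        }

        // Hypothetical demand signal from the second, slower stream.
        private static boolean stream2HasDemand() {
            return false;
        }
    }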


Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

2018-10-17 Thread Per Steffensen
Let's use X for the point in time where commit() is called, and Rs(X) for 
the records returned by poll()s at time X.
At time X, it is not necessarily true that all records in Rs(X) have 
been sent to Kafka (and acknowledged) and had their offsets flushed to 
the offset store.


Example
* Time X-1: poll() is called and one record R is returned
* Time X: commit() is called. There is no guarantee that the data in R 
has been sent/acknowledged to/by Kafka, nor that the offsets in R have 
been flushed to the offset store (it is likely, though).


Due to the synchronization necessary, it is probably hard to make that 
guarantee without reducing throughput significantly. But it is feasible 
to change commit() so that it is given (via an argument) a 
list/collection of the records for which that guarantee does hold. That's 
what my current fix does (see PR).
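
In interface terms, the change under discussion would look roughly like the 
following (a sketch of the proposal only, not the final KIP-381 API; the method 
name follows the naming discussed in this thread):

    import java.util.List;

    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    // Sketch only -- not the actual Connect API. The framework would invoke this
    // callback after an offset flush, passing exactly those records whose data has
    // been acked by Kafka AND whose offsets are known to be flushed, in the order
    // they were returned by poll().
    public abstract class ProposedSourceTask extends SourceTask {

        public void offsetsFlushedAndAcked(List<SourceRecord> records) {
            // A safe point to delete/recycle the corresponding data in the source system.
        }
    }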


On 16/10/2018 19.33, Ryanne Dolan wrote:

Steff,

> Guess people have used it, assuming that all records that have been polled
> at the time of callback to "commit", have also had their offsets committed.
> But that is not true.


(excerpt from KIP)

The documentation for SourceTask.commit() reads:

> Commit the offsets, up to the offsets that have been returned by {@link #poll()}.
> This method should block until the commit is complete.


I'm confused by these seemingly contradictory statements. My 
assumption (as you say) is that all records returned by poll() will 
have been committed before commit() is invoked by the framework. Is 
that not the case?


Ryanne

On Wed, Oct 10, 2018 at 8:50 AM Per Steffensen > wrote:


Please help make the proposed changes in KIP-381 become reality.
Please
comment.

KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback

JIRA: https://issues.apache.org/jira/browse/KAFKA-5716

PR: https://github.com/apache/kafka/pull/3872

Thanks!






Re: [DISCUSS] KIP-382: MirrorMaker 2.0

2018-10-17 Thread Jan Filipiak
This is not a performance optimisation. It's a fundamental design choice.


I never really took a look at how Streams does exactly-once. (It's a trap 
anyway, and you can usually deal with at-least-once downstream pretty 
easily.) But I am very certain it's not going to get anywhere if the 
offset-commit cluster and the record-produce cluster are not the same.

Pretty sure that without this _design choice_ you can skip exactly-once 
already.

Best Jan

On 16.10.2018 18:16, Ryanne Dolan wrote:
>  >  But one big obstacle in this was
> always that group coordination happened on the source cluster.
>
> Jan, thank you for bringing up this issue with legacy MirrorMaker. I
> totally agree with you. This is one of several problems with MirrorMaker
> I intend to solve in MM2, and I already have a design and prototype that
> solves this and related issues. But as you pointed out, this KIP is
> already rather complex, and I want to focus on the core feature set
> rather than performance optimizations for now. If we can agree on what
> MM2 looks like, it will be very easy to agree to improve its performance
> and reliability.
>
> That said, I look forward to your support on a subsequent KIP that
> addresses consumer coordination and rebalance issues. Stay tuned!
>
> Ryanne
>
> On Tue, Oct 16, 2018 at 6:58 AM Jan Filipiak  > wrote:
>
> Hi,
>
> Currently MirrorMaker is usually run collocated with the target
> cluster.
> This is all nice and good. But one big obstacle in this was
> always that group coordination happened on the source cluster. So when
> then network was congested, you sometimes loose group membership and
> have to rebalance and all this.
>
> So one big request from we would be the support of having coordination
> cluster != source cluster.
>
> I would generally say a LAN is better than a WAN for doing group
> coordinaton and there is no reason we couldn't have a group consuming
> topics from a different cluster and committing offsets to another
> one right?
>
> Other than that. It feels like the KIP has too much features where many
> of them are not really wanted and counter productive but I will just
> wait and see how the discussion goes.
>
> Best Jan
>
>
> On 15.10.2018 18:16, Ryanne Dolan wrote:
>  > Hey y'all!
>  >
>  > Please take a look at KIP-382:
>  >
>  >
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
>  >
>  > Thanks for your feedback and support.
>  >
>  > Ryanne
>  >
>