[jira] [Created] (KAFKA-8225) handle conflicting static member id

2019-04-11 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8225:
--

 Summary: handle conflicting static member id
 Key: KAFKA-8225
 URL: https://issues.apache.org/jira/browse/KAFKA-8225
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen


We need an important fix for handling user mis-configuration of duplicate 
group.instance.ids. Several approaches we have discussed so far:
 # Limit resetGeneration() calls to only JoinGroupResponseHandler.
 # Include InstanceId in the Heartbeat and OffsetCommit APIs. Then the 
coordinator can return the proper error code.
 # Use a convention to embed the instanceId into the generated memberId. At 
the moment, the format is {clientId}-{random uuid}. For static members, 
instanceId is more useful than clientId, and we could probably use a timestamp 
as a more concise alternative to the uuid. So we could have 
{instanceId}-{timestamp} as the memberId for static members. Then we would be 
able to extract the instanceId from any request, and the coordinator could 
return the proper error code.

Right now we are more inclined to option 2 or 3; however, both require a 
non-trivial amount of code changes, including protocol changes and fatal error 
handling on the client side.
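For illustration, option 3 could look roughly like the following sketch (the
helper names are made up; only the {instanceId}-{timestamp} convention comes
from the discussion above):

// Hypothetical helpers illustrating option 3; not an agreed-upon implementation.
class StaticMemberIdConvention {
    // memberId = "<instanceId>-<timestamp>" for static members.
    static String generateMemberId(String instanceId) {
        return instanceId + "-" + System.currentTimeMillis();
    }

    // The coordinator could recover the instanceId from any memberId built this way.
    static String extractInstanceId(String memberId) {
        int idx = memberId.lastIndexOf('-');
        return idx < 0 ? memberId : memberId.substring(0, idx);
    }
}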



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8224) Add static member id into Subscription Info for better rebalance behavior

2019-04-11 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8224:
--

 Summary: Add static member id into Subscription Info for better 
rebalance behavior
 Key: KAFKA-8224
 URL: https://issues.apache.org/jira/browse/KAFKA-8224
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


Based on the discussion in [https://github.com/apache/kafka/pull/6177] and KIP-345, 
we plan to better utilize the static member info to make wiser rebalance decisions. 
For example, assignors like Range or Round Robin could become stickier by relying 
on static ids instead of coordinator auto-generated ids.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8223) Deprecate group.initial.rebalance.delay

2019-04-11 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8223:
--

 Summary: Deprecate group.initial.rebalance.delay
 Key: KAFKA-8223
 URL: https://issues.apache.org/jira/browse/KAFKA-8223
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen


This is a stated step in KIP-345; however, we have reprioritized it since the 
config is not yet ready to be removed for dynamic members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8222) Admin client changes to add ability to batch remove static members

2019-04-11 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8222:
--

 Summary: Admin client changes to add ability to batch remove 
static members
 Key: KAFKA-8222
 URL: https://issues.apache.org/jira/browse/KAFKA-8222
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer, streams
Reporter: Boyang Chen
Assignee: Boyang Chen


Once the underlying protocol changes from the related JIRAs become effective, we 
need to add admin client support to use them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8221) Augment LeaveGroupRequest to batch operation

2019-04-11 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8221:
--

 Summary: Augment LeaveGroupRequest to batch operation
 Key: KAFKA-8221
 URL: https://issues.apache.org/jira/browse/KAFKA-8221
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Boyang Chen
Assignee: Boyang Chen


A batch LeaveGroup request is a required protocol change for removing a set of 
static members all at once.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8220) Avoid kicking out members through rebalance timeout

2019-04-11 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8220:
--

 Summary: Avoid kicking out members through rebalance timeout
 Key: KAFKA-8220
 URL: https://issues.apache.org/jira/browse/KAFKA-8220
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen


As stated in KIP-345, we will no longer evict unjoined members from the 
group. We need to take care of the edge case where the leader fails to rejoin, 
and switch to a new leader in that case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8219) Add web documentation for static membership

2019-04-11 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8219:
--

 Summary: Add web documentation for static membership
 Key: KAFKA-8219
 URL: https://issues.apache.org/jira/browse/KAFKA-8219
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer, streams
Reporter: Boyang Chen
Assignee: Boyang Chen


The official web documentation needs to be updated for static membership.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-433: Block old clients on brokers

2019-04-11 Thread Ismael Juma
Hi Ying,

It looks to me that all the examples given in the KIP can be handled with
the existing "message.downconversion.enable" config and by configuring the
message format to be the latest:

1. Kafka 8 / 9 / 10 consumer hangs when the message contains message header
> ( KAFKA-6739 - Down-conversion fails for records with headers RESOLVED  )
> 2. LZ4 is not correctly handled in Kafka 8 and Kafka 9 ( KAFKA-3160 -
> Kafka LZ4 framing code miscalculates header checksum RESOLVED  )
> 3. Performance penalty of converting message format from V3 to V1 or V2
> for the old consumers (KIP-31 - Move to relative offsets in compressed
> message sets)


Am I missing something? Are there other examples that are not related to
message conversion?

Ismael

On Thu, Apr 11, 2019 at 11:53 PM Ying Zheng  wrote:

> Hi here,
>
> Please vote for
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-433%3A+Block+old+clients+on+brokers
>
> Thank you!
>


Use of version attribute in reassignment json of kafka-reassign-partitions.sh

2019-04-11 Thread Koushik Chitta
Hi,

What is the use/intention of the version attribute in the 
kafka-reassign-partitions.sh command? I see that the code (at least in version 1.1) 
just ignores this attribute.

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
> --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
  {"topic":"foo1","partition":0,"replicas":[3,4]},
  {"topic":"foo2","partition":2,"replicas":[1,2]},
  {"topic":"foo2","partition":0,"replicas":[3,4]},
  {"topic":"foo1","partition":1,"replicas":[2,3]},
  {"topic":"foo2","partition":1,"replicas":[2,3]}]
}


Thanks,
Koushik


Re: [VOTE] KIP-433: Block old clients on brokers

2019-04-11 Thread Ying Zheng
Hi Gwen,

Thank you very much for the feedback! I have updated the KIP.

"reject request" means return an UNSUPPORTED_VERSION (35) error. Client
libraries after 0.10.2 will show the message "The version of API is not
supported." Client libraries before 0.10.2 will treat this error as an
unknown server error.

On Thu, Apr 11, 2019 at 3:01 PM Gwen Shapira  wrote:

> In general, I support this proposal, but I'd like some more details on what
> "rejecting API requests" mean? Close the connections? Return some kind of
> error? Is there a way for the client to know what happened? Is there a way
> for the admin to know how many clients are rejected?
>
> As a nit, the "migration plan" part of the KIP still mentions the
> authorizer.
>
> Gwen
>
> On Thu, Apr 11, 2019 at 2:53 PM Ying Zheng  wrote:
>
> > Hi here,
> >
> > Please vote for
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-433%3A+Block+old+clients+on+brokers
> >
> > Thank you!
> >
>
>
> --
> *Gwen Shapira*
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter  | blog
> 
>


Re: Stream caching

2019-04-11 Thread Guozhang Wang
Hello Kohut,

Streams' default state.dir location is /tmp/kafka-streams, so that may
explain it.
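If the /tmp growth is indeed Streams' local state, the directory can be
relocated via the state.dir config; a minimal sketch (application id, bootstrap
servers, and the target path below are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class StateDirExample {
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
        // Move local state stores off /tmp (the default is /tmp/kafka-streams).
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        return props;
    }
}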

For more info about state management of Kafka Streams, you can read
https://kafka.apache.org/21/documentation/streams/architecture#streams_architecture_state


Guozhang

On Thu, Apr 11, 2019 at 1:09 AM Ярослав Когут 
wrote:

> Hi guys.
>
> I’ve deployed spring cloud application using docker, service is using
> KStream functionality, and I have problems with /tmp folder.
>
> On some instances of application /tmp folder is from 3 to 6 GB. Maybe I
> missed some property?
>
> I will be very grateful for the help.
>
> Thanks, best regards.
>
> Kohut Yaroslav



-- 
-- Guozhang


Re: [DISCUSS] KIP-439: Deprecate Interface WindowStoreIterator

2019-04-11 Thread Guozhang Wang
While working on KIP-444 (https://github.com/apache/kafka/pull/6498) I
realized there are a bunch of issues with metric names vs. function names,
e.g. some functions named `fetchAll` are actually measured with `fetch`, etc.
So in that KIP I proposed to make the function names aligned with the metric
names. So, supposing we rename the functions from `fetch` to `range`, I'd
suggest we make this change as part of KIP-444 as well. Note that it means
different functions with the same name `range` will then be measured under a
single metric.

But still, the function named `all` will be measured under a separate
metric named `all`, so I'm just clarifying with you whether that's the intention.


Guozhang

On Thu, Apr 11, 2019 at 2:04 PM Matthias J. Sax 
wrote:

> I did not see a reason to rename `all()` to `range()`. `all()` does not
> take any parameters to limit a range and is a good name IMHO. But I am
> not married to keep `all()` and if we think we should rename it, too, I
> am fine with it.
>
> Not sure what connection you make to metrics though. Can you elaborate?
>
>
> Would be interested to hear others opinions on this, too.
>
>
> -Matthias
>
> On 4/11/19 8:38 AM, Guozhang Wang wrote:
> > I like the renaming, since it also aligns with our metrics cleanup
> > (KIP-444) which touches upon the store level metrics as well.
> >
> > One question: you seem to still be suggesting to keep "all" with the current
> > name (and also using a separate metric for it); what's the difference
> > between this one and the other "range" functions?
> >
> >
> > Guozhang
> >
> > On Thu, Apr 11, 2019 at 2:26 AM Matthias J. Sax 
> > wrote:
> >
> >> Thanks for the input.
> >>
>  Just to clarify the naming conflicts is between the newly added
> function
>  and the old functions that we want to deprecate / remove right? The
> >>
> >> Yes, the conflict is just for the existing `fetch()` methods for which
> >> we want to change the return type.
> >>
> >> IMHO, we should not make a breaking change in a minor release. Thus, we
> >> could either only deprecate those fetch methods that return
> >> `WindowStoreIterator` and do a "clean cut" in 3.0.
> >>
> >> Or we follow the renaming path. To get a clean renaming, we need to
> >> consider all methods that are called "fetch":
> >>
> >> ReadOnlyWindowStore:
> >>
> >>> V fetch(K, long)
> >>> WindowStoreIterator<V> fetch(K, Instant, Instant)
> >>> KeyValueIterator<Windowed<K>, V> fetch(K, K, Instant, Instant)
> >>
> >> WindowStore:
> >>
> >>> WindowStoreIterator<V> fetch(K, long, long)
> >>> WindowStoreIterator<V> fetch(K, Instant, Instant)
> >>> KeyValueIterator<Windowed<K>, V> fetch(K, K, long, long)
> >>> KeyValueIterator<Windowed<K>, V> fetch(K, K, Instant, Instant)
> >>
> >> There is also fetchAll(long, long) and fetchAll(Instant, Instant) that
> >> fetch over all keys.
> >>
> >> Maybe we could rename `V fetch(K, long)` to `V get(K, long)` and rename
> >> all `fetch()/fetchAll()` to `range()`? There is actually no reason to
> >> have range()/rangeAll().
> >>
> >> If we do this, we might consider to rename methods for SessionStore,
> >> too. There is
> >>
> >>> ReadOnlySessionStore#fetch(K)
> >>> ReadOnlySessionStore#fetch(K, K)
> >>> SessionStore#findSessions(K, long, long)
> >>> SessionStore#findSessions(K, K, long, long)
> >>> SessionStore#fetchSession(K, long, long);
> >>
> >> Consistent renaming might be:
> >>
> >> ReadOnlySessionStore#range(K)
> >> ReadOnlySessionStore#range(K, K)
> >> SessionStore#range(K, long, long)
> >> SessionStore#range(K, K, long, long)
> >> SessionStore#get(K, long, long);
> >>
> >>
> >> Not sure if extensive renaming might have too big of an impact. However,
> >> `range()` and `get()` might actually be better names than `fetch()`,
> >> thus, it might also provide some additional value.
> >>
> >> Thoughts?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 3/27/19 10:19 AM, Guozhang Wang wrote:
> >>> Hello Matthias,
> >>>
> >>> Just to clarify the naming conflicts is between the newly added
> function
> >>> and the old functions that we want to deprecate / remove right? The
> >>> existing ones have different signatures with parameters so that they
> >> should
> >>> not have conflicts.
> >>>
> >>> I was thinking about just make the change directly without deprecating
> >>> existing ones which would require users of 2.3 to make code changes --
> >> this
> >>> code change looks reasonably straight-forward to me and not much worth
> >>> deferring to later when the deprecated ones are removed.
> >>>
> >>> On the other hand, just deprecating "WindowIterator" without add new
> >>> functions seems not very useful for users either since it is only used
> as
> >>> an indicator but users cannot make code changes during this phase
> >> anyways,
> >>> so it is still a `one-cut` deal when we eventually remove the
> deprecated
> >>> ones and add the new one.
> >>>
> >>> Hence I'm slightly inclining to trade compatibility and replace it with
> >> new
> >>> functions in one release, but if people have a good idea of the
> renaming
> >>> 

Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams

2019-04-11 Thread Guozhang Wang
Hello Matthias:

Thanks for your review.

The background section uses the Streams assignor as well as the consumer's own
sticky assignor as examples illustrating the situation, but this KIP is for the
consumer coordinator itself, and the rest of the document does not talk
about Streams any more. If you feel it's distracting I can remove
those examples.

10). While working on the PR I realized that the revoked partitions field on
the assignment is not needed (this is being discussed on the PR itself:
https://github.com/apache/kafka/pull/6528#issuecomment-480009890).

20). 1.a. Good question. I've updated the wiki to let the consumer clean up its
assignment and re-join, rather than letting the assignor make any
proactive changes. The idea is to keep the logic simpler and not do any
"split brain" stuff.

20). 2.b. No, we do not need it, since the owned partitions will already be part
of the Subscription passed in to assign().

30). As Boyang mentioned, there are some drawbacks that still cannot be
addressed by a rebalance delay, hence we still voted for KIP-345 (some more
details can be found on the discussion thread of KIP-345 itself). One
example is that as the instance resumes, its member id will be empty, so we
are still relying on the assignor to give it the assignment of the old
member-id while keeping all other members' assignments unchanged.

40). Incomplete sentence, I've updated it.

50). Here's my idea: suppose we augment the join group schema with
`protocol version` in 2.3. Then, with both brokers and clients on
version 2.3+, on the first rolling bounce where the subscription and assignment
schema and/or user metadata has changed, this protocol version will be
bumped. On the broker side, when receiving all members' join-group requests,
it will choose the one that has the highest protocol version (this also
assumes a higher-versioned protocol is always backward compatible, i.e. the
coordinator can recognize lower-versioned protocols as well) and select it
as the leader. Then the leader can decide, based on its received and
deserialized subscription information, how to assign partitions and how to
encode the assignment accordingly so that everyone can understand it. With
this, in Streams for example, no version probing would be needed since we
are guaranteed the leader knows everyone's version -- again assuming
that a higher-versioned protocol is always backward compatible -- and hence
it can successfully do the assignment in that round.
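For illustration, the leader-selection rule described above could look roughly
like the following on the broker side (a sketch only; the actual group
coordinator data structures differ):

import java.util.Map;

class ProtocolVersionLeaderSelection {
    // Pick the member advertising the highest protocol version as group leader,
    // assuming a higher-versioned protocol always understands the lower ones.
    static String selectLeader(Map<String, Integer> memberIdToProtocolVersion) {
        return memberIdToProtocolVersion.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElseThrow(() -> new IllegalStateException("group has no members"));
    }
}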

60). My bad, this section was not updated as the design evolved; I've
updated it now.


On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen  wrote:

>
> Thanks for the review Matthias! My 2-cent on the rebalance delay is that
> it is a rather fixed trade-off between
>
> task availability and resource shuffling. If we eventually trigger
> rebalance after rolling bounce, certain consumer
>
> setup is still faced with global shuffles, for example member.id ranking
> based round robin strategy, as rejoining dynamic
>
> members will be assigned with new member.id which reorders the
> assignment. So I think the primary goal of incremental
>
> rebalancing is still improving the cluster availability during rebalance,
> because it didn't revoke any partition during this
>
> process. Also, the perk is minimum configuration requirement :)
>
>
> Best,
>
> Boyang
>
> 
> From: Matthias J. Sax 
> Sent: Tuesday, April 9, 2019 7:47 AM
> To: dev
> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
>
> Thank for the KIP, Boyang and Guozhang!
>
>
> I made an initial pass and have some questions/comments. One high level
> comment: it seems that the KIP "mixes" plain consumer and Kafka Streams
> use case a little bit (at least in the presentation). It might be
> helpful to separate both cases clearly, or maybe limit the scope to
> plain consumer only.
>
>
>
> 10) For `PartitionAssignor.Assignment`: It seems we need a new method
> `List revokedPartitions()` ?
>
>
>
> 20) In Section "Consumer Coordinator Algorithm"
>
> Bullet point "1a)": If the subscription changes and a topic is
> removed from the subscription, why do we not revoke the partitions?
>
> Bullet point "1a)": What happens is a topic is deleted (or a
> partition is removed/deleted from a topic)? Should we call the new
> `onPartitionsEmigrated()` callback for this case?
>
> Bullet point "2b)" Should we update the `PartitionAssignor`
> interface to pass in the "old assignment" as third parameter into
> `assign()`?
>
>
>
> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay
> subsume KIP-345? Configuring static members is rather complicated, and I
> am wondering if a rebalance delay would be sufficient?
>
>
>
> 40) Quote: "otherwise the we would fall into the case 3.b) forever."
>
> What is "case 3.b" ?
>
>
>
> 50) Section "Looking into the Future"
>
> Nit: the new "ProtocolVersion" field is missing in the first line
> describing "JoinGroupRequest"
>
> > This can also help saving "version 

Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams

2019-04-11 Thread Guozhang Wang
Hello Jason,

1. Yeah, I think `emigrate` is maybe not a very good term. On the other hand,
`evicted` is quite similar to `revoked` to me... I'd actually like to make
another proposal: `onPartitionLost`.

2. The new callback's default implementation will just delegate to
`onPartitionsRevoked`, so existing user implementations that do not make any
code changes should be able to recompile the code and continue.
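For illustration, the backward-compatible default could look something like the
sketch below (the callback name is only the one under discussion in this thread,
not a finalized API):

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Sketch: a new callback whose default keeps the old revocation behavior, so
// existing implementations only need to be recompiled.
interface RebalanceListenerWithEmigration extends ConsumerRebalanceListener {
    default void onPartitionsEmigrated(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }
}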

3. Yes: this is to reduce the rebalance notification latency (the green bar in
the diagram).

4. Hmm.. not sure if it will work. The main issue is that the
consumer-coordinator behavior (whether to revoke all or none at
onRebalancePrepare) is independent of the selected protocol's assignor
(eager or cooperative), so even if the assignor is selected to be the
old-versioned one, we will still not revoke at the consumer-coordinator
layer and hence has the same risk of migrating still-owned partitions,
right?

5. Yup good point, I will add it to the PartitionAssignor interface.



Guozhang



On Wed, Apr 10, 2019 at 4:03 PM Jason Gustafson  wrote:

> Hi Guozhang and Boyang,
>
> Thanks for the KIP. A few comments/questions below:
>
> 1. More of a nitpick, but `onPartitionsEmigrated` is not a very clear name.
> How about `onPartitionsEvicted`? Or even perhaps `onMembershipLost`?
> 2. For `onPartitionsEmigrated`, how will we maintain compatibility with the
> old behavior? Seems like we might need an extension of
> `ConsumerRebalanceListener` with the new method. Otherwise we won't know if
> the application is expecting the old behavior.
> 3. Just making sure I understand this, but the reason we need the error
> code in the assignment is that the revoked partitions might be empty for
> some members and non-empty for others. We want all members to rejoin
> quickly even if they have no revoked partitions. Is that right?
> 4. I wanted to suggest an alternative approach for dealing with
> compatibility and the upgrade problem. In fact, the consumer already has a
> mechanism to change the assignment logic. Users can provide multiple
> PartitionAssignor implementations in the `partition.assignment.strategy`
> configuration. The coordinator will only select one which is supported by
> all members of the group. Rather than adding the new `rebalance.protocol`
> config, could we not reuse this mechanism? To support this, we would
> basically create new assignor implementations. For example,
> CooperativeRoundRobin instead of the usual RoundRobin. I think the benefit
> is that it is quite a bit easier to reason about the upgrade state when not
> all consumers have been updated. We are guaranteed that all members are
> following the same logic. My feeling is that this will be a less error
> prone solution since it depends less on state outside the system (i.e. the
> respective `rebalance.protocol` configurations for all members in the group
> and binary compatibility). The downside is that it will take more effort
> for PartitionAssignor implementations to get a benefit from this improved
> logic. But it's really hard to say that the new assignment logic would be
> compatible with a custom assignor in any case.
> 5. Where does the new ProtocolVersion come from in the new JoinGroup
> schema? I guess we need a new API on the PartitionAssignor interface?
>
> Thanks,
> Jason
>
>
> On Mon, Apr 8, 2019 at 9:39 PM Boyang Chen  wrote:
>
> >
> > Thanks for the review Matthias! My 2-cent on the rebalance delay is that
> > it is a rather fixed trade-off between
> >
> > task availability and resource shuffling. If we eventually trigger
> > rebalance after rolling bounce, certain consumer
> >
> > setup is still faced with global shuffles, for example member.id ranking
> > based round robin strategy, as rejoining dynamic
> >
> > members will be assigned with new member.id which reorders the
> > assignment. So I think the primary goal of incremental
> >
> > rebalancing is still improving the cluster availability during rebalance,
> > because it didn't revoke any partition during this
> >
> > process. Also, the perk is minimum configuration requirement :)
> >
> >
> > Best,
> >
> > Boyang
> >
> > 
> > From: Matthias J. Sax 
> > Sent: Tuesday, April 9, 2019 7:47 AM
> > To: dev
> > Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
> >
> > Thank for the KIP, Boyang and Guozhang!
> >
> >
> > I made an initial pass and have some questions/comments. One high level
> > comment: it seems that the KIP "mixes" plain consumer and Kafka Streams
> > use case a little bit (at least in the presentation). It might be
> > helpful to separate both cases clearly, or maybe limit the scope to
> > plain consumer only.
> >
> >
> >
> > 10) For `PartitionAssignor.Assignment`: It seems we need a new method
> > `List revokedPartitions()` ?
> >
> >
> >
> > 20) In Section "Consumer Coordinator Algorithm"
> >
> > Bullet point "1a)": If the subscription changes and a topic is
> > removed from the subscription, why do we not 

Build failed in Jenkins: kafka-trunk-jdk8 #3533

2019-04-11 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: ConnectionStressWorker: add missing executor shutdown (#6558)

[github] MINOR: Remove SubscriptionState.Listener and replace with assignmentId

--
[...truncated 4.73 MB...]
org.apache.kafka.connect.header.ConnectHeadersTest > 
shouldNotAddHeadersWithNullObjectValuesWithNonOptionalSchema STARTED

org.apache.kafka.connect.header.ConnectHeadersTest > 
shouldNotAddHeadersWithNullObjectValuesWithNonOptionalSchema PASSED

org.apache.kafka.connect.header.ConnectHeadersTest > 
shouldDuplicateAndAlwaysReturnEquivalentButDifferentObject STARTED

org.apache.kafka.connect.header.ConnectHeadersTest > 
shouldDuplicateAndAlwaysReturnEquivalentButDifferentObject PASSED

org.apache.kafka.connect.header.ConnectHeadersTest > shouldAddTimestamp STARTED

org.apache.kafka.connect.header.ConnectHeadersTest > shouldAddTimestamp PASSED

org.apache.kafka.connect.header.ConnectHeadersTest > shouldAddDecimal STARTED

org.apache.kafka.connect.header.ConnectHeadersTest > shouldAddDecimal PASSED

org.apache.kafka.connect.header.ConnectHeadersTest > 
shouldAddHeadersWithNullObjectValuesWithOptionalSchema STARTED

org.apache.kafka.connect.header.ConnectHeadersTest > 
shouldAddHeadersWithNullObjectValuesWithOptionalSchema PASSED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldDuplicateRecordAndCloneHeaders STARTED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldDuplicateRecordAndCloneHeaders PASSED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldCreateSinkRecordWithEmptyHeaders STARTED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldCreateSinkRecordWithEmptyHeaders PASSED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldDuplicateRecordUsingNewHeaders STARTED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldDuplicateRecordUsingNewHeaders PASSED

org.apache.kafka.connect.sink.SinkRecordTest > shouldModifyRecordHeader STARTED

org.apache.kafka.connect.sink.SinkRecordTest > shouldModifyRecordHeader PASSED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldCreateSinkRecordWithHeaders STARTED

org.apache.kafka.connect.sink.SinkRecordTest > 
shouldCreateSinkRecordWithHeaders PASSED

org.apache.kafka.connect.source.SourceRecordTest > 
shouldDuplicateRecordAndCloneHeaders STARTED

org.apache.kafka.connect.source.SourceRecordTest > 
shouldDuplicateRecordAndCloneHeaders PASSED

org.apache.kafka.connect.source.SourceRecordTest > 
shouldDuplicateRecordUsingNewHeaders STARTED

org.apache.kafka.connect.source.SourceRecordTest > 
shouldDuplicateRecordUsingNewHeaders PASSED

org.apache.kafka.connect.source.SourceRecordTest > shouldModifyRecordHeader 
STARTED

org.apache.kafka.connect.source.SourceRecordTest > shouldModifyRecordHeader 
PASSED

org.apache.kafka.connect.source.SourceRecordTest > 
shouldCreateSinkRecordWithHeaders STARTED

org.apache.kafka.connect.source.SourceRecordTest > 
shouldCreateSinkRecordWithHeaders PASSED

org.apache.kafka.connect.source.SourceRecordTest > 
shouldCreateSinkRecordWithEmtpyHeaders STARTED

org.apache.kafka.connect.source.SourceRecordTest > 
shouldCreateSinkRecordWithEmtpyHeaders PASSED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldConvertEmptyListToListWithoutSchema STARTED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldConvertEmptyListToListWithoutSchema PASSED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldConvertStringWithQuotesAndOtherDelimiterCharacters STARTED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldConvertStringWithQuotesAndOtherDelimiterCharacters PASSED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldConvertListWithMixedValuesToListWithoutSchema STARTED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldConvertListWithMixedValuesToListWithoutSchema PASSED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldConvertMapWithStringKeys STARTED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldConvertMapWithStringKeys PASSED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldParseStringOfMapWithStringValuesWithWhitespaceAsMap STARTED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldParseStringOfMapWithStringValuesWithWhitespaceAsMap PASSED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldConvertMapWithStringKeysAndShortValues STARTED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldConvertMapWithStringKeysAndShortValues PASSED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldParseStringOfMapWithStringValuesWithoutWhitespaceAsMap STARTED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldParseStringOfMapWithStringValuesWithoutWhitespaceAsMap PASSED

org.apache.kafka.connect.storage.SimpleHeaderConverterTest > 
shouldConvertSimpleString 

Re: [VOTE] KIP-433: Block old clients on brokers

2019-04-11 Thread Gwen Shapira
In general, I support this proposal, but I'd like some more details on what
"rejecting API requests" mean? Close the connections? Return some kind of
error? Is there a way for the client to know what happened? Is there a way
for the admin to know how many clients are rejected?

As a nit, the "migration plan" part of the KIP still mentions the
authorizer.

Gwen

On Thu, Apr 11, 2019 at 2:53 PM Ying Zheng  wrote:

> Hi here,
>
> Please vote for
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-433%3A+Block+old+clients+on+brokers
>
> Thank you!
>


-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



[VOTE] KIP-433: Block old clients on brokers

2019-04-11 Thread Ying Zheng
Hi here,

Please vote for
https://cwiki.apache.org/confluence/display/KAFKA/KIP-433%3A+Block+old+clients+on+brokers

Thank you!


Re: [DISCUSS] KIP-433: Provide client API version to authorizer

2019-04-11 Thread Ying Zheng
@Colin, Thank you for the feedback!

I have updated the KIP and added an explanation of why we use the API version
rather than the Kafka version.

I will start a vote for this KIP.


On Fri, Mar 29, 2019 at 9:47 AM Colin McCabe  wrote:

> Hi Ying,
>
> That's a fair point.  Maybe using API keys directly is reasonable here.
>
> One thing that's probably worth calling out is that if we make the name
> part of the configuration, we can't rename APIs in the future.  That's
> probably OK as long as it's documented.
>
> best,
> Colin
>
> On Thu, Mar 28, 2019, at 17:36, Ying Zheng wrote:
> > @Colin McCabe 
> >
> > I did think about that option. Yes, for most users, it's much easier to
> > understand Kafka version, rather than API version. However, the existing
> > Kafka clients only report API versions in the Kafka requests. So, the
> > brokers can only block clients by API versions rather than the real Kafka
> > versions. This can be very confusing for the users, if an API did not
> > change for a specified Kafka version.
> >
> > For example, a user sets the min Kafka version of produce request to
> Kafka
> > 1.1. She would expect the broker will reject Kafka 1.0 producers.
> However,
> > both Kafka 1.1 and Kafka 1.0 are using api version 5. The broker can't
> > distinguish the 2 version. So, Kafka 1.0 producers are still allowed.
> >
> > I think we can say this configuration is only for "advanced users". The
> > user has to know the concept of "api version", and know the function of
> > each API, to be able to use this feature. (Similarly, it's easier for the
> > users to say: "I want to block old consumers, or old admin clients.". But
> > there is no clear definition of which set of APIs are "consumer APIs".
> So,
> > we still have to use API names, rather than "client roles")
> >
> >
> >
> > On Wed, Mar 27, 2019 at 5:32 PM Colin McCabe  wrote:
> >
> > > Thanks, Ying Zheng.  Looks good overall.
> > >
> > > One question is, should the version be specified as a Kafka version
> rather
> > > than as a RPC API version?  I don't think most users are aware of RPC
> > > versions, but something like "min kafka version" would be easier to
> > > understand.  That is how we handle the minimum inter-broker protocol
> > > version and the minimum on-disk format version, after all.
> > >
> > > best,
> > > Colin
> > >
> > > On Tue, Mar 26, 2019, at 17:52, Ying Zheng wrote:
> > > > I have rewritten the KIP. The new proposal is adding a new
> configuration
> > > > min.api.version in Kafka broker.
> > > >
> > > > Please review the new KIP. Thank you!
> > > >
> > > > On Fri, Mar 1, 2019 at 11:06 AM Colin McCabe 
> wrote:
> > > >
> > > > > On Wed, Feb 27, 2019, at 15:53, Harsha wrote:
> > > > > > HI Colin,
> > > > > > Overlooked the IDEMPOTENT_WRITE ACL. This along with
> > > > > > client.min.version should solve the cases proposed in the KIP.
> > > > > > Can we turn this KIP into adding min.client.version config to
> broker
> > > > > > and it could be part of the dynamic config .
> > > > >
> > > > > +1, sounds like a good idea.
> > > > >
> > > > > Colin
> > > > >
> > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Harsha
> > > > > >
> > > > > > On Wed, Feb 27, 2019, at 12:17 PM, Colin McCabe wrote:
> > > > > > > On Tue, Feb 26, 2019, at 16:33, Harsha wrote:
> > > > > > > > Hi Colin,
> > > > > > > >
> > > > > > > > "> I think Ismael and Gwen here bring up a good point.  The
> > > version
> > > > > of the
> > > > > > > > > request is a technical detail that isn't really related to
> > > > > > > > > authorization.  There are a lot of other technical details
> like
> > > > > this
> > > > > > > > > like the size of the request, the protocol it came in on,
> etc.
> > > > > None of
> > > > > > > > > them are passed to the authorizer-- they all have
> configuration
> > > > > knobs
> > > > > > > > > to control how we handle them.  If we add this technical
> > > detail,
> > > > > > > > > logically we'll have to start adding all the others, and
> the
> > > > > authorizer
> > > > > > > > > API will get really bloated.  It's better to keep it
> focused on
> > > > > > > > > authorization, I think."
> > > > > > > >
> > > > > > > > probably my previous email is not clear but I am agreeing
> with
> > > > > Gwen's point.
> > > > > > > > I am not in favor of extending authorizer to support this.
> > > > > > > >
> > > > > > > >
> > > > > > > > "> Another thing to consider is that if we add a new broker
> > > > > configuration
> > > > > > > > > that lets us set a minimum client version which is allowed,
> > > that
> > > > > could
> > > > > > > > > be useful to other users as well.  On the other hand, most
> > > users
> > > > > are
> > > > > > > > > not likely to write a custom authorizer to try to take
> > > advantage
> > > > > of
> > > > > > > > > version information being passed to the authorizer.  So, I
> > > think
> > > > > using> a configuration is clearly the better way to go here.
> Perhaps
> > > it
> > > > > can
> > > > > > > > > be a 

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-11 Thread Sophie Blee-Goldman
Thanks for the comments Guozhang! I've answered your questions below

On Tue, Apr 9, 2019 at 4:38 PM Guozhang Wang  wrote:

> Hi Sophie,
>
> Thanks for the proposed KIP. I've made a pass over it and here are some
> thoughts:
>
> 1. "The window size is effectively the grace and retention period". The
> grace time is defined as "the time to admit late-arriving events after the
> end of the window." hence it is the additional time beyond the window size.
> I guess you were trying to say it should be zero?
>
> Also for retention period, it is not a notion of the window spec any more,
> but only for the window store itself. So I'd suggest talking about window
> size here, and note that store retention time cannot be controlled via
> window spec at all.
>

Yes, I meant to say the grace period is effectively zero -- the retention
period will ultimately be the same as the window size, which is
configurable, but it can't be configured independently if that's what you
mean?


> 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
> bucket, so I'd assume you will expire one bucket as a whole when its end
> time is smaller than the current window's starting time, right?
>

Since this design assumes we don't have a subtractor, each bucket would
expire when its start time is outside the current window; the remaining
values in that bucket are then aggregated with the "running aggregate" of
the next bucket to get the total aggregate for the entire window. I'll try
to come up with a diagram and/or better way to explain what I have in mind
here...
(The values themselves in the buckets will expire automatically by setting
the retention period of the underlying window store)


> 3. Also in your algorithm how to choose "M" seems tricky, would it be a
> configurable parameter exposed to users or is it abstracted away and only
> being selected internally?
>

Good question. If we ignore the difference in cost between aggregation
operations and writes to the underlying store, the optimal value of M is
sqrt(N). But in reality the aggregation might be very simple while RocksDB
writes are expensive -- conversely, the aggregation itself could be
complicated/costly while the underlying store is cheap to write (i.e.
in-memory). I do feel it should be abstracted away from the user, however,
and not be an additional parameter they need to consider and tune (e.g.
segmentInterval)... some profiling would probably be needed to determine a
reasonable choice


> 4. "There is some tradeoff between purely optimizing " seems incomplete
> paragraph?
>

Whoops


> 5. Meta comment: many aggregations are commutative and associative, so
> we can require users to pass in a "subtract" function as well. Given these
> two functions I think we can propose two sets of APIs: 1) with the adder and
> subtractor and 2) with the adder only (if the aggregate logic is not comm.
> and assoc.).
>
> We just maintain an aggregate value for each bucket (call it
> bucket_aggregate) plus for the whole window (call it total_aggregate), i.e.
> at most M + 1 values per key. We use the total_aggregate for queries, and
> each update will cause 2 writes (to the bucket and to the total aggregate).
>
> And with 1) when expiring the oldest bucket we simply call
> subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> oldest bucket we can re-compute the total_aggregate by
> sum(bucket_aggregate) over other buckets again.
>

This is a good point, i.e. we can definitely be much smarter in our design if
we have a subtractor, in which case it's probably worth having separate sets of
APIs/implementations based on what the user can provide. I'll work this
into the KIP.
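A rough sketch of the bucketed design with a subtractor, as discussed in point 5
above (class and method names are illustrative only, not a proposed API):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BinaryOperator;

// Illustrative only: keeps at most M bucket aggregates plus one total aggregate
// per key, so each update costs two writes and a query reads a single value.
class SlidingAggregateSketch<V> {
    private final Deque<V> bucketAggregates = new ArrayDeque<>(); // oldest first
    private final BinaryOperator<V> adder;
    private final BinaryOperator<V> subtractor;
    private V totalAggregate;

    SlidingAggregateSketch(BinaryOperator<V> adder, BinaryOperator<V> subtractor, V initial) {
        this.adder = adder;
        this.subtractor = subtractor;
        this.totalAggregate = initial;
    }

    // Called when the stream advances into a new bucket interval.
    void startNewBucket(V initial) {
        bucketAggregates.offerLast(initial);
    }

    // Each update goes into the newest bucket and into the total aggregate.
    void add(V value) {
        V newest = bucketAggregates.pollLast();
        bucketAggregates.offerLast(newest == null ? value : adder.apply(newest, value));
        totalAggregate = adder.apply(totalAggregate, value);
    }

    // When the oldest bucket drops out of the window, subtract it from the total.
    void expireOldestBucket() {
        V oldest = bucketAggregates.pollFirst();
        if (oldest != null) {
            totalAggregate = subtractor.apply(totalAggregate, oldest);
        }
    }

    // Queries read the maintained total.
    V currentAggregate() {
        return totalAggregate;
    }
}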


> 6. Meta comment: it is reasonable to assume in practice out-of-ordering
> data is not very common, hence most of the updates will be falling into the
> latest bucket. So I'm wondering if it makes sense to always store the first
> bucket in memory while making other buckets optionally on persistent
> storage. In practice, as long as M is large enough (we probably need it to
> be large enough to have sufficiently sensitive expiration anyways) then
> each bucket's aggregate data is small enough to be in memory.
>

This sounds reasonable to me (looking into the future, if we want to
eventually support a way to "tune" the total memory usage by Streams this
could be turned on/off)


> Guozhang
>
>
>
> On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman 
> wrote:
>
> > Hello all,
> >
> > I would like to kick off discussion of this KIP aimed at providing
> sliding
> > window semantics to DSL aggregations.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >
> > Please take a look and share any thoughts you have regarding the API,
> > semantics, design, etc!
> >
> > I also have a POC PR open with the "naive" implementation for your
> > reference: https://github.com/apache/kafka/pull/6549
> >
> > Cheers,
> > Sophie
> >
>
>
> --
> -- Guozhang

Re: [DISCUSS] KIP-439: Deprecate Interface WindowStoreIterator

2019-04-11 Thread Matthias J. Sax
I did not see a reason to rename `all()` to `range()`. `all()` does not
take any parameters to limit a range and is a good name IMHO. But I am
not married to keep `all()` and if we think we should rename it, too, I
am fine with it.

Not sure what connection you make to metrics though. Can you elaborate?


Would be interested to hear others opinions on this, too.


-Matthias

On 4/11/19 8:38 AM, Guozhang Wang wrote:
> I like the renaming, since it also aligns with our metrics cleanup
> (KIP-444) which touches upon the store level metrics as well.
> 
> One question: you seem to still be suggesting to keep "all" with the current
> name (and also using a separate metric for it); what's the difference
> between this one and the other "range" functions?
> 
> 
> Guozhang
> 
> On Thu, Apr 11, 2019 at 2:26 AM Matthias J. Sax 
> wrote:
> 
>> Thanks for the input.
>>
 Just to clarify the naming conflicts is between the newly added function
 and the old functions that we want to deprecate / remove right? The
>>
>> Yes, the conflict is just for the existing `fetch()` methods for which
>> we want to change the return type.
>>
>> IMHO, we should not make a breaking change in a minor release. Thus, we
>> could either only deprecate those fetch methods that return
>> `WindowStoreIterator` and do a "clean cut" in 3.0.
>>
>> Or we follow the renaming path. To get a clean renaming, we need to
>> consider all methods that are called "fetch":
>>
>> ReadOnlyWindowStore:
>>
>>> V fetch(K, long)
>>> WindowStoreIterator<V> fetch(K, Instant, Instant)
>>> KeyValueIterator<Windowed<K>, V> fetch(K, K, Instant, Instant)
>>
>> WindowStore:
>>
>>> WindowStoreIterator<V> fetch(K, long, long)
>>> WindowStoreIterator<V> fetch(K, Instant, Instant)
>>> KeyValueIterator<Windowed<K>, V> fetch(K, K, long, long)
>>> KeyValueIterator<Windowed<K>, V> fetch(K, K, Instant, Instant)
>>
>> There is also fetchAll(long, long) and fetchAll(Instant, Instant) that
>> fetch over all keys.
>>
>> Maybe we could rename `V fetch(K, long)` to `V get(K, long)` and rename
>> all `fetch()/fetchAll()` to `range()`? There is actually no reason to
>> have range()/rangeAll().
>>
>> If we do this, we might consider to rename methods for SessionStore,
>> too. There is
>>
>>> ReadOnlySessionStore#fetch(K)
>>> ReadOnlySessionStore#fetch(K, K)
>>> SessionStore#findSessions(K, long, long)
>>> SessionStore#findSessions(K, K, long, long)
>>> SessionStore#fetchSession(K, long, long);
>>
>> Consistent renaming might be:
>>
>> ReadOnlySessionStore#range(K)
>> ReadOnlySessionStore#range(K, K)
>> SessionStore#range(K, long, long)
>> SessionStore#range(K, K, long, long)
>> SessionStore#get(K, long, long);
>>
>>
>> Not sure if extensive renaming might have too big of an impact. However,
>> `range()` and `get()` might actually be better names than `fetch()`,
>> thus, it might also provide some additional value.
>>
>> Thoughts?
>>
>>
>> -Matthias
>>
>>
>> On 3/27/19 10:19 AM, Guozhang Wang wrote:
>>> Hello Matthias,
>>>
>>> Just to clarify the naming conflicts is between the newly added function
>>> and the old functions that we want to deprecate / remove right? The
>>> existing ones have different signatures with parameters so that they
>> should
>>> not have conflicts.
>>>
>>> I was thinking about just make the change directly without deprecating
>>> existing ones which would require users of 2.3 to make code changes --
>> this
>>> code change looks reasonably straight-forward to me and not much worth
>>> deferring to later when the deprecated ones are removed.
>>>
>>> On the other hand, just deprecating "WindowIterator" without add new
>>> functions seems not very useful for users either since it is only used as
>>> an indicator but users cannot make code changes during this phase
>> anyways,
>>> so it is still a `one-cut` deal when we eventually remove the deprecated
>>> ones and add the new one.
>>>
>>> Hence I'm slightly inclining to trade compatibility and replace it with
>> new
>>> functions in one release, but if people have a good idea of the renaming
>>> approach (I do not have a good one on top of my head :) I can also be
>>> convinced that way.
>>>
>>> Guozhang
>>>
>>>
>>> On Mon, Mar 11, 2019 at 10:41 AM Matthias J. Sax 
>>> wrote:
>>>
 I am open to change the return type to

 KeyValueIterator<Windowed<K>, V>

 However, this requires to rename

 #fetch(K key, long startTimestamp, long endTimestamp)
 #fetch(K key, Instant startTimestamp, Instant endTimestamp)

 to avoid naming conflicts.

 What new name would you suggest? The existing methods are called
 `fetch()`, `fetchAll()`, `all()`, `put()`.

 While I think it would be good to get fully aligned return types, I am
 not sure how we can get aligned method names (without renaming all of
 them...)? If we think it's worth to rename all to get this cleaned up, I
 am no opposed.


 Thoughts?


 -Matthias


 On 3/11/19 10:27 AM, Guozhang Wang wrote:
> I was thinking about 

Re: Dynamic window size for aggregations

2019-04-11 Thread Boyang Chen
Hey Rajesh,

I do like the idea of customized windows. To make that work, we need to define 
rule-based window size generation. In a practical sense, fully randomized 
windows are rare, and those are usually addressed as session windows. For a 
calendar-based window strategy, the goal is to define a cyclic formula for 
window creation. For example, we could define an API like:
new WindowStore(List[Duration] months -> (31, 28, 31, 30...))

and notice that the correct cycle is 4 years (1 extra day in February). This 
seems to be a very business-specific use case that needs a lot of work to make 
it happen. Considering the amount of work, a day-based window should definitely 
be easier for you, and you would just need to do a secondary aggregation on top 
of the raw data. Do you want to talk more about your use case here, especially 
how much impact it would have for your case if a rule-based window store API 
were developed?
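As a rough illustration of the rule-based boundary computation discussed above,
a month-aligned window start could be derived from each record timestamp with
java.time (a sketch only, not an existing Streams API; it could back a custom
transform() with an attached window store):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

class CalendarWindowBoundaries {
    // Map a record timestamp to the start of its calendar month in UTC; the
    // result could serve as the window start when aggregating manually.
    static long monthWindowStartMs(long recordTimestampMs) {
        return Instant.ofEpochMilli(recordTimestampMs)
                .atZone(ZoneOffset.UTC)
                .withDayOfMonth(1)
                .truncatedTo(ChronoUnit.DAYS)
                .toInstant()
                .toEpochMilli();
    }
}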

Boyang


From: Rajesh Kalyanasundaram 
Sent: Wednesday, April 10, 2019 1:05 AM
To: dev@kafka.apache.org
Subject: Re: Dynamic window size for aggregations

Thanks Boyang & Matthias for your replies.
Boyang, you are right. More generically, I want to create a calendar-based 
window. If my window is in months, then the window shall tumble or hop at the 
end of every month. If my window is in days, then the window shall tumble / 
advance by the end of every day.
Perhaps the thread's subject is misleading, in the sense that it's not dynamic 
windows but rather calendar-based windows. Obviously, if my unit is days, then 
my window size is NOT dynamic.
Thanks
Regards,
Rajesh

On 09/04/19, 10:12 PM, "Boyang Chen"  wrote:

Hey Rajesh,


my understanding is that you want to create a month-based time window, is 
that correct?


Boyang


From: Matthias J. Sax 
Sent: Tuesday, April 9, 2019 11:42 PM
To: dev@kafka.apache.org
Subject: Re: Dynamic window size for aggregations

Feel free to create a feature request JIRA.

For now, you could use a custom processor (via `.transform()`) that uses
an attached window store to implement the logic you need.


-Matthias

On 4/9/19 4:09 AM, Rajesh Kalyanasundaram wrote:
> Hi all,
> We have a requirement to implement aggregations with TimedWindows which 
may have varying window sizes. For example, I may want a tumbling window that 
tumbles on 31st of Jan and 28th of Feb, 31st of March and so on.
> We did initial analysis of TimedWindows. Found that the windoSize is 
being fixed at many places in the streams code.
>
> Any guidance in supporting this would be very much appreciated.
> Thank you.
> Regards,
> Rajesh
> This email and any files transmitted with it are confidential, 
proprietary and intended solely for the individual or entity to whom they are 
addressed. If you have received this email in error please delete it 
immediately.
>



This email and any files transmitted with it are confidential, proprietary and 
intended solely for the individual or entity to whom they are addressed. If you 
have received this email in error please delete it immediately.


Re: [DISCUSS] KIP-451: Make TopologyTestDriver output iterable

2019-04-11 Thread Patrik Kleindl
Hi Matthias

Thanks for the questions.

Regarding the return type:
Iterable offers the option of being used in a foreach loop directly and it
gives you access to the .iterator method, too.
(ref:
https://www.techiedelight.com/differences-between-iterator-and-iterable-in-java/
)

To return a List object would require an additional conversion and I
don't see the immediate benefit.

Regarding the ordering:
outputRecordsByTopic gives back a Queue

private final Map<String, Queue<ProducerRecord<byte[], byte[]>>>
outputRecordsByTopic = new HashMap<>();

which has a LinkedList behind it

outputRecordsByTopic.computeIfAbsent(record.topic(), k -> new
LinkedList<>()).add(record);

So the order is handled by the linked list and should not be modified by my
changes,
not even the .stream.map etc. (ref:
https://stackoverflow.com/questions/30258566/java-stream-map-and-collect-order-of-resulting-container
)


Then again, I am open to change it if people have some strong preference

best regards

Patrik


On Thu, 11 Apr 2019 at 17:45, Matthias J. Sax  wrote:

> Thanks for the KIP!
>
> Overall, this makes sense and can simplify testing.
>
> What I am wondering is, why you suggest to return an `Iterable`? Maybe
> returning an `Iterator` would make more sense? Or a List? Note that the
> order of emits matters, thus returning a generic `Collection` would not
> seem to be appropriate.
>
> Can you elaborate on the advantages to use `Iterable` compared to the
> other options?
>
>
>
> -Matthias
>
> On 4/11/19 2:09 AM, Patrik Kleindl wrote:
> > Hi everyone,
> >
> > I would like to start the discussion on this small enhancement of
> > the TopologyTestDriver.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-451%3A+Make+TopologyTestDriver+output+iterable
> >
> > Pull request is available at https://github.com/apache/kafka/pull/6556
> >
> > Any feedback is welcome
> >
> > best regards
> >
> > Patrik
> >
>
>


Proposal to Auto Close Inactive Tickets

2019-04-11 Thread Addison Huddy
Hi Kafka Developers,

The Apache Kafka JIRA currently has 2138 open JIRA tickets. As Charlie
Munger  once said,
“Simplicity has a way of improving performance through enabling us to
better understand what we are doing.”

What are everyone’s thoughts on adopting what the k8s community is doing
and auto close any ticket that has not seen any updates for 90 days.

https://github.com/kubernetes/community/blob/master/contributors/devel/automation.md

Prow will close pull-requests that don't have human activity in the
last 90 days. It will warn about this process 60 days before closing
the pull-request, and warn again 30 days later. One way to prevent
this from happening is to add the lifecycle/frozen label on the
pull-request.

If we were to adopt this practice, we could reduce our open ticket count to
553, a 74% decrease.
project = KAFKA AND resolution = Unresolved AND updated >= "-90d" ORDER BY
created DESC

So how might this work?

   - a bot, let’s call it Bender, would ping the ticket reporter after 30
   days of inactivity
   - After 60 days, Bender would again ping the reporter warning them that
   the ticket will be closed due to inactivity
   - After 90 days of inactivity, Bender would resolve the ticket with the
   status Auto Closed and a comment that the ticket was resolved due to
   inactivity.
   - Bender would ignore all tickets with the label bender-ignore



Let me know what you think?

\ah


Build failed in Jenkins: kafka-trunk-jdk8 #3532

2019-04-11 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Use generated InitProducerId RPC (#6538)

--
[...truncated 2.36 MB...]
org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaStringToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimeToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimeToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessUnixToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessUnixToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToString STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToString PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigInvalidFormat STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigInvalidFormat PASSED

org.apache.kafka.connect.transforms.HoistFieldTest > withSchema STARTED

org.apache.kafka.connect.transforms.HoistFieldTest > withSchema PASSED

org.apache.kafka.connect.transforms.HoistFieldTest > schemaless STARTED

org.apache.kafka.connect.transforms.HoistFieldTest > schemaless PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeDateRecordValueWithSchemaString STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeDateRecordValueWithSchemaString PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessBooleanTrue STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessBooleanTrue PASSED

org.apache.kafka.connect.transforms.CastTest > testConfigInvalidTargetType 
STARTED

org.apache.kafka.connect.transforms.CastTest > testConfigInvalidTargetType 
PASSED

org.apache.kafka.connect.transforms.CastTest > 
testConfigMixWholeAndFieldTransformation STARTED

org.apache.kafka.connect.transforms.CastTest > 
testConfigMixWholeAndFieldTransformation PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessFloat32 STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessFloat32 PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessFloat64 STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessFloat64 PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessUnsupportedType STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessUnsupportedType PASSED

org.apache.kafka.connect.transforms.CastTest > castWholeRecordKeySchemaless 
STARTED

org.apache.kafka.connect.transforms.CastTest > castWholeRecordKeySchemaless 
PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueWithSchemaFloat32 STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueWithSchemaFloat32 PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueWithSchemaFloat64 STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueWithSchemaFloat64 PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueWithSchemaString STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueWithSchemaString PASSED

org.apache.kafka.connect.transforms.CastTest > testConfigEmpty STARTED

org.apache.kafka.connect.transforms.CastTest > testConfigEmpty PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessInt16 STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessInt16 PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessInt32 STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessInt32 PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessInt64 STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueSchemalessInt64 PASSED

org.apache.kafka.connect.transforms.CastTest > castFieldsSchemaless STARTED

org.apache.kafka.connect.transforms.CastTest > castFieldsSchemaless PASSED

org.apache.kafka.connect.transforms.CastTest > testUnsupportedTargetType STARTED

org.apache.kafka.connect.transforms.CastTest > testUnsupportedTargetType PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueWithSchemaInt16 STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueWithSchemaInt16 PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueWithSchemaInt32 STARTED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueWithSchemaInt32 PASSED

org.apache.kafka.connect.transforms.CastTest > 
castWholeRecordValueWithSchemaInt64 STARTED


[jira] [Resolved] (KAFKA-8214) Handling RecordTooLargeException in the main thread

2019-04-11 Thread Mohan Parthasarathy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-8214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mohan Parthasarathy resolved KAFKA-8214.

Resolution: Duplicate

> Handling RecordTooLargeException in the main thread
> ---
>
> Key: KAFKA-8214
> URL: https://issues.apache.org/jira/browse/KAFKA-8214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.2
>Reporter: Mohan Parthasarathy
>Priority: Major
>
> How can we handle this exception in the main application? If this task 
> incurs this exception, it does not commit the offset and hence goes 
> into a loop after that. This happens during the aggregation process. We already 
> have a limit on the message size of the topic, which is 15 MB.
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=2_6, processor=KSTREAM-SOURCE-16, 
> topic=r-detection-KSTREAM-AGGREGATE-STATE-STORE-12-repartition, 
> partition=6, offset=2049
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_6] Abort 
> sending since an error caught with a previous record (key 
> fe80::a112:a206:bc15:8e86::743c:160:c0be:9e66&0 value [B@20dced9e 
> timestamp 1554238297629) to topic 
> -detection-KSTREAM-AGGREGATE-STATE-STORE-12-changelog due to 
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 15728866 bytes when serialized which is larger than the maximum request size 
> you have configured with the max.request.size configuration.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:192)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:100)
>     at
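
For reference, a minimal sketch of two ways an application can cope with this situation, assuming Kafka Streams 1.1+ where a production exception handler (KIP-210) is available. The class name and the 16 MB value are illustrative assumptions, not a recommendation from this thread, and skipping a changelog record means that record is lost from the changelog:

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class SkipOversizedRecordsHandler implements ProductionExceptionHandler {

    @Override
    public void configure(final Map<String, ?> configs) {}

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // Skip only oversized records and keep processing; fail fast on anything else.
        // Note: a skipped changelog record never reaches the changelog (potential data loss on restore).
        return exception instanceof RecordTooLargeException
                ? ProductionExceptionHandlerResponse.CONTINUE
                : ProductionExceptionHandlerResponse.FAIL;
    }

    public static Properties exampleStreamsConfig() {
        final Properties props = new Properties();
        // Raise the limit of the Streams-internal producer above the largest serialized record:
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                  Integer.toString(16 * 1024 * 1024));
        // And/or plug in the handler above so a too-large record no longer blocks the commit loop:
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  SkipOversizedRecordsHandler.class);
        return props;
    }
}
{code}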

Re: [DISCUSS] KIP-439: Deprecate Interface WindowStoreIterator

2019-04-11 Thread Guozhang Wang
I like the renaming, since it also aligns with our metrics cleanup
(KIP-444) which touches upon the store level metrics as well.

One question: you seem to still be suggesting keeping "all" with its current
name (and also using a separate metric for it); what's the difference
between this one and the other "range" functions?


Guozhang

On Thu, Apr 11, 2019 at 2:26 AM Matthias J. Sax 
wrote:

> Thanks for the input.
>
> >> Just to clarify the naming conflicts is between the newly added function
> >> and the old functions that we want to deprecate / remove right? The
>
> Yes, the conflict is just for the existing `fetch()` methods for which
> we want to change the return type.
>
> IMHO, we should not make a breaking change in a minor release. Thus, we
> could either only deprecate those fetch methods that return
> `WindowStoreIterator` and do a "clean cut" in 3.0.
>
> Or we follow the renaming path. To get a clean renaming, we need to
> consider all methods that are called "fetch":
>
> ReadOnlyWindowStore:
>
> > V fetch(K, long)
> > WindowStoreIterator<V> fetch(K, Instant, Instant)
> > KeyValueIterator<Windowed<K>, V> fetch(K, K, Instant, Instant)
>
> WindowStore:
>
> > WindowStoreIterator<V> fetch(K, long, long)
> > WindowStoreIterator<V> fetch(K, Instant, Instant)
> > KeyValueIterator<Windowed<K>, V> fetch(K, K, long, long)
> > KeyValueIterator<Windowed<K>, V> fetch(K, K, Instant, Instant)
>
> There is also fetchAll(long, long) and fetchAll(Instant, Instant) that
> fetch over all keys.
>
> Maybe we could rename `V fetch(K, long)` to `V get(K, long)` and rename
> all `fetch()/fetchAll()` to `range()`? There is actually no reason to
> have range()/rangeAll().
>
> If we do this, we might consider to rename methods for SessionStore,
> too. There is
>
> > ReadOnlySessionStore#fetch(K)
> > ReadOnlySessionStore#fetch(K, K)
> > SessionStore#findSessions(K, long, long)
> > SessionStore#findSessions(K, K, long, long)
> > SessionStore#fetchSession(K, long, long);
>
> Consistent renaming might be:
>
> ReadOnlySessionStore#range(K)
> ReadOnlySessionStore#range(K, K)
> SessionStore#range(K, long, long)
> SessionStore#range(K, K, long, long)
> SessionStore#get(K, long, long);
>
>
> Not sure if extensive renaming might have too big of an impact. However,
> `range()` and `get()` might actually be better names than `fetch()`,
> thus, it might also provide some additional value.
>
> Thoughts?
>
>
> -Matthias
>
>
> On 3/27/19 10:19 AM, Guozhang Wang wrote:
> > Hello Matthias,
> >
> > Just to clarify the naming conflicts is between the newly added function
> > and the old functions that we want to deprecate / remove right? The
> > existing ones have different signatures with parameters so that they
> should
> > not have conflicts.
> >
> > I was thinking about just making the change directly without deprecating
> > existing ones, which would require users of 2.3 to make code changes --
> this
> > code change looks reasonably straight-forward to me and not much worth
> > deferring to later when the deprecated ones are removed.
> >
> > On the other hand, just deprecating "WindowIterator" without adding new
> > functions seems not very useful for users either since it is only used as
> > an indicator but users cannot make code changes during this phase
> anyways,
> > so it is still a `one-cut` deal when we eventually remove the deprecated
> > ones and add the new one.
> >
> > Hence I'm slightly inclining to trade compatibility and replace it with
> new
> > functions in one release, but if people have a good idea of the renaming
> > approach (I do not have a good one on top of my head :) I can also be
> > convinced that way.
> >
> > Guozhang
> >
> >
> > On Mon, Mar 11, 2019 at 10:41 AM Matthias J. Sax 
> > wrote:
> >
> >> I am open to change the return type to
> >>
> >> KeyValueIterator<Windowed<K>, V>
> >>
> >> However, this requires to rename
> >>
> >> #fetch(K key, long startTimestamp, long endTimestamp)
> >> #fetch(K key, Instant startTimestamp, Instant endTimestamp)
> >>
> >> to avoid naming conflicts.
> >>
> >> What new name would you suggest? The existing methods are called
> >> `fetch()`, `fetchAll()`, `all()`, `put()`.
> >>
> >> While I think it would be good to get fully aligned return types, I am
> >> not sure how we can get aligned method names (without renaming all of
> >> them...)? If we think it's worth renaming all to get this cleaned up, I
> >> am not opposed.
> >>
> >>
> >> Thoughts?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 3/11/19 10:27 AM, Guozhang Wang wrote:
> >>> I was thinking about changing the return type even, to
> >>> `KeyValueIterator<Windowed<K>, V>` since it is confusing to users about
> >> the
> >>> key typed `Long` (Streams javadoc today did not explain it clearly
> >> either),
> >>> note it is not backward compatible at all.
> >>>
> >>> Personally I'd prefer to just deprecate the API and add new ones that
> >>> return `KeyValueIterator<Windowed<K>, V>` directly, but if most people
> >> felt
> >>> it is too intrusive for compatibility I can be convinced with
> >>> `KeyValueIterator` as 

Re: [DISCUSS] KIP-451: Make TopologyTestDriver output iterable

2019-04-11 Thread Matthias J. Sax
Thanks for the KIP!

Overall, this makes sense and can simplify testing.

What I am wondering is why you suggest returning an `Iterable`? Maybe
returning an `Iterator` would make more sense? Or a `List`? Note that the
order of emits matters, thus returning a generic `Collection` would not
seem to be appropriate.

Can you elaborate on the advantages of using `Iterable` compared to the
other options?



-Matthias

On 4/11/19 2:09 AM, Patrik Kleindl wrote:
> Hi everyone,
> 
> I would like to start the discussion on this small enhancement of
> the TopologyTestDriver.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-451%3A+Make+TopologyTestDriver+output+iterable
> 
> Pull request is available at https://github.com/apache/kafka/pull/6556
> 
> Any feedback is welcome
> 
> best regards
> 
> Patrik
> 





Re: Kafka KIP Wiki Page not editable with Google Chrome

2019-04-11 Thread Matthias J. Sax
I don't have issues.

Maybe clear the browser cache?


-Matthias

On 4/11/19 2:40 AM, Patrik Kleindl wrote:
> Hi
> 
> Does anyone else have a problem editing
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> or creating a KIP using Google Chrome (latest update, on MacOS)?
> 
> The page hangs forever and only shows an error message.
> 
> If I do the same in Firefox everything works like a charm.
> 
> best regards
> 
> Patrik
> 





[jira] [Created] (KAFKA-8218) IllegalStateException while accessing context in Transformer

2019-04-11 Thread JIRA
Bartłomiej Kępa created KAFKA-8218:
--

 Summary: IllegalStateException while accessing context in 
Transformer
 Key: KAFKA-8218
 URL: https://issues.apache.org/jira/browse/KAFKA-8218
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.1
Reporter: Bartłomiej Kępa


Custom Kotlin implementation of Transformer throws 
{code}
java.lang.IllegalStateException: This should not happen as headers() should 
only be called while a record is processed
{code}

when plugged into a stream topology that otherwise works. The transform() method 
is invoked with valid arguments (a key and a GenericRecord value).

The exception is thrown because our implementation of transform() needs to access 
the headers from the context.


{code:java}
 override fun transform(key: String?, value: GenericRecord): KeyValue {
  val headers = context.headers()
  ...
}
 {code}
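
For comparison, a minimal Java sketch of the usual pattern: keep the ProcessorContext handed to 
init() and call headers() only from inside transform(). This is illustrative only, not the 
reporter's code, and assumes Avro's GenericRecord is on the classpath:

{code:java}
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class HeaderAwareTransformer
        implements Transformer<String, GenericRecord, KeyValue<String, GenericRecord>> {

    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context; // must be the context passed in here, not one obtained elsewhere
    }

    @Override
    public KeyValue<String, GenericRecord> transform(final String key, final GenericRecord value) {
        final Headers headers = context.headers(); // legal only while a record is being processed
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() {}
}
{code}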



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8182) IllegalStateException in NetworkClient.initiateConnect when handling UnknownHostException thrown from ClusterConnectionStates.connecting

2019-04-11 Thread Mark Anderson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mark Anderson resolved KAFKA-8182.
--
Resolution: Duplicate

Resolving as I've confirmed this is a duplicate of KAFKA-7974 and is fixed in 
2.2.1 builds.

> IllegalStateException in NetworkClient.initiateConnect when handling 
> UnknownHostException thrown from ClusterConnectionStates.connecting 
> -
>
> Key: KAFKA-8182
> URL: https://issues.apache.org/jira/browse/KAFKA-8182
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 2.2.0, 2.1.1
>Reporter: Mark Anderson
>Priority: Major
>
> When NetworkClient.initiateConnect calls connectionStates.connecting, an 
> UnknownHostException can be thrown by ClientUtils.resolve when creating a new 
> NodeConnectionState.
> In the above case the nodeState map within ClusterConnectionStates will not 
> contain an entry for the node ID.
> The catch clause within NetworkClient.initiateConnect immediately calls 
> connectionStates.disconnected, but this makes the assumption that a 
> NodeConnectionState entry exists for the node ID. This assumption is 
> incorrect when an UnknownHostException is thrown as described above and leads 
> to an IllegalStateException like the following:
> {noformat}
> java.lang.IllegalStateException: No entry found for connection 2147483645|
>   at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:339)|
>   at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:143)|
>   at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:926)|
>    at 
> org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)|{noformat}
> This is an issue when running in K8s since if a node is marked not ready then 
> DNS resolution starts to fail when resolving the address of the broker on 
> that node.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Request KIP Permissions

2019-04-11 Thread Bill Bejeck
Done

On Thu, Apr 11, 2019 at 8:40 AM Łukasz Antoniak 
wrote:

> Hi,
>
> Could you please grant me write access to KIP proposals? I intend to make
> progress on KAFKA-6393. Wiki ID: lukasz (apache.org).
>
> Best regards,
> Lukasz Antoniak
>


Request KIP Permissions

2019-04-11 Thread Łukasz Antoniak
Hi,

Could you please grant me write access to KIP proposals? I intend to make
progress on KAFKA-6393. Wiki ID: lukasz (apache.org).

Best regards,
Lukasz Antoniak


[jira] [Created] (KAFKA-8217) MockConsumer.poll executes pollTasks before checking wakeup flag

2019-04-11 Thread Kevin (JIRA)
Kevin created KAFKA-8217:


 Summary: MockConsumer.poll executes pollTasks before checking 
wakeup flag
 Key: KAFKA-8217
 URL: https://issues.apache.org/jira/browse/KAFKA-8217
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Affects Versions: 2.0.1
Reporter: Kevin


The tasks scheduled in MockConsumer.schedulePollTask seem to be for simulating 
interactions between the Consumer and the brokers, but MockConsumer.poll runs 
scheduledPollTasks before it checks the wakeup flag. Given that the 
KafkaConsumer.poll method checks the wakeup flag before doing any other logic, 
it seems like the MockConsumer should rather check wakeup before running 
scheduledPollTasks.

This makes it difficult to control exactly how many Consumer.poll invocations 
occur in a unit test when using the wakeup() pattern described in 
"Multithreaded Processing" in the KafkaConsumer docs, as gating each poll so 
that it returns only when instructed in the test requires submitting a 
scheduledPollTask that blocks until the test unblocks it. The trouble occurs 
when trying to shut down the consumer, as the poll() task needs to be unblocked 
in order to receive the WakeupException but 
https://issues.apache.org/jira/browse/KAFKA-8216 means that Consumer.poll() can 
be called many times in the race condition interval after poll has been 
unblocked but before the test has called Consumer.wakeup().
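
To make the gating pattern described above concrete, here is a minimal test-side sketch (the names 
are illustrative; this restates the reporter's pattern and is not code from Kafka itself):

{code:java}
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class GatedPollSketch {
    public static void main(final String[] args) {
        final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        final CountDownLatch pollGate = new CountDownLatch(1);

        // Gate the next poll: the task blocks until the test releases it. Because MockConsumer.poll()
        // runs scheduled tasks before it checks the wakeup flag, wakeup() cannot break out of this wait.
        consumer.schedulePollTask(() -> {
            try {
                pollGate.await();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // ... the consumer loop under test calls consumer.poll(...) here ...

        pollGate.countDown(); // the test releases the gate when it wants the next poll to proceed
    }
}
{code}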



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8216) MockConsumer's poll(Duration timeout) method doesn't wait for timeout

2019-04-11 Thread Kevin (JIRA)
Kevin created KAFKA-8216:


 Summary: MockConsumer's poll(Duration timeout) method doesn't wait 
for timeout
 Key: KAFKA-8216
 URL: https://issues.apache.org/jira/browse/KAFKA-8216
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Affects Versions: 2.0.0
Reporter: Kevin


While unit-testing a custom consumer class using the MockConsumer, I noticed 
that even though I'm using Consumer.poll(Duration timeout) the MockConsumer 
won't wait for the timeout before returning (and confirmed in the code). This 
weakens the unit tests, as we can't guarantee the number of times Consumer.poll 
(and the code that follows it in the loop) will be called.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Kafka KIP Wiki Page not editable with Google Chrome

2019-04-11 Thread Patrik Kleindl
Hi

Does anyone else have a problem editing
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
or creating a KIP using Google Chrome (latest update, on MacOS)?

The page hangs forever and only shows an error message.

If I do the same in Firefox everything works like a charm.

best regards

Patrik


[jira] [Resolved] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes

2019-04-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8201.

Resolution: Not A Bug

> Kafka streams repartitioning topic settings crashing multiple nodes
> ---
>
> Key: KAFKA-8201
> URL: https://issues.apache.org/jira/browse/KAFKA-8201
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Anders Aagaard
>Priority: Major
>
> We had an incident in a setup using kafka streams version 2.0.0 and kafka 
> version 2.0.0 protocol version 2.0-IV1. The reason for it is a combination of 
> kafka streams defaults and a bug in kafka.
> Info about the setup: Streams application reading a log compacted input 
> topic, and performing a groupby operation requiring repartitioning.
> Kafka streams automatically creates a repartitioning topic with 24 partitions 
> and the following options:
> segment.bytes=52428800, retention.ms=9223372036854775807, 
> segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=600000.
>  
> This should mean we roll out a new segment when the active one reaches 50 MB 
> or is older than 10 minutes. However, the different timestamps coming into 
> the topic due to log compaction (sometimes varying by multiple days) mean 
> the server will see a message which is older than segment.ms and 
> automatically trigger a new segment roll-out. This causes a segment 
> explosion, where new segments are continuously rolled out.
> There seems to be a bug report for this server side here : 
> https://issues.apache.org/jira/browse/KAFKA-4336.
> This effectively took down several nodes and a broker in our cluster.
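
For readers hitting the same defaults, a sketch of overriding the internal-topic settings from the 
Streams application itself; the values below are illustrative only, and whether such overrides are 
appropriate depends on the workload:

{code:java}
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class InternalTopicConfigSketch {
    public static Properties exampleProps() {
        final Properties props = new Properties();
        // Applied to all internal (repartition and changelog) topics created by this application:
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG),
                  Long.toString(TimeUnit.HOURS.toMillis(1)));
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
                  Integer.toString(512 * 1024 * 1024));
        return props;
    }
}
{code}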



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-439: Deprecate Interface WindowStoreIterator

2019-04-11 Thread Matthias J. Sax
Thanks for the input.

>> Just to clarify the naming conflicts is between the newly added function
>> and the old functions that we want to deprecate / remove right? The

Yes, the conflict is just for the existing `fetch()` methods for which
we want to change the return type.

IMHO, we should not make a breaking change in a minor release. Thus, we
could either only deprecate those fetch methods that return
`WindowStoreIterator` and do a "clean cut" in 3.0.

Or we follow the renaming path. To get a clean renaming, we need to
consider all methods that are called "fetch":

ReadOnlyWindowStore:

> V fetch(K, long)
> WindowStoreIterator<V> fetch(K, Instant, Instant)
> KeyValueIterator<Windowed<K>, V> fetch(K, K, Instant, Instant)

WindowStore:

> WindowStoreIterator<V> fetch(K, long, long)
> WindowStoreIterator<V> fetch(K, Instant, Instant)
> KeyValueIterator<Windowed<K>, V> fetch(K, K, long, long)
> KeyValueIterator<Windowed<K>, V> fetch(K, K, Instant, Instant)

There is also fetchAll(long, long) and fetchAll(Instant, Instant) that
fetch over all keys.

Maybe we could rename `V fetch(K, long)` to `V get(K, long)` and rename
all `fetch()/fetchAll()` to `range()`? There is actually no reason to
have range()/rangeAll().

If we do this, we might consider to rename methods for SessionStore,
too. There is

> ReadOnlySessionStore#fetch(K)
> ReadOnlySessionStore#fetch(K, K)
> SessionStore#findSessions(K, long, long)
> SessionStore#findSessions(K, K, long, long)
> SessionStore#fetchSession(K, long, long);

Consistent renaming might be:

ReadOnlySessionStore#range(K)
ReadOnlySessionStore#range(K, K)
SessionStore#range(K, long, long)
SessionStore#range(K, K, long, long)
SessionStore#get(K, long, long);


Not sure if extensive renaming might have too big of an impact. However,
`range()` and `get()` might actually be better names than `fetch()`,
thus, it might also provide some additional value.
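
A rough sketch of how `ReadOnlyWindowStore` could end up looking under such a renaming
(purely illustrative; nothing here is decided):

public interface ReadOnlyWindowStore<K, V> {
    V get(K key, long windowStartTimestamp);                                    // was fetch(K, long)
    KeyValueIterator<Windowed<K>, V> range(K key, Instant from, Instant to);    // was fetch(K, Instant, Instant)
    KeyValueIterator<Windowed<K>, V> range(K keyFrom, K keyTo, Instant from, Instant to);
    KeyValueIterator<Windowed<K>, V> range(Instant from, Instant to);           // was fetchAll(Instant, Instant)
    KeyValueIterator<Windowed<K>, V> all();
}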

Thoughts?


-Matthias


On 3/27/19 10:19 AM, Guozhang Wang wrote:
> Hello Matthias,
> 
> Just to clarify the naming conflicts is between the newly added function
> and the old functions that we want to deprecate / remove right? The
> existing ones have different signatures with parameters so that they should
> not have conflicts.
> 
> I was thinking about just making the change directly without deprecating
> existing ones, which would require users of 2.3 to make code changes -- this
> code change looks reasonably straight-forward to me and not much worth
> deferring to later when the deprecated ones are removed.
> 
> On the other hand, just deprecating "WindowIterator" without adding new
> functions seems not very useful for users either since it is only used as
> an indicator but users cannot make code changes during this phase anyways,
> so it is still a `one-cut` deal when we eventually remove the deprecated
> ones and add the new one.
> 
> Hence I'm slightly inclining to trade compatibility and replace it with new
> functions in one release, but if people have a good idea of the renaming
> approach (I do not have a good one on top of my head :) I can also be
> convinced that way.
> 
> Guozhang
> 
> 
> On Mon, Mar 11, 2019 at 10:41 AM Matthias J. Sax 
> wrote:
> 
>> I am open to change the return type to
>>
>> KeyValueIterator<Windowed<K>, V>
>>
>> However, this requires to rename
>>
>> #fetch(K key, long startTimestamp, long endTimestamp)
>> #fetch(K key, Instant startTimestamp, Instant endTimestamp)
>>
>> to avoid naming conflicts.
>>
>> What new name would you suggest? The existing methods are called
>> `fetch()`, `fetchAll()`, `all()`, `put()`.
>>
>> While I think it would be good to get fully aligned return types, I am
>> not sure how we can get aligned method names (without renaming all of
>> them...)? If we think it's worth renaming all to get this cleaned up, I
>> am not opposed.
>>
>>
>> Thoughts?
>>
>>
>> -Matthias
>>
>>
>> On 3/11/19 10:27 AM, Guozhang Wang wrote:
>>> I was thinking about changing the return type even, to
>>> `KeyValueIterator<Windowed<K>, V>` since it is confusing to users about
>> the
>>> key typed `Long` (Streams javadoc today did not explain it clearly
>> either),
>>> note it is not backward compatible at all.
>>>
>>> Personally I'd prefer to just deprecate the API and add new ones that
>>> return `KeyValueIterator<Windowed<K>, V>` directly, but if most people
>> felt
>>> it is too intrusive for compatibility I can be convinced with
>>> `KeyValueIterator` as well.
>>>
>>> Guozhang
>>>
>>> On Mon, Mar 11, 2019 at 10:17 AM Sophie Blee-Goldman <
>> sop...@confluent.io>
>>> wrote:
>>>
 I remember thinking this while working on window stores, am definitely
>> for
 it.

 On Mon, Mar 11, 2019 at 9:20 AM John Roesler  wrote:

> Sounds great to me. Thanks, Matthias!
> -John
>
> On Sun, Mar 10, 2019 at 11:58 PM Matthias J. Sax <
>> matth...@confluent.io>
> wrote:
>
>> Hi,
>>
>> I would like to propose KIP-439 to deprecate interface
>> `WindowStoreIterator`.
>>
>>
>>
>

>> 

[DISCUSS] KIP-451: Make TopologyTestDriver output iterable

2019-04-11 Thread Patrik Kleindl
Hi everyone,

I would like to start the discussion on this small enhancement of
the TopologyTestDriver.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-451%3A+Make+TopologyTestDriver+output+iterable

Pull request is available at https://github.com/apache/kafka/pull/6556

Any feedback is welcome

best regards

Patrik


Re: Kafka Jenkins not showing recent builds in history

2019-04-11 Thread Sönke Liebau
Quick update: everything seems to be back to normal.

On Mon, Apr 8, 2019 at 9:57 AM Sönke Liebau 
wrote:

> I've opened an INFRA ticket about this [1]. Apparently, it is a known
> issue that is solved by a restart of Jenkins. One will be scheduled in the
> near future, I'll update the list once the restart has happened.
>
> Best regards,
> Sönke
>
> [1] https://issues.apache.org/jira/browse/INFRA-18161
>
> On Wed, Apr 3, 2019 at 10:41 PM Sönke Liebau 
> wrote:
>
>> Hi everybody,
>>
>> I looked through recent Jenkins builds for a while today and it somehow
>> looks off to me.
>>
>> Both jobs [1] [2] don't show any builds that are more recent than March
>> 19th in the "build history".
>> Only the "last failed" and "last unsuccessful" permalinks show recent
>> dates. Build pages can be accessed by changing the build id in the link
>> though.
>>
>> That seems weird to me, I would have expected builds to show up in the
>> history, no matter if they were successful or not.
>>
>> Can someone shed some light on this for me? I am probably missing
>> something obvious.
>>
>> Best regards,
>> Sönke
>>
>>
>>
>> [1] https://builds.apache.org/job/kafka-pr-jdk11-scala2.12
>> [2] https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/
>>
>
>
> --
> Sönke Liebau
> Partner
> Tel. +49 179 7940878
> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>


-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-11 Thread Matthias J. Sax
Thanks for driving the discussion of this KIP. It seems that everybody
agrees that the current branch() method using arrays is not optimal.

I had a quick look into the PR and I like the overall proposal. There
are some minor things we need to consider. I would recommend the
following renaming:

KStream#branch() -> #split()
KBranchedStream#addBranch() -> BranchingKStream#branch()
KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take a `Predicate` as argument,
but I think that is not required?

Also, we should consider KIP-307, that was recently accepted and is
currently implemented:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

Ie, we should add overloads that accepted a `Named` parameter.


For the issue that the created `KStream` objects are in different scopes:
could we extend `KBranchedStream` with a `get(int index)` method that
returns the corresponding "branched" result `KStream` object? Maybe, the
second argument of `addBranch()` should not be a `Consumer` but
a `Function` and `get()` could return whatever the
`Function` returns?
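
To make the shape concrete, a purely illustrative sketch (method names are not final;
`isCoffee` and `isElectronics` stand for user-supplied `Predicate`s):

stream.split()
      .branch(isCoffee, coffee -> coffee.to("coffee-purchases"))
      .branch(isElectronics, electronics -> electronics.to("electronics-purchases"))
      .defaultBranch(other -> other.to("other-purchases"));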


Finally, I would also suggest to update the KIP with the current
proposal. That makes it easier to review.


-Matthias



On 3/31/19 12:22 PM, Paul Whalen wrote:
> Ivan,
> 
> I'm a bit of a novice here as well, but I think it makes sense for you to
> revise the KIP and continue the discussion.  Obviously we'll need some
> buy-in from committers that have actual binding votes on whether the KIP
> could be adopted.  It would be great to hear if they think this is a good
> idea overall.  I'm not sure if that happens just by starting a vote, or if
> there is generally some indication of interest beforehand.
> 
> That being said, I'll continue the discussion a bit: assuming we do move
> forward the solution of "stream.branch() returns KBranchedStream", do we
> deprecate "stream.branch(...) returns KStream[]"?  I would favor
> deprecating, since having two mutually exclusive APIs that accomplish the
> same thing is confusing, especially when they're fairly similar anyway.  We
> just need to be sure we're not making something impossible/difficult that
> is currently possible/easy.
> 
> Regarding my PR - I think the general structure would work, it's just a
> little sloppy overall in terms of naming and clarity. In particular,
> passing in the "predicates" and "children" lists which get modified in
> KBranchedStream but read from all the way KStreamLazyBranch is a bit
> complicated to follow.
> 
> Thanks,
> Paul
> 
> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev  wrote:
> 
>> Hi Paul!
>>
>> I read your code carefully and now I am fully convinced: your proposal
>> looks better and should work. We just have to document the crucial fact
>> that KStream consumers are invoked as they're added. And then it's all
>> going to be very nice.
>>
>> What shall we do now? I should re-write the KIP and resume the
>> discussion here, right?
>>
>> Why are you telling that your PR 'should not be even a starting point if
>> we go in this direction'? To me it looks like a good starting point. But
>> as a novice in this project I might miss some important details.
>>
>> Regards,
>>
>> Ivan
>>
>>
>> 28.03.2019 17:38, Paul Whalen пишет:
>>> Ivan,
>>>
>>> Maybe I’m missing the point, but I believe the stream.branch() solution
>> supports this. The couponIssuer::set* consumers will be invoked as they’re
>> added, not during streamsBuilder.build(). So the user still ought to be
>> able to call couponIssuer.coupons() afterward and depend on the branched
>> streams having been set.
>>>
>>> The issue I mean to point out is that it is hard to access the branched
>> streams in the same scope as the original stream (that is, not inside the
>> couponIssuer), which is a problem with both proposed solutions. It can be
>> worked around though.
>>>
>>> [Also, great to hear additional interest in 401, I’m excited to hear
>> your thoughts!]
>>>
>>> Paul
>>>
 On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev  wrote:

 Hi Paul!

 The idea to postpone the wiring of branches to the
>> streamsBuilder.build() also looked great for me at first glance, but ---

> the newly branched streams are not available in the same scope as each
>> other.  That is, if we wanted to merge them back together again I don't see
>> a way to do that.

 You just took the words right out of my mouth, I was just going to
>> write in details about this issue.

 Consider the example from Bill's book, p. 101: say we need to identify
>> customers who have bought coffee and made a purchase in the electronics
>> store to give them coupons.

 This is the code I usually write under these circumstances using my
>> 'brancher' class:

 @Setter
 class CouponIssuer{
private KStream<> coffePurchases;
 

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-11 Thread Matthias J. Sax
Thanks for the KIP Sophie. Couple of comments:

It's a little unclear to me, what public API you propose. It seems you
want to add

> public class SlidingWindow extends TimeWindow {}

and

> public class SlidingWindows extends TimeWindows {} // or maybe `extends 
> Windows`

If yes, should we add corresponding public Serdes classes?

Also, can you list all newly added classes/methods explicitly in the wiki?


About the semantics of the operator.

> "Only one single window is defined at a time,"

Should this be "one window per key" instead?

I agree that both window boundaries should be inclusive. However, I am
not sure about:

> At most one record is forwarded when new data arrives

(1) For what case, no output would be produced?

(2) I think, if we advance in time, it can also happen that we emit
multiple records. If a window "slides" (not "hops"), we cannot just
advance it to the current record stream time but would need to emit more
results if records expire before the current input record is added. For
example, consider a window with size 5ms, and the following ts (all
records have the same key):

1 2 3 10 11

This should result in windows:

[1]
[1,2]
[1,2,3]
[2,3]
[3]
[10]
[10,11]

Ie, when the record with ts=10 is processed, it will trigger the
computation of [2,3], [3] and [10].


About out-of-order handling: I am wondering, if the current design that
does not allow any grace period is too restrictive. Can you elaborate
more on the motivation for this suggestions?


Can you give more details about the "simple design"? Atm, it's not clear
to me how it works. I though we always need to store all raw values. If
we only store the current aggregate, would we end up with the same
inefficient solution as using a hopping window with advance 1ms?


For the O(sqrt(N)) proposal: can you maybe add an example with concrete
bucket sizes, window size etc. The current proposal is a little unclear
to me, atm.


How are windows advanced? Do you propose to advance all windows over all
keys at the same time, or would each window (per key) advance
independent from all other windows? What would be the pros/cons for both
approaches?


To add to Guozhang's comment: atm, DSL operators assume that aggregate
functions are commutative and associative. Hence, it seems ok to make
the same assumption for sliding window. Addressing holistic and
non-subtractable aggregations should be supported out of the box at some
point, too, but this would be a different KIP adding this to all
existing aggregations.


-Matthias



On 4/9/19 4:38 PM, Guozhang Wang wrote:
> Hi Sophie,
> 
> Thanks for the proposed KIP. I've made a pass over it and here are some
> thoughts:
> 
> 1. "The window size is effectively the grace and retention period". The
> grace time is defined as "the time to admit late-arriving events after the
> end of the window." hence it is the additional time beyond the window size.
> I guess you were trying to say it should be zero?
> 
> Also for retention period, it is not a notion of the window spec any more,
> but only for the window store itself. So I'd suggest talking about window
> size here, and note that store retention time cannot be controlled via
> window spec at all.
> 
> 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
> bucket, so I'd assume you will expire one bucket as a whole when its end
> time is smaller than the current window's starting time, right?
> 
> 3. Also in your algorithm how to choose "M" seems tricky, would it be a
> configurable parameter exposed to users or is it abstracted away and only
> being selected internally?
> 
> 4. "There is some tradeoff between purely optimizing " seems incomplete
> paragraph?
> 
> 5. Meta comment: for many aggregations the logic is commutative and associative, so
> we can require users to pass in a "subtract" function as well. Given these
> two functions I think we can propose two sets of APIs: 1) with the adder and
> subtractor, and 2) with the adder only (if the aggregate logic is not comm.
> and assoc.).
> 
> We just maintain an aggregate value for each bucket (call it
> bucket_aggregate) plus for the whole window (call it total_aggregate), i.e.
> at most M + 1 values per key. We use the total_aggregate for queries, and
> each update will cause 2 writes (to the bucket and to the total aggregate).
> 
> And with 1) when expiring the oldest bucket we simply call
> subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> oldest bucket we can re-compute the total_aggregate by
> sum(bucket_aggregate) over other buckets again.
> 
> 6. Meta comment: it is reasonable to assume in practice out-of-ordering
> data is not very common, hence most of the updates will be falling into the
> latest bucket. So I'm wondering if it makes sense to always store the first
> bucket in memory while making other buckets optionally on persistent
> storage. In practice, as long as M is large enough (we probably need it to
> be large enough to have sufficiently 
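
To illustrate the bucket bookkeeping from point 5 above, a toy sketch assuming a simple
long-sum aggregate and M fixed buckets per key (an assumption for illustration, not a
proposed implementation):

class SlidingAggregate {
    final long[] bucketAggregate;   // one running sum per bucket (M buckets)
    long totalAggregate;            // the value served for queries

    SlidingAggregate(final int m) {
        this.bucketAggregate = new long[m];
    }

    void add(final int bucket, final long value) {
        bucketAggregate[bucket] += value;   // 2 writes per update, as described above
        totalAggregate += value;
    }

    void expireOldest(final int oldestBucket) {
        totalAggregate -= bucketAggregate[oldestBucket];   // variant 1): use the subtractor
        bucketAggregate[oldestBucket] = 0;                  // variant 2) would re-sum the remaining buckets instead
    }
}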

Re: Request KIP permissions

2019-04-11 Thread Matthias J. Sax
Done.

On 4/11/19 1:08 AM, Patrik Kleindl wrote:
> Hi
> Please add me (pkleindl) to the list to write a KIP
> Thanks in advance and best regards
> Patrik
> 





Stream caching

2019-04-11 Thread Ярослав Когут
Hi guys.

I’ve deployed a Spring Cloud application using Docker; the service uses KStream 
functionality, and I have problems with the /tmp folder.

On some instances of the application the /tmp folder grows to between 3 and 6 GB. 
Maybe I missed some property?

I will be very grateful for the help.

Thanks, best regards.

Kohut Yaroslav 
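For reference, the directory in question is typically the Kafka Streams state directory, which
defaults to /tmp/kafka-streams. A minimal sketch of moving it off /tmp and bounding the record
caches, assuming the growth comes from the local state stores:

final Properties props = new Properties();
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");           // relocate RocksDB state off /tmp
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);  // bound the record caches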

Request KIP permissions

2019-04-11 Thread Patrik Kleindl
Hi
Please add me (pkleindl) to the list to write a KIP
Thanks in advance and best regards
Patrik


Re: [VOTE] KIP-339: Create a new IncrementalAlterConfigs API

2019-04-11 Thread Rajini Sivaram
Sounds good. Thanks Colin!

Regards,

Rajini

On Wed, Apr 10, 2019 at 8:22 PM Colin McCabe  wrote:

> Hi Rajini,
>
> That is a good point.  We want to keep the "transactionality" of updating
> several configs for the same ConfigResource at once.  SSL configs are a
> good example of this-- it often wouldn't make sense to change just one at
> once.  How about an input like Map<ConfigResource, Collection<AlterConfigOp>> and a
> result like: Map<ConfigResource, KafkaFuture<Void>>?
>
> best,
> Colin
>
> On Mon, Apr 8, 2019, at 04:48, Rajini Sivaram wrote:
> > Hi Colin,
> >
> > I am not sure the API proposed in the KIP fits with the type of updates
> we
> > support. The old API with Map<ConfigResource, Config> fits better and we
> > need to find a way to do something similar while still retaining the old
> > one.
> >
> > Each request should specify a collection of updates for each
> > ConfigResource with
> > results returned per-ConfigResource since that is our unit of atomicity.
> We
> > guarantee that we never do a partial update of a collection of configs
> for
> > a ConfigResource from a single request and hence we should only have one
> > input with a collection of updates and one result for each
> ConfigResource,
> > making the model obvious in the API and docs. We need something similar
> to
> > the existing method with Map<ConfigResource, Config>, but need to change the
> > method signature so that it can co-exist with the old method,
> >
> > On Mon, Oct 1, 2018 at 8:35 PM Colin McCabe  wrote:
> >
> > > Hi all,
> > >
> > > With 3 binding +1s from myself, Ismael, and Gwen, the vote passes.
> > >
> > > Thanks, all.
> > > Colin
> > >
> > >
> > > On Fri, Sep 28, 2018, at 09:43, Colin McCabe wrote:
> > > > Hi all,
> > > >
> > > > Thanks for the discussion.  I'm going to close the vote later today
> if
> > > > there are no more comments.
> > > >
> > > > cheers,
> > > > Colin
> > > >
> > > >
> > > > On Mon, Sep 24, 2018, at 22:33, Colin McCabe wrote:
> > > > > +1 (binding)
> > > > >
> > > > > Colin
> > > > >
> > > > >
> > > > > On Mon, Sep 24, 2018, at 17:49, Ismael Juma wrote:
> > > > > > Thanks Colin. I think this is much needed and I'm +1 (binding)
> > > > > > on fixing> this issue. However, I have a few minor suggestions:
> > > > > >
> > > > > > 1. Overload alterConfigs instead of creating a new method name.
> This
> > > > > >gives> us both the short name and a path for removal of the
> > > deprecated
> > > > > > overload.> 2. Did we consider Add/Remove instead of
> Append/Subtract?
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Mon, Sep 24, 2018 at 10:29 AM Colin McCabe
> > > > > >  wrote:>
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I would like to start voting on KIP-339, which creates a new
> > > > > > > IncrementalAlterConfigs API.
> > > > > > >
> > > > > > > The KIP is described here:
> > > > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+ModifyConfigs+API
> >
> > > >
> > > > > > > Previous discussion:
> > > > > > > https://sematext.com/opensee/m/Kafka/uyzND1OYRKh2RrGba1
> > > > > > >
> > > > > > > best,
> > > > > > > Colin
> > > > > > >
> > > > >
> > >
> >
>
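
For readers following the thread, a hedged sketch of how the per-ConfigResource batching discussed
above could look from a caller's perspective; the AlterConfigOp/OpType names follow the KIP text,
and the exact signatures should be treated as illustrative:

final Map<ConfigResource, Collection<AlterConfigOp>> updates = new HashMap<>();
final ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
updates.put(topic, Arrays.asList(
        new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET),
        new AlterConfigOp(new ConfigEntry("retention.ms", ""), AlterConfigOp.OpType.DELETE)));
// All ops for one ConfigResource are applied atomically; different resources are independent.
adminClient.incrementalAlterConfigs(updates).all().get();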


Re: [DISCUSS] KIP-448: Add State Stores Unit Test Support to Kafka Streams Test Utils

2019-04-11 Thread Matthias J. Sax
Thanks for the KIP. Only one initial comment (Sophie mentioned this
already but I want to emphasize on it).

You state that

> These will be internal classes, so no public API/interface.

If this is the case, we don't need a KIP. However, the idea of the
original Jira is to actually make those classes public, as part of the
`streams-test-utils` package. If they're not public, developers should not
use them, because they won't have any backward compatibility guarantees.

Hence, I would suggest that the corresponding classes go into a new
package `org.apache.kafka.streams.state`.


-Matthias


On 4/9/19 8:58 PM, Bruno Cadonna wrote:
> Hi Yishun,
> 
> Thank you for the KIP.
> 
> I have a couple of comments:
> 
> 1. Could you please add an example to the KIP that demonstrates how the
> mocks should be used in a test?
> 
> 2. I am wondering, whether the MockKeyValueStore needs to be backed by an
> actual KeyValueStore (in your KIP InMemoryKeyValueStore). Would it not
> suffice to provide the mock with the entries that it has to check in case
> of input operation like put() and with the entries it has to return in case
> of an output operation like get()? In my opinion, a mock should have as
> little and as simple code as possible. A unit test should depend as little
> as possible from productive code that it does not explicitly test.
> 
> 3. I would be interested in the arguments against using a well-established
> and well-tested mock framework like EasyMock. If there are good arguments,
> they should be listed under 'Rejected Alternatives'.
> 
> 4. What is the purpose of the parameter 'time' in MockStoreFactory?
> 
> Best,
> Bruno
> 
> On Tue, Apr 9, 2019 at 11:29 AM Sophie Blee-Goldman 
> wrote:
> 
>> Hi Yishun, thanks for the KIP! I have a few initial questions/comments:
>>
>> 1) It may be useful to capture the iterator results as well (eg with a
>> MockIterator that wraps the underlying iterator and records the same way
>> the MockStore wraps/records the underlying store)
>>
>> 2) a. Where is the "persistent" variable coming from or being used? It
>> seems the MockKeyValueStore accepts it in the constructor, but only the
>> name parameter is passed when constructing a new MockKeyValueStore in
>> build() ... also, if we extend InMemoryXXXStore shouldn't this always be
>> false?
>> b. Is the idea to wrap an in-memory store for each type (key-value,
>> session, etc)? We don't (yet) offer an in-memory version of the session
>> store although it is in the works, so this will be possible -- I am more
>> wondering if it makes sense to decide this for the user or to allow them to
>> choose between in-memory or rocksDB by setting "persistent"
>>
>> 3) I'm wondering if users might want to be able to plug in their own custom
>> stores as the underlying backend...should we support this as well? WDYT?
>>
>> 4) We probably want to make these stores available through the public
>> test-utils package (maybe not the stores themselves which should be
>> internal, but should there be some kind of public API that gives access to
>> them?)
>>
>> Cheers,
>> Sophie
>>
>> On Tue, Apr 9, 2019 at 9:19 AM Yishun Guan  wrote:
>>
>>> Bumping this up again, thanks!
>>>
>>> On Fri, Apr 5, 2019, 14:36 Yishun Guan  wrote:
>>>
 Hi, bumping this up again. Thanks!

 On Tue, Apr 2, 2019, 13:07 Yishun Guan  wrote:

> Hi All,
>
> I like to start a discussion on KIP-448
> (https://cwiki.apache.org/confluence/x/SAeZBg). It is about adding
> Mock state stores and relevant components for testing purposes.
>
> Here is the JIRA: https://issues.apache.org/jira/browse/KAFKA-6460
>
> This is a rough KIP draft, review and comment are appreciated. It
> seems to be tricky and some requirements and details are still needed
> to be discussed.
>
> Thanks,
> Yishun
>

>>>
>>
> 
> 





Re: [DISCUSS] KIP-446: Add changelog topic configuration to KTable suppress

2019-04-11 Thread Matthias J. Sax
I think that the current proposal to add `withLoggingDisabled()` and
`withLoggingEnabled(Map)` should be the best option.

IMHO there is no reason to add a WARN log. We also don't have a WARN log
when people disable logging on regular stores. As Bruno mentioned, this
might also lead to data loss, so I don't see why we should treat
suppress() different to other stores.
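
For context, a sketch of how the two options could surface on the suppression buffer config,
assuming they land on `Suppressed.BufferConfig` as the KIP proposes (illustrative only):

stream.groupByKey()
      .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
      .count()
      .suppress(Suppressed.untilWindowCloses(
              BufferConfig.unbounded().withLoggingEnabled(
                      Collections.singletonMap(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1"))))
      .toStream();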


-Matthias

On 4/10/19 3:36 PM, Bruno Cadonna wrote:
> Hi Maarten and John,
> 
> I would opt for option 1 with an additional log message on INFO or WARN
> level, since the log file is the place where you would look first to
> understand what went wrong. I would also not adjust it when persistent
> stores are available for suppress.
> 
> I would not go for option 2 or 3, because IIUC, with
> `withLoggingDisabled()` also persistent state stores do not guarantee not
> to lose records. Persisting state stores is basically a way to optimize
> recovery in certain cases. The changelog topic is the component that
> guarantees no data loss. So regarding data loss, in my opinion, disabling
> logging on the suppression buffer is not different from disabling logging
> on other state stores. Please correct me if I am wrong.
> 
> Best,
> Bruno
> 
> On Wed, Apr 10, 2019 at 12:12 PM John Roesler  wrote:
> 
>> Thanks for the update and comments, Maarten. It would be interesting to
>> hear what others think as well.
>> -John
>>
>> On Thu, Apr 4, 2019 at 2:43 PM Maarten Duijn  wrote:
>>
>>> Thank you for the explanation regarding the internals, I have edited the
>>> KIP accordingly and updated the Javadoc. About the possible data loss
>> when
>>> altering changelog config, I think we can improve by doing (one of) the
>>> following.
>>>
>>> 1) Add a warning in the comments that clearly states what might happen
>>> when change logging is disabled and adjust it when persistent stores are
>>> added.
>>>
>>> 2) Change `withLoggingDisabled` to `minimizeLogging`. Instead of
>> disabling
>>> logging, a call to this method minimizes the topic size by aggressively
>>> removing the records emitted downstream by the suppress operator. I
>> believe
>>> this can be achieved by setting `delete.retention.ms=0` in the topic
>>> config.
>>>
>>> 3) Remove `withLoggingDisabled` from the proposal.
>>>
>>> 4) Leave both methods as-proposed, as you indicated, this is in line with
>>> the other parts of the Streams API
>>>
>>> A user might want to disable logging when downstream is not a Kafka topic
> but some other service that does not benefit from at-least-once delivery
>> of
>>> the suppressed records in case of failover or rebalance.
>>> Seeing as it might cause data loss, the methods should not be used
>> lightly
>>> and I think some comments are warranted. Personally, I rely purely on
>> Kafka
>>> to prevent data loss even when a store persisted locally, so when support
>>> is added for persistent suppression, I feel the comments may stay.
>>>
>>> Maarten
>>>
>>
> 





Re: Request for contribution

2019-04-11 Thread Matthias J. Sax
Check out the link in section "Developers":
https://cwiki.apache.org/confluence/display/KAFKA/Index

-Matthias

On 4/10/19 1:16 AM, Manish G wrote:
> Hi,
> 
> I am Manish, a Java programmer by profession.
> 
> I want to learn and contribute to Kafka.
> 
> Can you please guide me to appropriate links for it?
> 
> With regards
> Manish
> 


