Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-09-04 Thread Don Bosco Durai
Hi Rajini

Overall, it looks good. 

Thanks

Bosco


On 9/4/19, 3:39 AM, "Rajini Sivaram"  wrote:

Hi Colin & Don,

I have submitted a PR to help with reviewing the new API:
https://github.com/apache/kafka/pull/7293/files.

   - Authorizer.java contains the updated API and includes javadoc
   update including the threading model.
   - AclAuthorizer.scala changes in the PR should give an idea of the
   implementation changes for custom authorizers that want to continue to be
   synchronous.
   - KafkaApis.scala continues to use the API synchronously for now. It can
   be updated later.


Thank you,

Rajini


On Wed, Sep 4, 2019 at 9:38 AM Rajini Sivaram 
wrote:

> Hi Don,
>
> Thanks for your note.
>
> 1) The intention is to avoid blocking in the calling thread. We already
> have several requests that are put into a purgatory when waiting for remote
> communication, for example a produce request waiting for replication. Since
> we have a limited number of request threads in the broker, we want to make
> progress with other requests, while one is waiting on any form of remote
> communication.
>
> 2) Async management calls will be useful with the default authorizer when
> KIP-500 removes ZK and we rely on Kafka instead. Our current ZK-based
> implementation as well as any custom implementations that don't want to be
> async will just need to return a sync'ed value. So instead of returning `
> value`, the code would just return `CompletableFuture.completedFuture
> (value)`. So it would be just a single line change in the implementation
> with the new API. The caller would treat completedFuture exactly as it
> does today, processing the request synchronously without using a purgatory.
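
For illustration, here is a minimal sketch of what "just return a completed future" could look like for a synchronous custom authorizer. The result type and method shape below are simplified placeholders, not the exact KIP-504 draft signatures:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class MySyncAuthorizer {

    // existing synchronous decision logic, unchanged
    private boolean allowed(Object requestContext, Object action) {
        return true; // placeholder decision
    }

    // with the async API, the only change is wrapping each result in an
    // already-completed future, so the broker can process it inline
    public List<CompletionStage<String>> authorize(Object requestContext, List<Object> actions) {
        List<CompletionStage<String>> results = new ArrayList<>();
        for (Object action : actions) {
            results.add(CompletableFuture.completedFuture(
                    allowed(requestContext, action) ? "ALLOWED" : "DENIED"));
        }
        return results;
    }
}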
>
> 3) For implementations that return a completedFuture as described in 2),
> the behaviour would remain exactly the same. No additional threads or
> purgatory will be used for this case. So there would be no performance
> penalty. For implementations that return a future that is not complete, we
> prioritise running more requests concurrently. So in a deployment with a
> large number of clients, we would improve performance by allowing other
> requests to be processed on the request threads while some are waiting for
> authorization metadata.
>
> 4) I was concerned about this too. The goal is to make the API flexible
> enough to handle large scale deployments in future when caching all
> authorization metadata in each broker is not viable. Using an async API
> that returns CompletionStage, the caller has the option to handle the
> result synchronously or asynchronously, so we don't necessarily need to
> update the calling code right away. Custom authorizers using the async API
> have full control over whether authorization is performed in-line since
> completedFuture will always be handled synchronously. We do need to
> update KafkaApis to take advantage of the asynchronous API to improve
> scale. Even though this is a big change, since we will be doing the same
> for all requests, it shouldn't be too hard to maintain since the same
> pattern will be used for all requests.
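
As a rough illustration of how a caller such as KafkaApis could treat a CompletionStage either way, here is a sketch; method names like handleInline and parkInPurgatory are made up for this example and are not real broker code:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AuthorizeResultHandler {

    void onAuthorizeResult(CompletionStage<String> stage) {
        CompletableFuture<String> future = stage.toCompletableFuture();
        if (future.isDone()) {
            // a completedFuture from a synchronous authorizer: same code path
            // as today, no extra threads and no purgatory
            handleInline(future.join());
        } else {
            // a genuinely asynchronous result: free the request thread now and
            // finish the request when the authorizer completes the future
            future.whenComplete((result, error) -> parkInPurgatory(result, error));
        }
    }

    private void handleInline(String result) { /* process the request synchronously */ }

    private void parkInPurgatory(String result, Throwable error) { /* complete later */ }
}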
>
> Regards,
>
> Rajini
>
> On Tue, Sep 3, 2019 at 11:48 PM Don Bosco Durai  wrote:
>
>> Hi Rajini
>>
>> Help me understand this a bit more.
>>
>> 1. For all practical purpose, without authorization you can't go to the
>> next step. The calling code needs to block anyway. So should we just let
>> the implementation code do the async part?
>> 2. If you feel management calls need to be async, then we should consider
>> another set of async APIs. I don't feel we should complicate it further.
>> 3. Another concern I have is w.r.t. performance. Kafka has been built to
>> handle thousands of requests per second. Not sure whether making it async will
>> add any unnecessary overhead.
>> 4. How much complication would this add on the calling side? And is it
>> worth it?
>>
>> Thanks
>>
>> Bosco
>>
>>
>> On 9/3/19, 8:50 AM, "Rajini Sivaram"  wrote:
>>
>> Hi all,
>>
>> Ismael brought up a point that it will be good to make the Authorizer
>> interface asynchronous to avoid blocking request threads during remote
>> operations.
>>
>> 1) Since we want to support different backends for authorization metadata,
>> making createAcls() and deleteAcls() asynchronous makes sense since these
>> always involve remote operations. When KIP-500 removes ZooKeeper, we would
>> want to move ACLs to Kafka and async updates will avoid unnecessary blocking.
>> 2) For authorize() method, we currently use cached ACLs in the

Jenkins build is back to normal : kafka-trunk-jdk11 #793

2019-09-04 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread John Roesler
I see; thanks for the clarification, Guozhang.

If the prior owner of a non-logged, stateful task always reports 0, and all
other instances report nothing, then it seems like we need a special case
in the assignor to handle assigning these. I.e., the strategy of creating
"movement" standbys and waiting for them to be equally caught up to the
leader before moving the task with zero downtime doesn't work. Since no
standby tasks are possible for non-logged tasks, the other instances would
never be able to "catch up", and the algorithm would be doomed to just
continue pinning the task to its current instance, and would potentially
never be able to balance the cluster.

Unless, of course, we put in a special case that says to keep it assigned
for a while and then move it.

But in this case, how do we decide when to move it; it seems like any time
is as good as any other. And if this is the case, then we might as well
just move it right away, which is what the KIP currently proposes.

Also, it seems like this creates an asymmetry with the handling of
non-logged stores in mixed logged/non-logged tasks (case (1) above). In
that case, we agree that the algorithm will make assignments without regard
to the non-logged stores at all, and would freely move the task between two
instances that are "equally" caught up on the logged stores. But if a task
contains only non-logged stores, it sounds like we would not have this
freedom of movement, but would just keep giving it back to the instance
that previously owned it?

Regarding the latter comment, thanks for pointing out my oversight. I've
added the section "Iterative Balancing Assignments" to define this
behavior:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-IterativeBalancingAssignments
 .

Thanks!
-John

On Wed, Sep 4, 2019 at 5:58 PM Guozhang Wang  wrote:

> Hi John,
>
> I think I may have mis-communicated my ideas a bit; what I meant is simply,
> for case 1), to let the active task report a lag of 0 instead of not reporting
> any lag at all. In addition, for other hosts not hosting this task (where we
> do not know the offset lag at all), we set
> "StatefulTasksToRankedCandidates[task][1] := instance" just to make sure the
> current host is ranked first while all others are ranked equally after it. In
> this case we would still favor stickiness and not move such a task out of its
> current host unless that violates balance, in which case we are free to move
> it to any other host. Does that make sense?
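
To make the ranking idea concrete, here is a tiny sketch of how a per-task candidate ranking could be built so the current owner always sorts first and every other instance ties behind it. The names are invented for illustration and are not KIP-441 code:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RankingSketch {

    // rank 1 for the instance currently hosting the task, rank 2 for everyone
    // else, so stickiness wins unless moving the task is needed for balance
    static Map<String, Integer> rankCandidates(String currentOwner, List<String> instances) {
        Map<String, Integer> rankByInstance = new HashMap<>();
        for (String instance : instances) {
            rankByInstance.put(instance, instance.equals(currentOwner) ? 1 : 2);
        }
        return rankByInstance;
    }
}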
>
> Also I just realized that in the updated wiki page the algorithm section of
> iteratively assigning / moving tasks based on
> StatefulTasksToRankedCandidates until convergence was missing (it was there
> in the previous version), could you add it back?
>
> Guozhang
>
>
> On Wed, Sep 4, 2019 at 2:59 PM John Roesler  wrote:
>
> > Thanks for the reply, Guozhang,
> >
> > I'll start by re-stating your cases, just to make sure we're on the same
> > page...
> >
> > 1) The task is stateful, and all its stores are non-logged. For this case
> > under the proposal, we would not have any standbys and the active version
> > would actually not report any lag at all (not a lag of 0). Thus, all
> > instances would be considered equal when it comes to assignment, although
> > the assignment logic would know that the task is "heavy" because it has
> > those non-logged stores and factor that in to the overall cluster
> balance.
> >
> > 2) The task is stateful and maybe has one logged and one non-logged
> store.
> > As you say, in this case, the non-logged store would not contribute at
> all
> > to anyone's reported lag. The active task would report a lag of 0, and
> the
> > standbys would report their lag on the logged store. The assignment logic
> > would take these reported lags into account.
> >
> > It sounds like there might be some dissonance on point (1). It sounds
> like
> > you're saying we should try to assign non-logged stateful tasks back to
> an
> > instance that has previously hosted it, or maybe we should assign it back
> > to the most recent instance that hosted it, and then only reassign it if
> > the movement would affect balance. I guess I'm not opposed to this, but I
> > also don't really see the advantage. The fact is that we'd still wind up
> > migrating it much of the time, so no one could depend on it not getting
> > migrated, and at the same time we're paying a significant complexity cost
> > in the assignor to support this case. Plus, we need to encode the
> previous
> > owner of the task somehow in the wire protocol, which means we pay a
> > payload-size penalty to support it as well.
> >
> > On the other hand, we should make our assignment as stable as possible in
> > the implementation to avoid pointless "swapping" of logged and non-logged
> > stateful tasks, as well as stateless tasks when it doesn't change the
> > balance at all to do so. It seems like 

[jira] [Created] (KAFKA-8876) KafkaBasedLog does not throw exception when some partitions of the topic is offline

2019-09-04 Thread Boquan Tang (Jira)
Boquan Tang created KAFKA-8876:
--

 Summary: KafkaBasedLog does not throw exception when some 
partitions of the topic is offline
 Key: KAFKA-8876
 URL: https://issues.apache.org/jira/browse/KAFKA-8876
 Project: Kafka
  Issue Type: Bug
Reporter: Boquan Tang


Currently KafkaBasedLog does not check whether *all* partitions in the topic are 
online. This may result in it ignoring partitions that are still recovering and 
in turn reporting a null offset to KafkaOffsetBackingStore, while in fact it 
should either wait or fail the thread to prompt a retry, so the offset can be 
correctly loaded by the connector.
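
A minimal sketch of the kind of check being asked for, illustrative only and not the actual KafkaBasedLog code: refuse to load offsets while any partition of the topic has no leader.

{code:java}
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.connect.errors.ConnectException;

public class PartitionAvailabilityCheck {
    static void ensureAllPartitionsOnline(Consumer<?, ?> consumer, String topic) {
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        for (PartitionInfo partition : partitions) {
            if (partition.leader() == null) {
                // failing (or retrying) here avoids silently handing a null offset
                // to KafkaOffsetBackingStore while the partition is still recovering
                throw new ConnectException("Partition " + partition.partition()
                        + " of topic " + topic + " has no leader; refusing to load offsets");
            }
        }
    }
}
{code}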

Specifically, we are using the Debezium MySQL connector to replicate the MySQL 
binlog to Kafka.
When restarting after a cluster outage, we observed the following:
{code}
2019-08-29T19:27:32Z INFO 
[org.apache.kafka.connect.storage.KafkaOffsetBackingStore] [main] Starting 
KafkaOffsetBackingStore
2019-08-29T19:27:32Z INFO [org.apache.kafka.connect.util.KafkaBasedLog] [main] 
Starting KafkaBasedLog with topic bobqueue-binlog-shovel-v1-offsets
...skipped client config logs...
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-12 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-10 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-21 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-5 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-20 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-18 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-2 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-13 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-11 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-8 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-23 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-7 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-22 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-6 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-3 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-9 to offset 0.
2019-08-29T19:27:33Z INFO 

Build failed in Jenkins: kafka-trunk-jdk8 #3886

2019-09-04 Thread Apache Jenkins Server
See 


Changes:

[bbejeck] MINOR: remove unnecessary nulllity check (#7282)

[bill] KAFKA-8861 Fix flaky

--
[...truncated 7.74 MB...]
kafka.admin.DeleteConsumerGroupsTest > testDeleteNonEmptyGroup STARTED

kafka.admin.DeleteConsumerGroupsTest > testDeleteNonEmptyGroup PASSED

kafka.admin.DeleteConsumerGroupsTest > testDeleteCmdWithMixOfSuccessAndError 
STARTED

kafka.admin.DeleteConsumerGroupsTest > testDeleteCmdWithMixOfSuccessAndError 
PASSED

kafka.admin.DeleteConsumerGroupsTest > testDeleteWithTopicOption STARTED

kafka.admin.DeleteConsumerGroupsTest > testDeleteWithTopicOption PASSED

kafka.admin.DeleteConsumerGroupsTest > testDeleteNonExistingGroup STARTED

kafka.admin.DeleteConsumerGroupsTest > testDeleteNonExistingGroup PASSED

kafka.admin.DeleteConsumerGroupsTest > testDeleteCmdNonEmptyGroup STARTED

kafka.admin.DeleteConsumerGroupsTest > testDeleteCmdNonEmptyGroup PASSED

kafka.admin.DeleteConsumerGroupsTest > testDeleteCmdNonExistingGroup STARTED

kafka.admin.DeleteConsumerGroupsTest > testDeleteCmdNonExistingGroup PASSED

kafka.admin.DeleteConsumerGroupsTest > testDeleteEmptyGroup STARTED

kafka.admin.DeleteConsumerGroupsTest > testDeleteEmptyGroup PASSED

kafka.admin.DeleteConsumerGroupsTest > testDeleteWithMixOfSuccessAndError 
STARTED

kafka.admin.DeleteConsumerGroupsTest > testDeleteWithMixOfSuccessAndError PASSED

kafka.admin.DeleteConsumerGroupsTest > 
testDeleteWithUnrecognizedNewConsumerOption STARTED

kafka.admin.DeleteConsumerGroupsTest > 
testDeleteWithUnrecognizedNewConsumerOption PASSED

kafka.admin.DeleteConsumerGroupsTest > testDeleteCmdAllGroups STARTED

kafka.admin.DeleteConsumerGroupsTest > testDeleteCmdAllGroups PASSED

kafka.admin.DeleteConsumerGroupsTest > testDeleteCmdEmptyGroup STARTED

kafka.admin.DeleteConsumerGroupsTest > testDeleteCmdEmptyGroup PASSED

kafka.admin.ReassignPartitionsCommandTest > 
shouldRemoveThrottleReplicaListBasedOnProposedAssignment STARTED

kafka.admin.ReassignPartitionsCommandTest > 
shouldRemoveThrottleReplicaListBasedOnProposedAssignment PASSED

kafka.admin.ReassignPartitionsCommandTest > testReassigningNonExistingPartition 
STARTED

kafka.admin.ReassignPartitionsCommandTest > testReassigningNonExistingPartition 
PASSED

kafka.admin.ReassignPartitionsCommandTest > 
shouldFindMovingReplicasMultipleTopics STARTED

kafka.admin.ReassignPartitionsCommandTest > 
shouldFindMovingReplicasMultipleTopics PASSED

kafka.admin.ReassignPartitionsCommandTest > 
shouldNotOverwriteExistingPropertiesWhenLimitIsAdded STARTED

kafka.admin.ReassignPartitionsCommandTest > 
shouldNotOverwriteExistingPropertiesWhenLimitIsAdded PASSED

kafka.admin.ReassignPartitionsCommandTest > 
shouldFindMovingReplicasMultipleTopicsAndPartitions STARTED

kafka.admin.ReassignPartitionsCommandTest > 
shouldFindMovingReplicasMultipleTopicsAndPartitions PASSED

kafka.admin.ReassignPartitionsCommandTest > 
shouldRemoveThrottleLimitFromAllBrokers STARTED

kafka.admin.ReassignPartitionsCommandTest > 
shouldRemoveThrottleLimitFromAllBrokers PASSED

kafka.admin.ReassignPartitionsCommandTest > shouldFindMovingReplicas STARTED

kafka.admin.ReassignPartitionsCommandTest > shouldFindMovingReplicas PASSED

kafka.admin.ReassignPartitionsCommandTest > 
shouldFindMovingReplicasMultiplePartitions STARTED

kafka.admin.ReassignPartitionsCommandTest > 
shouldFindMovingReplicasMultiplePartitions PASSED

kafka.admin.ReassignPartitionsCommandTest > 
testPartitionReassignmentNonOverlappingReplicas STARTED

kafka.admin.ReassignPartitionsCommandTest > 
testPartitionReassignmentNonOverlappingReplicas PASSED

kafka.admin.ReassignPartitionsCommandTest > 
testPartitionReassignmentWithLeaderNotInNewReplicas STARTED

kafka.admin.ReassignPartitionsCommandTest > 
testPartitionReassignmentWithLeaderNotInNewReplicas PASSED

kafka.admin.ReassignPartitionsCommandTest > 
testResumePartitionReassignmentThatWasCompleted STARTED

kafka.admin.ReassignPartitionsCommandTest > 
testResumePartitionReassignmentThatWasCompleted PASSED

kafka.admin.ReassignPartitionsCommandTest > shouldSetQuotaLimit STARTED

kafka.admin.ReassignPartitionsCommandTest > shouldSetQuotaLimit PASSED

kafka.admin.ReassignPartitionsCommandTest > 
testPartitionReassignmentWithLeaderInNewReplicas STARTED

kafka.admin.ReassignPartitionsCommandTest > 
testPartitionReassignmentWithLeaderInNewReplicas PASSED

kafka.admin.ReassignPartitionsCommandTest > 
shouldFindMovingReplicasWhenProposedIsSubsetOfExisting STARTED

kafka.admin.ReassignPartitionsCommandTest > 
shouldFindMovingReplicasWhenProposedIsSubsetOfExisting PASSED

kafka.admin.ReassignPartitionsCommandTest > shouldUpdateQuotaLimit STARTED

kafka.admin.ReassignPartitionsCommandTest > shouldUpdateQuotaLimit PASSED

kafka.admin.ReassignPartitionsCommandTest > 
shouldFindTwoMovingReplicasInSamePartition STARTED

kafka.admin.ReassignPartitionsCommandTest > 

[jira] [Created] (KAFKA-8875) CreateTopic API should check topic existence before replication factor

2019-09-04 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-8875:
--

 Summary: CreateTopic API should check topic existence before 
replication factor
 Key: KAFKA-8875
 URL: https://issues.apache.org/jira/browse/KAFKA-8875
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


If you try to create a topic and the replication factor cannot be satisfied, 
Kafka will return `INVALID_REPLICATION_FACTOR`. If the topic already exists, we 
should probably return `TOPIC_ALREADY_EXISTS` instead. You won't see this 
problem if using TopicCommand because we check existence prior to creating the 
topic.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8874) KIP-517: Add consumer metric indicating time between poll calls

2019-09-04 Thread Kevin Lu (Jira)
Kevin Lu created KAFKA-8874:
---

 Summary: KIP-517: Add consumer metric indicating time between poll 
calls
 Key: KAFKA-8874
 URL: https://issues.apache.org/jira/browse/KAFKA-8874
 Project: Kafka
  Issue Type: New Feature
  Components: consumer, metrics
Reporter: Kevin Lu
Assignee: Kevin Lu


[https://cwiki.apache.org/confluence/display/KAFKA/KIP-517%3A+Add+consumer+metric+indicating+time+between+poll+calls]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-09-04 Thread Guozhang Wang
Hi folks,

I've been thinking more about this KIP, and my understanding is that we want
to introduce a new SlidingWindow notion for aggregation since our current
TimeWindow aggregation is not very efficient with very small steps. So I'm
wondering: rather than introducing a new implementation mechanism, what if we
just optimize the TimeWindowed aggregations to allow an advance step that is
very small compared to the window length itself (which would in practice
sufficiently mimic the sliding window behavior), e.g. a window length of 10
minutes with a 1-second advance.

I've quickly written up an alternative proposal for KIP-450 here:
https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
Please let me know your thoughts.
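
For reference, the alternative amounts to something like the following with the existing DSL (a sketch; stream and serde setup are assumed and omitted):

import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

public class HoppingAsSliding {
    // a 10-minute window advancing every second: roughly 600 overlapping windows
    // per record, which is exactly the efficiency concern being discussed
    static final TimeWindows NEAR_SLIDING =
            TimeWindows.of(Duration.ofMinutes(10)).advanceBy(Duration.ofSeconds(1));
}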


Guozhang

On Tue, Apr 16, 2019 at 3:14 PM Matthias J. Sax 
wrote:

> Thanks Sophie!
>
>
> Regarding (4), I am in favor to support both. Not sure if we can reuse
> existing window store (with enabling to store duplicates) for this case
> or not though, or if we need to design a new store to keep all raw records?
>
> Btw: for holistic aggregations, like median, we would need to support a
> different store layout for existing aggregations (time-window,
> session-window), too. Thus, if we add support for this, we might be able
> to kill two birds with one stone. Of course, we would still need new
> APIs for existing aggregations to allow users to pick between both cases.
>
> I only bring this up, because it might make sense to design the store in
> a way such that we can use it for all cases.
>
>
> About (3): atm we support wall-clock time via the corresponding
> `WallclockTimestampExtractor`. Maybe Bill can elaborate a little bit
> more what he has in mind exactly, and why using this extractor would not
> meet the requirements for processing-time sliding windows?
>
>
> -Matthias
>
>
> On 4/16/19 10:16 AM, Guozhang Wang wrote:
> > Regarding 4): yes I agree with you that invertibility is not a common
> > property for agg-functions. Just to be clear about our current APIs: for
> > stream.aggregate we only require a single Adder function, whereas for
> > table.aggregate we require both Adder and Subtractor, but these are not
> > used to leverage any properties just that the incoming table changelog
> > stream may contain "tombstones" and hence we need to negate the effect of
> > the previous record that has been deleted by this tombstone.
> >
> > What I'm proposing is exactly having two APIs, one for Adder only (like
> > other Streams aggregations) and one for Subtractor + Adder (for agg
> > functions users think are invertible) for efficiency. Some other
> frameworks
> > (e.g. Spark) have similar options for users and will recommend using the
> > latter so that some optimization in implementation can be done.
> >
> >
> > Guozhang
> >
> > On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman <
> sop...@confluent.io>
> > wrote:
> >
> >> Thanks for the feedback Matthias and Bill. After discussing offline we
> >> realized the type of windows I originally had in mind were quite
> different,
> >> and I agree now that the semantics outlined by Matthias are the
> direction
> >> to go in here. I will update the KIP accordingly with the new semantics
> >> (and corresponding design) and restart the discussion from there.
> >>
> >> In the meantime, to respond to some other points:
> >>
> >> 1) API:
> >>
> >> I propose adding only the one class -- public class SlidingWindows
> extends
> >> Windows {} --  so I do not believe we need any new Serdes?
> It
> >> will still be a fixed size TimeWindow, but handled a bit differently.
> I've
> >> updated the KIP to state explicitly all of the classes/methods being
> added
> >>
> >> 2) Zero grace period
> >>
> >> The "zero grace period" was essentially just consequence of my original
> >> definition for sliding windows; with the new semantics we can (and
> should)
> >> allow for a nonzero grace period
> >>
> >> 3) Wall-clock time
> >>
> >> Hm, I had not considered this yet but it may be a good idea to keep in
> mind
> >> while rethinking the design. To clarify, we don't support wall-clock
> based
> >> aggregations with hopping or tumbling windows though (yet?)
> >>
> >> 4) Commutative vs associative vs invertible aggregations
> >>
> >> I agree that it's reasonable to assume commutativity and associativity,
> but
> >> that's not the same as being subtractable -- that requires
> invertibility,
> >> which is broken by a lot of very simple functions and is not, I think,
> ok
> >> to assume. However we could consider adding a separate API which also
> takes
> >> a subtractor and corresponds to a completely different implementation.
> We
> >> could also consider an API that takes a function that aggregates two
> >> aggregates together in addition to the existing aggregator (which
> >> aggregates a single value with an existing aggregate) WDYT?
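
The two API shapes being weighed could look roughly like this; these are hypothetical signatures for illustration, not existing Kafka Streams methods:

import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Windowed;

public interface SlidingWindowedKStreamSketch<K, V> {

    // adder only: works for any aggregation, but the implementation may have to
    // keep and re-scan raw records as windows slide
    <VR> KTable<Windowed<K>, VR> aggregate(Initializer<VR> initializer,
                                           Aggregator<? super K, ? super V, VR> adder);

    // adder + subtractor: only valid for invertible aggregations, but allows an
    // incremental implementation that subtracts records falling out of the window
    <VR> KTable<Windowed<K>, VR> aggregate(Initializer<VR> initializer,
                                           Aggregator<? super K, ? super V, VR> adder,
                                           Aggregator<? super K, ? super V, VR> subtractor);
}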
> >>
> >>
> >>
> >>
> >> On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax 
> >> wrote:
> >>
> >>> Thanks for 

[jira] [Created] (KAFKA-8873) Implement timeout for Alter/List PartitionReassignment APIs

2019-09-04 Thread Stanislav Kozlovski (Jira)
Stanislav Kozlovski created KAFKA-8873:
--

 Summary: Implement timeout for Alter/List PartitionReassignment 
APIs
 Key: KAFKA-8873
 URL: https://issues.apache.org/jira/browse/KAFKA-8873
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski


In the initial implementation of KIP-455, we decided 
(https://github.com/apache/kafka/pull/7128#issuecomment-528099402) to 
delegate the implementation of the timeout functionality to a separate task.
This is in part because the change is not trivial and because there are other 
controller RPCs which would be good to update with timeout functionality.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Resolved] (KAFKA-8568) MirrorMaker 2.0 resource leak

2019-09-04 Thread Ryanne Dolan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryanne Dolan resolved KAFKA-8568.
-
Resolution: Fixed

> MirrorMaker 2.0 resource leak
> -
>
> Key: KAFKA-8568
> URL: https://issues.apache.org/jira/browse/KAFKA-8568
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.2.2
>Reporter: Péter Gergő Barna
>Assignee: Ryanne Dolan
>Priority: Major
>
> This issue is produced by the KIP-382 branch (I am not sure which version is 
> affected by that branch).
> While MirrorMaker 2.0 is running, the following command returns a number that 
> is getting larger and larger. 
>  
> {noformat}
> lsof -p  | grep ESTABLISHED | wc -l{noformat}
>  
> Meanwhile, in the error log, NullPointers pop up from the 
> MirrorSourceTask.cleanup, because either the consumer or the producer is null 
> when the cleanup method tries to close them.
>  
> {noformat}
> Exception in thread "Thread-790" java.lang.NullPointerException
>  at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.cleanup(MirrorSourceTask.java:110)
>  at java.lang.Thread.run(Thread.java:748)
> Exception in thread "Thread-792" java.lang.NullPointerException
>  at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.cleanup(MirrorSourceTask.java:110)
>  at java.lang.Thread.run(Thread.java:748)
> Exception in thread "Thread-791" java.lang.NullPointerException
>  at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.cleanup(MirrorSourceTask.java:116)
>  at java.lang.Thread.run(Thread.java:748)
> Exception in thread "Thread-793" java.lang.NullPointerException
>  at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.cleanup(MirrorSourceTask.java:110)
>  at java.lang.Thread.run(Thread.java:748){noformat}
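
The NullPointerExceptions above come from cleanup() closing the clients unconditionally. A defensive sketch of the idea behind the fix, illustrative only and not necessarily the exact change that resolved this ticket:

{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

public class CleanupSketch {
    private KafkaConsumer<byte[], byte[]> consumer; // may be null if start() never completed
    private KafkaProducer<byte[], byte[]> producer; // may be null if start() never completed

    public void cleanup() {
        try {
            if (consumer != null) {
                consumer.close(); // only close what was actually created
            }
        } finally {
            if (producer != null) {
                producer.close(); // runs even if closing the consumer threw
            }
        }
    }
}
{code}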
> When the number of the established connections (returned by lsof) reaches a 
> certain limit, new exceptions start to pop up in the logs: Too many open files
> {noformat}
> [2019-06-19 12:56:43,949] ERROR 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} failed to send record to 
> heartbeats: {} (org.apache.kafka.connect.runtime.WorkerSourceTask)
> org.apache.kafka.common.errors.SaslAuthenticationException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Too many open 
> files)]) occurred when evaluating SASL token received from the Kafka Broker. 
> Kafka Client will go to A
> UTHENTICATION_FAILED state.
> Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Too many open 
> files)]
>         at 
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
>         at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:461)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:461)
>         at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:370)
>         at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:290)
>         at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:230)
>         at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
>         at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:536)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:472)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:311)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: GSSException: No valid credentials provided (Mechanism level: Too 
> many open files)
>         at 
> sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:775)
>         at 
> sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
>         at 
> sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
>         at 
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
>         ... 14 more
> Caused by: java.net.SocketException: Too many open files
>         at java.net.Socket.createImpl(Socket.java:460)
>         

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread Guozhang Wang
Hi John,

I think I may have mis-communicated my ideas a bit; what I meant is simply,
for case 1), to let the active task report a lag of 0 instead of not reporting
any lag at all. In addition, for other hosts not hosting this task (where we
do not know the offset lag at all), we set
"StatefulTasksToRankedCandidates[task][1] := instance" just to make sure the
current host is ranked first while all others are ranked equally after it. In
this case we would still favor stickiness and not move such a task out of its
current host unless that violates balance, in which case we are free to move
it to any other host. Does that make sense?

Also I just realized that in the updated wiki page the algorithm section of
iteratively assigning / moving tasks based on
StatefulTasksToRankedCandidates until convergence was missing (it was there
in the previous version), could you add it back?

Guozhang


On Wed, Sep 4, 2019 at 2:59 PM John Roesler  wrote:

> Thanks for the reply, Guozhang,
>
> I'll start by re-stating your cases, just to make sure we're on the same
> page...
>
> 1) The task is stateful, and all its stores are non-logged. For this case
> under the proposal, we would not have any standbys and the active version
> would actually not report any lag at all (not a lag of 0). Thus, all
> instances would be considered equal when it comes to assignment, although
> the assignment logic would know that the task is "heavy" because it has
> those non-logged stores and factor that in to the overall cluster balance.
>
> 2) The task is stateful and maybe has one logged and one non-logged store.
> As you say, in this case, the non-logged store would not contribute at all
> to anyone's reported lag. The active task would report a lag of 0, and the
> standbys would report their lag on the logged store. The assignment logic
> would take these reported lags into account.
>
> It sounds like there might be some dissonance on point (1). It sounds like
> you're saying we should try to assign non-logged stateful tasks back to an
> instance that has previously hosted it, or maybe we should assign it back
> to the most recent instance that hosted it, and then only reassign it if
> the movement would affect balance. I guess I'm not opposed to this, but I
> also don't really see the advantage. The fact is that we'd still wind up
> migrating it much of the time, so no one could depend on it not getting
> migrated, and at the same time we're paying a significant complexity cost
> in the assignor to support this case. Plus, we need to encode the previous
> owner of the task somehow in the wire protocol, which means we pay a
> payload-size penalty to support it as well.
>
> On the other hand, we should make our assignment as stable as possible in
> the implementation to avoid pointless "swapping" of logged and non-logged
> stateful tasks, as well as stateless tasks when it doesn't change the
> balance at all to do so. It seems like this should accomplish the same
> spiritual goal with no special cases.
>
> WDYT?
> -John
>
> On Wed, Sep 4, 2019 at 2:26 PM Guozhang Wang  wrote:
>
> > Hello John,
> >
> > Your reasoning about non-logged state store looks good to me. But I think
> > it is worth considering two cases differently: 1) all of a task's state stores
> > are non-logged, 2) a task has a non-logged state store but also has other
> > logged state stores.
> >
> > For 1), we should not have any standbys for such tasks at all, but it is
> > not the same as a stateless task, because the only active task could
> report
> > a lag of "0" while all others should logically report the lag as infinity
> > (of course, they would not encode such value).
> >
> > For 2), it is sort of the same to a normal case: active task reporting a
> > lag of 0 while standby task reporting a lag summing all other logged
> state
> > stores, meaning that the non-logged state store does not matter in our
> > assignment semantics.
> >
> > The rationale I had is to effectively favor stickiness for those tasks in
> > case 1) rather than arbitrarily reassigning them, even though it is true that for a
> > non-logged store there's no durability and hence restoration guarantees
> > anyways from Streams. Then algorithmically we may consider assigning
> tasks
> > with descending order of the number of instances reporting lags for it,
> > then for such tasks there's always one candidate reporting a lag of 0,
> and
> > assigning them to any instance does not actually change the cumulated
> total
> > lag either. At that time we can decide "if it is still within load
> balance
> > factor, then let's respect stickiness, otherwise it's free to move".
> WDYT?
> >
> >
> > Guozhang
> >
> > On Wed, Sep 4, 2019 at 8:30 AM John Roesler  wrote:
> >
> > > Hey Bruno,
> > >
> > > Thanks for taking another look. Some quick responses:
> > >
> > > 1) It just means the number of offsets in the topic. E.g., the LSO is
> > 100,
> > > but the first offset is 40 due to retention, so there are 60 offsets in
> > the
> 

Jenkins build is back to normal : kafka-2.2-jdk8 #163

2019-09-04 Thread Apache Jenkins Server
See 




[jira] [Reopened] (KAFKA-7940) Flaky Test CustomQuotaCallbackTest#testCustomQuotaCallback

2019-09-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-7940:


Failed again on `trunk` 
[https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/790/tests]

with "org.scalatest.exceptions.TestFailedException: Partition 
[group1_largeTopic,69] metadata not propagated after 15000 ms"

> Flaky Test CustomQuotaCallbackTest#testCustomQuotaCallback
> --
>
> Key: KAFKA-7940
> URL: https://issues.apache.org/jira/browse/KAFKA-7940
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Stanislav Kozlovski
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.2.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/14/]
> {quote}java.lang.AssertionError: Too many quotaLimit calls Map(PRODUCE -> 1, 
> FETCH -> 1, REQUEST -> 4) at org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback(CustomQuotaCallbackTest.scala:105){quote}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Resolved] (KAFKA-8460) Flaky Test PlaintextConsumerTest#testLowMaxFetchSizeForRequestAndPartition

2019-09-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8460.

Resolution: Duplicate

> Flaky Test  PlaintextConsumerTest#testLowMaxFetchSizeForRequestAndPartition
> ---
>
> Key: KAFKA-8460
> URL: https://issues.apache.org/jira/browse/KAFKA-8460
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5168/consoleFull] 
>  *16:17:04* kafka.api.PlaintextConsumerTest > 
> testLowMaxFetchSizeForRequestAndPartition FAILED*16:17:04* 
> org.scalatest.exceptions.TestFailedException: Timed out before consuming 
> expected 2700 records. The number consumed was 1980.*16:17:04* at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)*16:17:04*
>  at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)*16:17:04*
>  at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)*16:17:04*
>  at org.scalatest.Assertions.fail(Assertions.scala:1091)*16:17:04* at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087)*16:17:04* at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389)*16:17:04* at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:789)*16:17:04* at 
> kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:765)*16:17:04* at 
> kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:156)*16:17:04*
>  at 
> kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition(PlaintextConsumerTest.scala:801)*16:17:04*



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread John Roesler
Thanks for the reply, Guozhang,

I'll start by re-stating your cases, just to make sure we're on the same
page...

1) The task is stateful, and all its stores are non-logged. For this case
under the proposal, we would not have any standbys and the active version
would actually not report any lag at all (not a lag of 0). Thus, all
instances would be considered equal when it comes to assignment, although
the assignment logic would know that the task is "heavy" because it has
those non-logged stores and factor that in to the overall cluster balance.

2) The task is stateful and maybe has one logged and one non-logged store.
As you say, in this case, the non-logged store would not contribute at all
to anyone's reported lag. The active task would report a lag of 0, and the
standbys would report their lag on the logged store. The assignment logic
would take these reported lags into account.

It sounds like there might be some dissonance on point (1). It sounds like
you're saying we should try to assign non-logged stateful tasks back to an
instance that has previously hosted it, or maybe we should assign it back
to the most recent instance that hosted it, and then only reassign it if
the movement would affect balance. I guess I'm not opposed to this, but I
also don't really see the advantage. The fact is that we'd still wind up
migrating it much of the time, so no one could depend on it not getting
migrated, and at the same time we're paying a significant complexity cost
in the assignor to support this case. Plus, we need to encode the previous
owner of the task somehow in the wire protocol, which means we pay a
payload-size penalty to support it as well.

On the other hand, we should make our assignment as stable as possible in
the implementation to avoid pointless "swapping" of logged and non-logged
stateful tasks, as well as stateless tasks when it doesn't change the
balance at all to do so. It seems like this should accomplish the same
spiritual goal with no special cases.

WDYT?
-John

On Wed, Sep 4, 2019 at 2:26 PM Guozhang Wang  wrote:

> Hello John,
>
> Your reasoning about non-logged state store looks good to me. But I think
> it is worth considering two cases differently: 1) all of a task's state stores
> are non-logged, 2) a task has a non-logged state store but also has other
> logged state stores.
>
> For 1), we should not have any standbys for such tasks at all, but it is
> not the same as a stateless task, because the only active task could report
> a lag of "0" while all others should logically report the lag as infinity
> (of course, they would not encode such value).
>
> For 2), it is sort of the same to a normal case: active task reporting a
> lag of 0 while standby task reporting a lag summing all other logged state
> stores, meaning that the non-logged state store does not matter in our
> assignment semantics.
>
> The rationale I had is to effectively favor stickiness for those tasks in
> case 1) rather than arbitrarily reassigning them, even though it is true that for a
> non-logged store there's no durability and hence restoration guarantees
> anyways from Streams. Then algorithmically we may consider assigning tasks
> with descending order of the number of instances reporting lags for it,
> then for such tasks there's always one candidate reporting a lag of 0, and
> assigning them to any instance does not actually change the cumulated total
> lag either. At that time we can decide "if it is still within load balance
> factor, then let's respect stickiness, otherwise it's free to move". WDYT?
>
>
> Guozhang
>
> On Wed, Sep 4, 2019 at 8:30 AM John Roesler  wrote:
>
> > Hey Bruno,
> >
> > Thanks for taking another look. Some quick responses:
> >
> > 1) It just means the number of offsets in the topic. E.g., the LSO is
> 100,
> > but the first offset is 40 due to retention, so there are 60 offsets in
> the
> > topic. Further, the lag on that topic would be considered to be 60 for
> any
> > task that hadn't previously done any work on it.
> >
> > 2) This is undecidable in general. I.e., there's no way we can know
> whether
> > the store is remote or not, and hence whether we can freely assign it to
> > another instance, or whether we have to keep it on the same instance.
> > However, there are a couple of reasons to go ahead and assume we have the
> > freedom to move such tasks.
> > * We know that nothing can prevent the loss of an instance in a cluster
> > (I.e., this is true of all cloud environments, as well as any managed
> > virtualized cluster like mesos or kubernetes), so any Streams program
> that
> > makes use of non-remote, non-logged state is doomed to lose its state
> when
> > it loses an instance.
> > * If we take on a restriction that we cannot move such state between
> > instances, we'd become overconstrained very quickly. Effectively, if you
> > made use of non-logged stores, and we didn't assume freedom of movement,
> > then we couldn't make use of any new instances in your cluster.
> > * On the 

[jira] [Reopened] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-09-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-8677:


This test is still failing: 
[https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/788/tests]

> Flakey test 
> GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> 
>
> Key: KAFKA-8677
> URL: https://issues.apache.org/jira/browse/KAFKA-8677
> Project: Kafka
>  Issue Type: Bug
>  Components: core, security, unit tests
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Anastasia Vela
>Priority: Major
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console]
>  
> *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > 
> testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED*18:44:00* 
> kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout*18:44:00*
>  *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > 
> testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED*18:44:00* 
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 1 records



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-09-04 Thread Maulin Vasavada
Do you guys think it would be easier if you can provide comments on GitHub
and we can continue there and summarize the conclusion here?

We should not lose addressing any comments.

On Wed, Sep 4, 2019 at 12:34 PM Pellerin, Clement 
wrote:

> The proposed interface does not look like the Builder pattern I am used to.
> Should SslEngineBuilder be called SslEngineFactory instead?
>
> On Mon, Sep 2, 2019, at 03:33, Rajini Sivaram wrote:
> > I would expect SslEngineBuilder interface to look something like this,
> > perhaps with some tweaking:
> >
> > public interface SslEngineBuilder extends Configurable, Closeable {
> >
> > Set<String> reconfigurableConfigs();
> >
> > boolean shouldBeRebuilt(Map<String, Object> nextConfigs);
> >
> > SSLEngine createSslEngine(Mode mode, String peerHost, int
> > peerPort, String endpointIdentification);
> >
> > }
>


Re: [VOTE] KIP-482: The Kafka Protocol should Support Optional Tagged Fields

2019-09-04 Thread Colin McCabe
On Wed, Sep 4, 2019, at 13:01, Jason Gustafson wrote:
> Hi Colin,
> 
> Just a couple questions.
> 
> 1. I think we discussed that we would do a lazy version bump of all
> protocols in order to get flexible version support. Can you add that to the
> KIP?

Good point.  Added.

> 2. The doc mentions a bump to the request and response header formats to
> version 1. Currently there is no formal header version. It wasn't clear to
> me if you were suggesting that we create a header version as part of the
> schema or if this was just an informal way to refer to the header in
> "flexible version" requests. Can you clarify?

I think we should have a formal header version.  However, we can deduce which 
header version we should use based on the apiKey and apiVersion, so no changes 
will be needed to what is sent over the wire.

Having a new header version will let us add new fields to request and response 
headers.  In particular, having a flexible header version will let us add 
tagged fields, which will be useful for adding things like a message traceID. 
 As another example, ThrottleTimeMs would have made more sense in the response 
header than as an addition to every message.
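
A rough sketch of the deduction described above; firstFlexibleVersion() here is an illustrative lookup, not a real method in the Kafka protocol code:

public class HeaderVersionSketch {

    // per-API table of the first "flexible" version, derived from the message specs
    static short firstFlexibleVersion(short apiKey) {
        return 9; // placeholder value for illustration
    }

    // header v1 (with tagged-field support) is used once the request itself is sent
    // at a flexible version; older versions keep header v0, so existing clients send
    // exactly the same bytes over the wire
    static short requestHeaderVersion(short apiKey, short apiVersion) {
        return apiVersion >= firstFlexibleVersion(apiKey) ? (short) 1 : (short) 0;
    }
}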

cheers,
Colin

> 
> Thanks,
> Jason
> 
> On Wed, Sep 4, 2019 at 8:14 AM David Arthur  wrote:
> 
> > +1 binding.
> >
> > Thanks for the KIP, Colin!
> >
> > -David
> >
> > On Wed, Sep 4, 2019 at 5:40 AM Harsha Chintalapani 
> > wrote:
> >
> > > LGTM. +1 (binding)
> > > -Harsha
> > >
> > >
> > > On Wed, Sep 04, 2019 at 1:46 AM, Satish Duggana <
> > satish.dugg...@gmail.com>
> > > wrote:
> > >
> > > > +1 (non-binding) Thanks for the nice KIP.
> > > >
> > > > You may want to update the KIP saying that optional tagged fields do
> > not
> > > > support complex types(or structs).
> > > >
> > > > On Wed, Sep 4, 2019 at 3:43 AM Jose Armando Garcia Sancio
> > > >  wrote:
> > > >
> > > > +1 (non-binding)
> > > >
> > > > Looking forward to this improvement.
> > > >
> > > > On Tue, Sep 3, 2019 at 12:49 PM David Jacot 
> > wrote:
> > > >
> > > > +1 (non-binding)
> > > >
> > > > Thank for the KIP. Great addition to the Kafka protocol!
> > > >
> > > > Best,
> > > > David
> > > >
> > > > Le mar. 3 sept. 2019 à 19:17, Colin McCabe  a
> > écrit
> > > :
> > > >
> > > > Hi all,
> > > >
> > > > I'd like to start the vote for KIP-482: The Kafka Protocol should
> > Support
> > > > Optional Tagged Fields.
> > > >
> > > > KIP:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > > KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
> > > >
> > > > Discussion thread here:
> > > >
> > > > https://lists.apache.org/thread.html/
> > > > cdc801ae886491b73ef7efecac7ef81b24382f8b6b025899ee343f7a@%3Cdev.kafka.
> > > > apache.org%3E
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > --
> > > > -Jose
> > > >
> > > >
> > >
> >
> >
> > --
> > David Arthur
> >
>


Re: [VOTE] KIP-482: The Kafka Protocol should Support Optional Tagged Fields

2019-09-04 Thread Jason Gustafson
Hi Colin,

Just a couple questions.

1. I think we discussed that we would do a lazy version bump of all
protocols in order to get flexible version support. Can you add that to the
KIP?
2. The doc mentions a bump to the request and response header formats to
version 1. Currently there is no formal header version. It wasn't clear to
me if you were suggesting that we create a header version as part of the
schema or if this was just an informal way to refer to the header in
"flexible version" requests. Can you clarify?

Thanks,
Jason

On Wed, Sep 4, 2019 at 8:14 AM David Arthur  wrote:

> +1 binding.
>
> Thanks for the KIP, Colin!
>
> -David
>
> On Wed, Sep 4, 2019 at 5:40 AM Harsha Chintalapani 
> wrote:
>
> > LGTM. +1 (binding)
> > -Harsha
> >
> >
> > On Wed, Sep 04, 2019 at 1:46 AM, Satish Duggana <
> satish.dugg...@gmail.com>
> > wrote:
> >
> > > +1 (non-binding) Thanks for the nice KIP.
> > >
> > > You may want to update the KIP saying that optional tagged fields do
> not
> > > support complex types(or structs).
> > >
> > > On Wed, Sep 4, 2019 at 3:43 AM Jose Armando Garcia Sancio
> > >  wrote:
> > >
> > > +1 (non-binding)
> > >
> > > Looking forward to this improvement.
> > >
> > > On Tue, Sep 3, 2019 at 12:49 PM David Jacot 
> wrote:
> > >
> > > +1 (non-binding)
> > >
> > > Thank for the KIP. Great addition to the Kafka protocol!
> > >
> > > Best,
> > > David
> > >
> > > Le mar. 3 sept. 2019 à 19:17, Colin McCabe  a
> écrit
> > :
> > >
> > > Hi all,
> > >
> > > I'd like to start the vote for KIP-482: The Kafka Protocol should
> Support
> > > Optional Tagged Fields.
> > >
> > > KIP:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/
> > > KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
> > >
> > > Discussion thread here:
> > >
> > > https://lists.apache.org/thread.html/
> > > cdc801ae886491b73ef7efecac7ef81b24382f8b6b025899ee343f7a@%3Cdev.kafka.
> > > apache.org%3E
> > >
> > > best,
> > > Colin
> > >
> > > --
> > > -Jose
> > >
> > >
> >
>
>
> --
> David Arthur
>


Build failed in Jenkins: kafka-trunk-jdk11 #792

2019-09-04 Thread Apache Jenkins Server
See 


Changes:

[bbejeck] MINOR: remove unnecessary nulllity check (#7282)

[bill] KAFKA-8861 Fix flaky

--
[...truncated 2.62 MB...]
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest > 
testStartTwoConnectors SKIPPED

org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest > 
testDeleteConnector STARTED

org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest > 
testDeleteConnector SKIPPED

org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest > 
testAddingWorker STARTED

org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest > 
testAddingWorker PASSED

org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest > 
testRemovingWorker STARTED

org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest > 
testRemovingWorker PASSED

org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest > 
testReconfigConnector STARTED

org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest > 
testReconfigConnector SKIPPED

org.apache.kafka.connect.integration.StartAndStopLatchTest > 
shouldReturnFalseWhenAwaitingForDependentLatchToComplete STARTED

org.apache.kafka.connect.integration.StartAndStopLatchTest > 
shouldReturnFalseWhenAwaitingForDependentLatchToComplete PASSED

org.apache.kafka.connect.integration.StartAndStopLatchTest > 
shouldReturnTrueWhenAwaitingForStartAndStopAndDependentLatch STARTED

org.apache.kafka.connect.integration.StartAndStopLatchTest > 
shouldReturnTrueWhenAwaitingForStartAndStopAndDependentLatch PASSED

org.apache.kafka.connect.integration.StartAndStopLatchTest > 
shouldReturnFalseWhenAwaitingForStartToNeverComplete STARTED

org.apache.kafka.connect.integration.StartAndStopLatchTest > 
shouldReturnFalseWhenAwaitingForStartToNeverComplete PASSED

org.apache.kafka.connect.integration.StartAndStopLatchTest > 
shouldReturnFalseWhenAwaitingForStopToNeverComplete STARTED

org.apache.kafka.connect.integration.StartAndStopLatchTest > 
shouldReturnFalseWhenAwaitingForStopToNeverComplete PASSED

org.apache.kafka.connect.integration.StartAndStopLatchTest > 
shouldReturnTrueWhenAwaitingForStartAndStopToComplete STARTED

org.apache.kafka.connect.integration.StartAndStopLatchTest > 
shouldReturnTrueWhenAwaitingForStartAndStopToComplete PASSED

> Task :streams:streams-scala:test

org.apache.kafka.streams.scala.WordCountTest > testShouldCountWordsMaterialized 
STARTED

org.apache.kafka.streams.scala.WordCountTest > testShouldCountWordsMaterialized 
PASSED

org.apache.kafka.streams.scala.WordCountTest > testShouldCountWordsJava STARTED

org.apache.kafka.streams.scala.WordCountTest > testShouldCountWordsJava PASSED

org.apache.kafka.streams.scala.WordCountTest > testShouldCountWords STARTED

org.apache.kafka.streams.scala.WordCountTest > testShouldCountWords PASSED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionWithNamedRepartitionTopic STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionWithNamedRepartitionTopic PASSED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionJava STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegionJava PASSED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegion STARTED

org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes
 > testShouldCountClicksPerRegion PASSED

org.apache.kafka.streams.scala.TopologyTest > 
shouldBuildIdenticalTopologyInJavaNScalaJoin STARTED

org.apache.kafka.streams.scala.TopologyTest > 
shouldBuildIdenticalTopologyInJavaNScalaJoin PASSED

org.apache.kafka.streams.scala.TopologyTest > 
shouldBuildIdenticalTopologyInJavaNScalaSimple STARTED

org.apache.kafka.streams.scala.TopologyTest > 
shouldBuildIdenticalTopologyInJavaNScalaSimple PASSED

org.apache.kafka.streams.scala.TopologyTest > 
shouldBuildIdenticalTopologyInJavaNScalaAggregate STARTED

org.apache.kafka.streams.scala.TopologyTest > 
shouldBuildIdenticalTopologyInJavaNScalaAggregate PASSED

org.apache.kafka.streams.scala.TopologyTest > 
shouldBuildIdenticalTopologyInJavaNScalaProperties STARTED

org.apache.kafka.streams.scala.TopologyTest > 
shouldBuildIdenticalTopologyInJavaNScalaProperties PASSED

org.apache.kafka.streams.scala.TopologyTest > 
shouldBuildIdenticalTopologyInJavaNScalaTransform STARTED

org.apache.kafka.streams.scala.TopologyTest > 
shouldBuildIdenticalTopologyInJavaNScalaTransform PASSED

org.apache.kafka.streams.scala.kstream.MaterializedTest > Create a Materialized 
should create a Materialized 

RE: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-09-04 Thread Pellerin, Clement
The proposed interface does not look like the Builder pattern I am used to.
Should SslEngineBuilder be called SslEngineFactory instead?

On Mon, Sep 2, 2019, at 03:33, Rajini Sivaram wrote:
> I would expect SslEngineBuilder interface to look something like this,
> perhaps with some tweaking:
> 
> public interface SslEngineBuilder extends Configurable, Closeable {
> 
> Set<String> reconfigurableConfigs();
> 
> boolean shouldBeRebuilt(Map<String, Object> nextConfigs);
> 
> SSLEngine createSslEngine(Mode mode, String peerHost, int
> peerPort, String endpointIdentification);
> 
> }


Re: [DISCUSS] KIP-444: Augment Metrics for Kafka Streams

2019-09-04 Thread Bruno Cadonna
Hi,

I am sorry to restart the discussion here, but I came across a small
issue in the KIP.

I started to implement KIP-444 and I am a bit concerned about the values
for the config `built.in.metrics.version`. In the KIP the possible
values are specified as all Kafka Streams versions. I think that this
set of values is really hard to maintain in the code, and it also blows
up the testing burden unnecessarily because all versions need to be
tested. My proposal (backed by John) is to use the following values:
- `latest` for the latest version of the metrics
- `0.10.0-2.3` for the version before `latest`
If, in the future, let's say in version 4.1, we need to change the
metrics again, we would add `2.4-4.0` to the values of the config. With
major versions, we could also get rid of some values.

WDYT?
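
For illustration, with this proposal configuring the metrics version stays a
one-liner (a sketch; `built.in.metrics.version` is the config name from the
KIP, the rest is ordinary, illustrative Streams setup):

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");              // illustrative
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // illustrative
props.put("built.in.metrics.version", "latest");   // or "0.10.0-2.3" for the previous metrics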

You can also have a look at the PR
https://github.com/apache/kafka/pull/7279 to see this in code.

Best,
Bruno

On Tue, Aug 20, 2019 at 8:29 PM Guozhang Wang  wrote:
>
> Hello Bruno,
>
> I've updated the wiki page again per your comments, here's a brief summary:
>
> 1. added the list of removed metrics.
> 2. added a task-level INFO metric "dropped-records" that covers all
> scenarios and merges in the existing "late-records-drop",
> "skipped-records", and "expired-window-records-drop".
> 3. renamed the util functions of StreamsMetrics as `addLatencyRateTotal`
> and `addRateTotal` sensors.
>
>
> Since I feel it has incorporated all of your comments I'm going to start
> the vote thread for this KIP now.
>
>
> Guozhang
>
>
> On Tue, Aug 20, 2019 at 9:59 AM Guozhang Wang  wrote:
>
> > Hi Bruno,
> >
> > No it was not intentional, and we can definitely add the total amount
> > sensor as well -- they are just util functions to save users some lines of
> > code anyways, and should be straightforward.
> >
> > Guozhang
> >
> >
> > On Tue, Aug 20, 2019 at 1:05 AM Bruno Cadonna  wrote:
> >
> >> Hi Guozhang,
> >>
> >> I totally missed the total invocation count metric in the javadoc.
> >> Which brings me to a follow-up question. Should the names of the
> >> methods reflect the included total invocation count? We have to rename
> >> them anyways. One option would be to simply add `Total` to the method
> >> names, i.e., `addLatencyAndRateAndTotalSensor` and
> >> `addRateAndTotalSensor` (alternatively without the `And`s). Since
> >> those sensors record exclusively invocations, another option would be
> >> `addInvocationSensor` and `addInvocationSensorWithoutLatency`.
> >>
> >> As far as I can see, we have sensors to record invocations but none to
> >> record amounts. Is that intentional? No need to add it to this KIP, I
> >> am just curious.
> >>
> >> Best,
> >> Bruno
> >>
> >> On Tue, Aug 20, 2019 at 1:13 AM Guozhang Wang  wrote:
> >> >
> >> > Hi Bruno,
> >> >
> >> > Just realized that for `addRateSensor` and `addLatencyAndRateSensor`
> >> we've
> >> > actually added the total invocation metric already.
> >> >
> >> >
> >> > Guozhang
> >> >
> >> > On Mon, Aug 19, 2019 at 4:11 PM Guozhang Wang 
> >> wrote:
> >> >
> >> > > Hi Bruno,
> >> > >
> >> > >
> >> > > On Tue, Aug 6, 2019 at 1:51 AM Bruno Cadonna 
> >> wrote:
> >> > >
> >> > >> Hi Guozhang,
> >> > >>
> >> > >> I left my comments inline.
> >> > >>
> >> > >> On Thu, Jul 18, 2019 at 8:28 PM Guozhang Wang 
> >> wrote:
> >> > >> >
> >> > >> > Hello Bruno,
> >> > >> >
> >> > >> > Thanks for the feedbacks, replied inline.
> >> > >> >
> >> > >> > On Mon, Jul 1, 2019 at 7:08 AM Bruno Cadonna 
> >> > >> wrote:
> >> > >> >
> >> > >> > > Hi Guozhang,
> >> > >> > >
> >> > >> > > Thank you for the KIP.
> >> > >> > >
> >> > >> > > 1) As far as I understand, the StreamsMetrics interface is there
> >> for
> >> > >> > > user-defined processors. Would it make sense to also add a
> >> method to
> >> > >> > > the interface to specify a sensor that records skipped records?
> >> > >> > >
> >> > >> > Not sure I follow... if users want to add a specific skipped records
> >> > >> > sensor, they can still do that as a "throughput" sensor via
> >> > >> > "addThroughputSensor" and then "record", right?
> >> > >> >
> >> > >> > As an afterthought, maybe it's better to rename `throughput` to `rate`
> >> > >> > in the public APIs since it is really meant for the latter semantics.
> >> > >> > I did not change it just to make fewer API changes / deprecate fewer
> >> > >> > functions. But if we feel it is important we can change it as well.
> >> > >> >
> >> > >>
> >> > >> I see now that a user can record the rate of skipped records. However,
> >> > >> I was referring to the total number of skipped records. Maybe my
> >> > >> question should be more general: should we allow the user to also
> >> > >> specify sensors for totals or combinations of rate and totals?
> >> > >>
> >> > >> Sounds good to me, I will add it to the wiki page as well for
> >> > > StreamsMetrics.
> >> > >
> >> > >
> >> > >
> >> > >> Regarding the naming, I like `rate` more than `throughput`, but I
> >> > >> 

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread Guozhang Wang
Hello John,

Your reasoning about non-logged state stores looks good to me. But I think
it is worth considering two cases separately: 1) all of a task's state stores
are non-logged, 2) a task has a non-logged state store but also other
logged state stores.

For 1), we should not have any standbys for such tasks at all, but it is
not the same as a stateless task, because the only active task could report
a lag of "0" while all others should logically report the lag as infinity
(of course, they would not encode such a value).

For 2), it is much the same as the normal case: the active task reports a
lag of 0 while standby tasks report a lag summed over all the logged state
stores, meaning that the non-logged state store does not matter in our
assignment semantics.

The rationale I had is to effectively favor stickiness for the tasks in
case 1) rather than arbitrarily reassigning them, even though it is true
that for a non-logged store Streams provides no durability, and hence no
restoration guarantees, anyway. Algorithmically, we may then consider
assigning tasks in descending order of the number of instances reporting
lags for them; for such tasks there is always one candidate reporting a lag
of 0, and assigning them to any instance does not actually change the
cumulative total lag either. At that point we can decide: "if it is still
within the load balance factor, then let's respect stickiness, otherwise
it's free to move". WDYT?
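
To make that ordering idea concrete, here is a rough standalone sketch
(purely illustrative names and sample data, not the Streams assignor code):

import java.util.*;

public class StickyOrderingSketch {
    public static void main(String[] args) {
        // taskId -> instances that reported a lag for it (hypothetical sample data)
        Map<String, Set<String>> lagReports = new HashMap<>();
        lagReports.put("0_0", new HashSet<>(Arrays.asList("A")));           // e.g. a case-1 task: single reporter
        lagReports.put("1_0", new HashSet<>(Arrays.asList("A", "B", "C"))); // a logged task: many reporters

        // assign tasks with more reporting instances first; ties can then respect stickiness
        List<String> order = new ArrayList<>(lagReports.keySet());
        order.sort(Comparator.comparingInt((String t) -> lagReports.get(t).size()).reversed());

        System.out.println("assignment order: " + order);
    }
}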


Guozhang

On Wed, Sep 4, 2019 at 8:30 AM John Roesler  wrote:

> Hey Bruno,
>
> Thanks for taking another look. Some quick responses:
>
> 1) It just means the number of offsets in the topic. E.g., the LSO is 100,
> but the first offset is 40 due to retention, so there are 60 offsets in the
> topic. Further, the lag on that topic would be considered to be 60 for any
> task that hadn't previously done any work on it.
>
> 2) This is undecidable in general. I.e., there's no way we can know whether
> the store is remote or not, and hence whether we can freely assign it to
> another instance, or whether we have to keep it on the same instance.
> However, there are a couple of reasons to go ahead and assume we have the
> freedom to move such tasks.
> * We know that nothing can prevent the loss of an instance in a cluster
> (I.e., this is true of all cloud environments, as well as any managed
> virtualized cluster like mesos or kubernetes), so any Streams program that
> makes use of non-remote, non-logged state is doomed to lose its state when
> it loses an instance.
> * If we take on a restriction that we cannot move such state between
> instances, we'd become overconstrained very quickly. Effectively, if you
> made use of non-logged stores, and we didn't assume freedom of movement,
> then we couldn't make use of any new instances in your cluster.
> * On the other hand, if we optimistically assume we can't move state, but
> only reassign it when we lose an instance, then we're supporting
> non-deterministic logic, because the program would produce different
> results, depending on whether you lost a node during the execution or not.
> 2b) That last point goes along with your side note. I'm not sure if we
> should bother dropping such state on every reassignment, though. It seems
> to be undefined territory enough that we can just do the simplest thing and
> assume people have made their own (external) provisions for durability.
> I.e., when we say "non-logged", we mean that it doesn't make use of _our_
> durability mechanism. I'm arguing that the only sane assumption is that
> such folks have opted to use their own durability measures, and we should
> just assume it works with no special considerations in the assignment
> algorithm.
>
> 3) Good catch! I've fixed it.
>
> Thanks again!
> -John
>
> On Wed, Sep 4, 2019 at 6:09 AM Bruno Cadonna  wrote:
>
> > Hi,
> >
> > 1) What do you mean with "full set of offsets in the topic"? Is this
> > the sum of all offsets of the changelog partitions of the task?
> >
> > 2) I am not sure whether non-logged stateful tasks should be
> > effectively treated as stateless tasks during assignment. First we
> > need to decide whether a non-logged stateful task should preferably be
> > assigned to the same instance on which it just run in order to
> > continue to use its state or not.
> >
> > 3) In the example, you define stand-by tasks {S1, S2, ...} but never
> > use them, because below you use a dedicated row for stand-by tasks.
> >
> > As a side note to 2) since it is not directly related to this KIP: We
> > should decide if we want to avoid the possible non-determinism
> > introduced by non-logged stores or not. That is, if an instance hosts
> > a task with non-logged stores then we can have two cases after the
> > next rebalance: a) the task stays on the same instance and continues
> > to use the same state store as used so far or b) the task is assigned
> > to another instance and it starts an empty state store. The produced
> > results for these two cases might differ. 

Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-09-04 Thread Maulin Vasavada
Hi Colin

Thanks for your comments. I agree with most of them. This is not
pull-request-ready code yet :)

If we want to make DefaultSslEngineBuilder final, then what do you propose
to address our requirement of being able to plug in a custom way of loading
keys/certs (the original challenge we documented in this KIP)? What
I gather from your comment is that we will have to standardize those
requirements as public APIs, like the interfaces documented in KIP-486 for
KeyStoreLoader/TrustStoreLoader, correct? That is, we would have those
pluggable APIs provide a custom source for keys/certs and have the
SslEngineBuilder interface somehow take those?





On Wed, Sep 4, 2019 at 10:58 AM Colin McCabe  wrote:

> On Tue, Sep 3, 2019, at 22:56, Maulin Vasavada wrote:
> > Hi all
> >
> > Please check
> >
> https://github.com/maulin-vasavada/kafka/commit/44f86395b1ba3fe4bd87de89029d72da77995ff8
> >
> >
> > This is just the first cut obviously. There are few call outs I would
> like
> > to make,
> >
> > 1. So far I kept the old SslEngineBuilder hence I had to name the
> interface
> > with "I" (that can change later)
>
> Hi Maulin,
>
> Thanks for working on this.
>
> We don't use Hungarian notation in Kafka.  The interface should probably
> just be SslEngineBuilder.  The default implementation can be
> DefaultSslEngineBuilder.
>
> >
> > 2. I did not yet add the creation of SslEngineBuilder via loading the
> > configuration like 'ssl.engine.builder.class'. Hence you see direct
> > creation of DefaultSslEngineBuilder class
> >
> > 3. Due to validation logic in the current SslFactory I had to add more
> > methods in ISslEngineBuilder interface (like keystore(), truststore()
> etc).
> > Due to other classes like EchoServer depending upon needing SSLContext, I
> > had to add getSSLContext() also in the interface.
>
> Hmm.  I don't think we want to expose this stuff.  EchoServer is just used
> for testing, so it can cast the SslEngineBuilder to DefaultEngineBuilder
> (the only one that it will use during JUnit tests) and get what it needs
> that way.
>
> >
> > 4. With these changes and with existing old SslEngineBuilder, the
> > clients/core projects builds with tests successfully but I didn't add any
> > additional tests yet
> >
> > 5. I wanted to have DefaultSslEngineBuilder in such a way that if
> somebody
> > wants to implement custom SslEngineBuilder they can extend and override
> > only key required methods without repeating any logic.
>
> No, DefaultSslEngineBuilder should be final.  We should not allow people
> to extend the default engine builder, since then it becomes a public API.
> If there are utility functions that we feel would be useful to everyone, we
> can spell those out explicitly and standardize them as public APIs that
> can't be changed.
>
> >
> > 6. For reconfigurable interface I kept the way suggested by Rajini -
> > meaning SslFactory really is reconfigurable BUT it relies on the
> > ISslEngineBuilder to define the reconfigurable options. This means that
> > ISslEngineBuilder dictates based on which reconfigurable params the
> > SslFactory should try to reconfigure the SSLEngine.
>
> Each SslEngineBuilder should define its own set of reconfigurable
> configurations.  We don't know ahead of time what they will need.  We want
> to be flexible.  People might want to fetch certificates from a central
> location via HTTPS, for example.  Or maybe they want to use a native
> library of some kind.  And so on.
>
> best,
> Colin
>
> >
> > With this - open to all the suggestions and further improvements.
> >
> > Thanks
> > Maulin
> >
> >
> > On Tue, Sep 3, 2019 at 10:07 AM Colin McCabe  wrote:
> >
> > > On Mon, Sep 2, 2019, at 03:33, Rajini Sivaram wrote:
> > > > I would expect SslEngineBuilder interface to look something like
> this,
> > > > perhaps with some tweaking:
> > > >
> > > > public interface SslEngineBuilder extends Configurable, Closeable {
> > > >
> > > > Set<String> reconfigurableConfigs();
> > > >
> > > > boolean shouldBeRebuilt(Map<String, Object> nextConfigs);
> > > >
> > > > SSLEngine createSslEngine(Mode mode, String peerHost, int
> > > > peerPort, String endpointIdentification);
> > > >
> > > > }
> > > >
> > > > The existing SslEngineBuilder class would be renamed and will
> implement
> > > > this interface. Loading of keystore/truststore will be in
> > > SslEngineBuilder
> > > > as it is now.  The method `shouldBeRebuilt()` will validate configs
> > > during
> > > > reconfiguration and decide if reconfiguration is required because
> > > keystore
> > > > or truststore changed. SslFactory.reconfigurableConfigs() will return
> > > > SslEngineBuilder.reconfigurableConfigs() as well including any custom
> > > > configs of SslEngineBuilder, so no other changes will be required
> when we
> > > > eventually support custom SSL configs.
> > > >
> > > > We don't want to make SslFactory the pluggable class since that
> contains
> > > > validation logic for SSL engines. Everything that we want to
> customise is
> > > > 

Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread Bruno Cadonna
Hi John,

Thank you for your answer. Your assumptions sound reasonable to me.

Best,
Bruno

On Wed, Sep 4, 2019 at 5:30 PM John Roesler  wrote:
>
> Hey Bruno,
>
> Thanks for taking another look. Some quick responses:
>
> 1) It just means the number of offsets in the topic. E.g., the LSO is 100,
> but the first offset is 40 due to retention, so there are 60 offsets in the
> topic. Further, the lag on that topic would be considered to be 60 for any
> task that hadn't previously done any work on it.
>
> 2) This is undecidable in general. I.e., there's no way we can know whether
> the store is remote or not, and hence whether we can freely assign it to
> another instance, or whether we have to keep it on the same instance.
> However, there are a couple of reasons to go ahead and assume we have the
> freedom to move such tasks.
> * We know that nothing can prevent the loss of an instance in a cluster
> (I.e., this is true of all cloud environments, as well as any managed
> virtualized cluster like mesos or kubernetes), so any Streams program that
> makes use of non-remote, non-logged state is doomed to lose its state when
> it loses an instance.
> * If we take on a restriction that we cannot move such state between
> instances, we'd become overconstrained very quickly. Effectively, if you
> made use of non-logged stores, and we didn't assume freedom of movement,
> then we couldn't make use of any new instances in your cluster.
> * On the other hand, if we optimistically assume we can't move state, but
> only reassign it when we lose an instance, then we're supporting
> non-deterministic logic, because the program would produce different
> results, depending on whether you lost a node during the execution or not.
> 2b) That last point goes along with your side note. I'm not sure if we
> should bother dropping such state on every reassignment, though. It seems
> to be undefined territory enough that we can just do the simplest thing and
> assume people have made their own (external) provisions for durability.
> I.e., when we say "non-logged", we mean that it doesn't make use of _our_
> durability mechanism. I'm arguing that the only sane assumption is that
> such folks have opted to use their own durability measures, and we should
> just assume it works with no special considerations in the assignment
> algorithm.
>
> 3) Good catch! I've fixed it.
>
> Thanks again!
> -John
>
> On Wed, Sep 4, 2019 at 6:09 AM Bruno Cadonna  wrote:
>
> > Hi,
> >
> > 1) What do you mean with "full set of offsets in the topic"? Is this
> > the sum of all offsets of the changelog partitions of the task?
> >
> > 2) I am not sure whether non-logged stateful tasks should be
> > effectively treated as stateless tasks during assignment. First we
> > need to decide whether a non-logged stateful task should preferably be
> > assigned to the same instance on which it just ran in order to
> > continue to use its state or not.
> >
> > 3) In the example, you define stand-by tasks {S1, S2, ...} but never
> > use them, because below you use a dedicated row for stand-by tasks.
> >
> > As a side note to 2) since it is not directly related to this KIP: We
> > should decide if we want to avoid the possible non-determinism
> > introduced by non-logged stores or not. That is, if an instance hosts
> > a task with non-logged stores then we can have two cases after the
> > next rebalance: a) the task stays on the same instance and continues
> > to use the same state store as used so far or b) the task is assigned
> > to another instance and it starts an empty state store. The produced
> > results for these two cases might differ. To avoid the nondeterminism,
> > non-logged state stores would need to be wiped out before assignment.
> > Then the question arises, how the removal of non-logged state stores
> > before assignment would affect backward-compatibility.
> >
> > Best,
> > Bruno
> >
> > On Wed, Aug 21, 2019 at 11:40 PM John Roesler  wrote:
> > >
> > > Hi Guozhang,
> > >
> > > > My impression from your previous email is that inside the algorithm
> > when
> > > we
> > > are "filling" them to instances some deterministic logic would be used to
> > > avoid the above case, is that correct?
> > >
> > > Yes, that was my plan, but I didn't formalize it. There was a requirement
> > > that the assignment algorithm must not produce a new assignment if the
> > > current assignment is already balanced, so at the least, any thrashing
> > > would be restricted to the "balancing" phase while tasks are moving
> > around
> > > the cluster.
> > >
> > > Anyway, I think it would be good to say that we'll "try to" produce
> > stable
> > > assignments, so I've added a "should" clause to the assignment spec:
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
> > >
> > > For example, we would sort the stateless tasks and available instances
> > > before 

[DISCUSS] KIP-516: Topic Identifiers

2019-09-04 Thread Lucas Bradstreet
Hi all,

I would like to kick off discussion of KIP-516, an implementation of topic
IDs for Kafka. Topic IDs aim to solve topic uniqueness problems in Kafka,
where referring to a topic by name alone is insufficient. Such cases
include when a topic has been deleted and recreated with the same name.

Unique identifiers will help simplify and improve Kafka's topic deletion
process, as well as prevent cases where brokers may incorrectly interact
with stale versions of topics.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers

Looking forward to your thoughts.

Lucas


[jira] [Created] (KAFKA-8872) Improvements to controller "deleting" state / topic Identifiers

2019-09-04 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-8872:
---

 Summary: Improvements to controller "deleting" state /  topic 
Identifiers
 Key: KAFKA-8872
 URL: https://issues.apache.org/jira/browse/KAFKA-8872
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


Kafka currently uniquely identifies a topic by its name. This is generally 
sufficient, but there are flaws in this scheme if a topic is deleted and 
recreated with the same name. As a result, Kafka attempts to prevent these 
classes of issues by ensuring a topic is deleted from all replicas before 
completing a deletion. This solution is not perfect, as it is possible for 
partitions to be reassigned from brokers while they are down, and there are no 
guarantees that this state will ever be cleaned up and will not cause issues in 
the future.

As the controller must wait for all replicas to delete their local partitions, 
deletes can also become blocked, preventing topics from being created with the 
same name until the deletion is complete on all replicas. This can mean that 
downtime for a single broker can effectively cause a complete outage for 
everyone producing/consuming to that topic name, as the topic cannot be 
recreated without manual intervention.

Unique topic IDs could help address this issue by associating a unique ID with 
each topic, ensuring a newly created topic with a previously used name cannot 
be confused with a previous topic with that name.

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] KIP-486 Support for pluggable KeyStore and TrustStore

2019-09-04 Thread Colin McCabe
On Tue, Sep 3, 2019, at 22:56, Maulin Vasavada wrote:
> Hi all
> 
> Please check
> https://github.com/maulin-vasavada/kafka/commit/44f86395b1ba3fe4bd87de89029d72da77995ff8
> 
> 
> This is just the first cut obviously. There are few call outs I would like
> to make,
> 
> 1. So far I kept the old SslEngineBuilder hence I had to name the interface
> with "I" (that can change later)

Hi Maulin,

Thanks for working on this.

We don't use Hungarian notation in Kafka.  The interface should probably just 
be SslEngineBuilder.  The default implementation can be DefaultSslEngineBuilder.

> 
> 2. I did not yet add the creation of SslEngineBuilder via loading the
> configuration like 'ssl.engine.builder.class'. Hence you see direct
> creation of DefaultSslEngineBuilder class
> 
> 3. Due to validation logic in the current SslFactory I had to add more
> methods in ISslEngineBuilder interface (like keystore(), truststore() etc).
> Due to other classes like EchoServer depending upon needing SSLContext, I
> had to add getSSLContext() also in the interface.

Hmm.  I don't think we want to expose this stuff.  EchoServer is just used for 
testing, so it can cast the SslEngineBuilder to DefaultEngineBuilder (the only 
one that it will use during JUnit tests) and get what it needs that way.

> 
> 4. With these changes and with existing old SslEngineBuilder, the
> clients/core projects builds with tests successfully but I didn't add any
> additional tests yet
> 
> 5. I wanted to have DefaultSslEngineBuilder in such a way that if somebody
> wants to implement custom SslEngineBuilder they can extend and override
> only key required methods without repeating any logic.

No, DefaultSslEngineBuilder should be final.  We should not allow people to 
extend the default engine builder, since then it becomes a public API.  If 
there are utility functions that we feel would be useful to everyone, we can 
spell those out explicitly and standardize them as public APIs that can't be 
changed.

> 
> 6. For reconfigurable interface I kept the way suggested by Rajini -
> meaning SslFactory really is reconfigurable BUT it relies on the
> ISslEngineBuilder to define the reconfigurable options. This means that
> ISslEngineBuilder dictates based on which reconfigurable params the
> SslFactory should try to reconfigure the SSLEngine.

Each SslEngineBuilder should define its own set of reconfigurable 
configurations.  We don't know ahead of time what they will need.  We want to 
be flexible.  People might want to fetch certificates from a central location 
via HTTPS, for example.  Or maybe they want to use a native library of some 
kind.  And so on.
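
As a purely illustrative sketch of that flexibility (hypothetical class,
config names, and method shapes, written standalone rather than against the
not-yet-final interface), a builder that sources certificates remotely might
look roughly like this:

import java.util.*;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

public final class HttpsCertsSslEngineBuilderSketch {
    private String certsEndpoint;   // hypothetical custom config "ssl.certs.https.endpoint"
    private SSLContext sslContext;

    public void configure(Map<String, ?> configs) throws Exception {
        this.certsEndpoint = (String) configs.get("ssl.certs.https.endpoint");
        this.sslContext = SSLContext.getDefault();  // placeholder for a context built from the fetched certs
    }

    public Set<String> reconfigurableConfigs() {
        // only this builder knows which of its own configs can change at runtime
        return Collections.singleton("ssl.certs.https.endpoint");
    }

    public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
        return !Objects.equals(certsEndpoint, nextConfigs.get("ssl.certs.https.endpoint"));
    }

    public SSLEngine createSslEngine(boolean clientMode, String peerHost, int peerPort) {
        SSLEngine engine = sslContext.createSSLEngine(peerHost, peerPort);
        engine.setUseClientMode(clientMode);
        return engine;
    }
}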

best,
Colin

> 
> With this - open to all the suggestions and further improvements.
> 
> Thanks
> Maulin
> 
> 
> On Tue, Sep 3, 2019 at 10:07 AM Colin McCabe  wrote:
> 
> > On Mon, Sep 2, 2019, at 03:33, Rajini Sivaram wrote:
> > > I would expect SslEngineBuilder interface to look something like this,
> > > perhaps with some tweaking:
> > >
> > > public interface SslEngineBuilder extends Configurable, Closeable {
> > >
> > > Set<String> reconfigurableConfigs();
> > >
> > > boolean shouldBeRebuilt(Map<String, Object> nextConfigs);
> > >
> > > SSLEngine createSslEngine(Mode mode, String peerHost, int
> > > peerPort, String endpointIdentification);
> > >
> > > }
> > >
> > > The existing SslEngineBuilder class would be renamed and will implement
> > > this interface. Loading of keystore/truststore will be in
> > SslEngineBuilder
> > > as it is now.  The method `shouldBeRebuilt()` will validate configs
> > during
> > > reconfiguration and decide if reconfiguration is required because
> > keystore
> > > or truststore changed. SslFactory.reconfigurableConfigs() will return
> > > SslEngineBuilder.reconfigurableConfigs() as well including any custom
> > > configs of SslEngineBuilder, so no other changes will be required when we
> > > eventually support custom SSL configs.
> > >
> > > We don't want to make SslFactory the pluggable class since that contains
> > > validation logic for SSL engines. Everything that we want to customise is
> > > contained in SslEngineBuilder. Basically custom SslEngineBuilder will
> > > validate custom configs during reconfiguration and create SSLEngine.
> > > SslFactory will continue to perform validation of SSLEngines and this
> > will
> > > not be customizable. SslEngineBuilder will not be reconfigurable, instead
> > > we create a new builder as we do now to avoid having to deal with
> > > thread-safety and atomicity of updates. We could consider using a public
> > > Reconfigurable interface as the pluggable interface for consistency, but
> > I
> > > think we would still want to create a new Builder on reconfiguration and
> > > retain non-pluggable SSL engine validation in SslFactory.
> >
> > +1
> >
> > C.
> >
> > >
> > >
> > > On Fri, Aug 30, 2019 at 10:21 PM Maulin Vasavada <
> > maulin.vasav...@gmail.com>
> > > wrote:
> > >
> > > > Looking at SslFactory and SslEngineBuilder I feel the 

[VOTE] KIP-447: Producer scalability for exactly once semantics

2019-09-04 Thread Boyang Chen
Hey all,

I would like to start the vote for KIP-447
.
This is a very important step to improve Kafka Streams scalability under
exactly-once semantics, by avoiding a linearly increasing number of producers
as the number of topic partitions grows.

Thanks,
Boyang


Build failed in Jenkins: kafka-2.1-jdk8 #227

2019-09-04 Thread Apache Jenkins Server
See 


Changes:

[bill] KAFKA-8861 Fix flaky

[bill] HOTFIX: The cherry-pick for https://github.com/apache/kafka/pull/7207 to

--
Started by an SCM change
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H27 (ubuntu xenial) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/2.1^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/2.1^{commit} # timeout=10
Checking out Revision 6560203d20e818515d5b8c961fa648788c684cd6 
(refs/remotes/origin/2.1)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 6560203d20e818515d5b8c961fa648788c684cd6
Commit message: "HOTFIX: The cherry-pick for 
https://github.com/apache/kafka/pull/7207 to 2.1 causes a failure in 
AssignedStreamsTasksTest"
 > git rev-list --no-walk 8cdc2d6d1b2a227f5e62f148179a52684b230bd6 # timeout=10
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
[kafka-2.1-jdk8] $ /bin/bash -xe /tmp/jenkins6125630457162380728.sh
+ rm -rf 
+ /home/jenkins/tools/gradle/4.8.1/bin/gradle
/tmp/jenkins6125630457162380728.sh: line 4: 
/home/jenkins/tools/gradle/4.8.1/bin/gradle: No such file or directory
Build step 'Execute shell' marked build as failure
[FINDBUGS] Collecting findbugs analysis files...
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
[FINDBUGS] Searching for all files in 
 that match the pattern 
**/build/reports/findbugs/*.xml
[FINDBUGS] No files found. Configuration error?
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
No credentials specified
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
 Using GitBlamer to create author and commit information for all 
warnings.
 GIT_COMMIT=6560203d20e818515d5b8c961fa648788c684cd6, 
workspace=
[FINDBUGS] Computing warning deltas based on reference build #225
Recording test results
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
Not sending mail to unregistered user b...@confluent.io
Not sending mail to unregistered user wangg...@gmail.com


Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-09-04 Thread Boyang Chen
From the offline discussion, the eventual conclusion is to use a top-level
Consumer#getMetadata() API to fetch the latest group metadata information
for offset fencing, so that we only call initTransactions once in the
producer's lifetime.
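
For illustration only, usage could end up looking roughly like the fragment
below; `getMetadata()` is just the name used in this thread (not a finalized
API), and `running`, `process(...)` and `offsetsFor(...)` are stand-ins for
application logic:

producer.initTransactions();                        // called once for the producer's lifetime
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    producer.beginTransaction();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(process(record));             // stand-in for the application's processing
    }
    // fence zombies using the consumer's latest group metadata (generation, member id, ...)
    producer.sendOffsetsToTransaction(offsetsFor(records), consumer.getMetadata());
    producer.commitTransaction();
}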

Since no further question is raised on this thread, I will start vote
today. In the meantime, still feel free to make comments on the discussion
thread, thank you!

Boyang

On Sun, Aug 25, 2019 at 11:43 PM Boyang Chen 
wrote:

> Hey Guozhang and Jason,
>
> I'm ok with either way. Thinking of Guozhang's approach, it is simpler to
> implement a consumer-producer pair if we avoid the callback pattern and only
> do the group metadata initialization once; however, the access pattern for
> consumer rebalance state is scattered, which means we get both a rebalance
> listener and a metadata getter. Jason's approach overloads the
> initTransactions API, which could be more confusing, as it is already
> confusing today. Comparing the two, I'm inclined toward Guozhang's approach,
> as it is not conclusive that a new metadata getter class will confuse any
> user, at some sacrifice in the cleanness of the future implementation around
> consumer state. WDYT?
>
> Boyang
>
> On Wed, Aug 14, 2019 at 10:45 AM Guozhang Wang  wrote:
>
>> My main concern is requiring the overloaded `initTransactions` to be
>> called repeatedly while the original `initTransactions` is still called once
>> throughout the lifetime, which is a bit confusing.
>>
>> Looking into the current POC PR, we actually only need the latest
>> generation id when fetching offsets, so we can just make the GroupMetadata
>> returned from the consumer a wrapper of the underlying values, and the
>> getters of this object would always return the latest value.
>> The values would be reset internally within the rebalances; and then the
>> new `initTransactions` would still only be called once.
>>
>> Guozhang
>>
>>
>> On Wed, Aug 14, 2019 at 9:53 AM Jason Gustafson 
>> wrote:
>>
>> > Yeah, my reasoning is that the group metadata is only relevant to the
>> > subscription API. So it makes sense to only expose it to the rebalance
>> > listener.
>> >
>> > One option we could consider is bring back the `initTransactions`
>> overload.
>> > Then usage looks something like this:
>> >
>> > consumer.subscribe(topics, new RebalanceListener() {
>> >   void onGroupJoined(GroupMetadata metadata) {
>> >     producer.initTransactions(metadata);
>> >   }
>> > });
>> >
>> > That seems pretty clean. What do you think?
>> >
>> > -Jason
>> >
>> > On Tue, Aug 13, 2019 at 6:07 PM Boyang Chen > >
>> > wrote:
>> >
>> > > Hey Guozhang,
>> > >
>> > > thanks for the suggestion. Could you elaborate more on why defining a
>> > > direct consumer API would be easier? The benefit of reusing the consumer
>> > > rebalance listener is to consolidate the entry point of consumer internal
>> > > states. Compared with letting the consumer generate a deep copy of metadata
>> > > every time we call #sendOffsetsToTransactions, using a callback seems to
>> > > reduce unnecessary updates to the metadata. WDYT?
>> > >
>> > > Boyang
>> > >
>> > > On Tue, Aug 13, 2019 at 2:14 PM Guozhang Wang 
>> > wrote:
>> > >
>> > > > Hi Boyang, Jason,
>> > > >
>> > > > If we are going to expose the generation id / group.instance id etc
>> > > anyways
>> > > > I think its slightly better to just add a new API on KafkaConsumer
>> > > > returning the ConsumerGroupMetadata (option 3) than passing it in
>> on an
>> > > > additional callback of ConsumerRebalanceListener.
>> > > > It feels easier to leverage, than requiring users to pass in the
>> > > listener.
>> > > >
>> > > > Guozhang
>> > > >
>> > > > On Mon, Aug 12, 2019 at 3:41 PM Boyang Chen <
>> > reluctanthero...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Thanks Jason, the intuition behind defining a separate callback
>> > > function
>> > > > is
>> > > > > that, with KIP-429 we no longer guarantee to call
>> > > OnPartitionsAssigned()
>> > > > or
>> > > > > OnPartitionsRevoked() with each rebalance. Our requirement is to
>> be
>> > > > > up-to-date with group metadata such as generation information, so
>> > > > callback
>> > > > > like onGroupJoined() would make more sense as it should be invoked
>> > > after
>> > > > > every successful rebalance.
>> > > > >
>> > > > > Best,
>> > > > > Boyang
>> > > > >
>> > > > > On Mon, Aug 12, 2019 at 2:02 PM Jason Gustafson <
>> ja...@confluent.io>
>> > > > > wrote:
>> > > > >
>> > > > > > Hey Boyang,
>> > > > > >
>> > > > > > I favor option 4 as well. It's a little more cumbersome than 3
>> for
>> > > this
>> > > > > use
>> > > > > > case, but it seems like a cleaner separation of concerns. The
>> > > rebalance
>> > > > > > listener is already concerned with events affecting the
>> assignment
>> > > > > > lifecycle and group membership. I think the only thing I'm
>> > wondering
>> > > is
>> > > > > > whether it should be a separate callback as you've suggested,
>> or if
>> > > it
>> > > > > > would make sense to overload 

[jira] [Created] (KAFKA-8871) Allow timestamp manipulation in ValueTransformerWithKey

2019-09-04 Thread Levani Kokhreidze (Jira)
Levani Kokhreidze created KAFKA-8871:


 Summary: Allow timestamp manipulation in ValueTransformerWithKey
 Key: KAFKA-8871
 URL: https://issues.apache.org/jira/browse/KAFKA-8871
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Levani Kokhreidze


h3. Motivation

When using `KStream#transform` in the Kafka Streams DSL to manipulate the
timestamp, the `KStreamImpl#transform` implementation marks *repartitionRequired*
as *true*, which isn't necessarily desirable when one may just want to manipulate
the timestamp without affecting the key. It would be great if a DSL user could
manipulate the timestamp in `ValueTransformerWithKey`.
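
For reference, a sketch of the current workaround alluded to above (not part of this ticket): a full `Transformer` is needed just to set the timestamp, and it marks the stream as requiring repartitioning. Here `stream` is a `KStream` and `extractTimestamp(...)` is a hypothetical helper.
{code:java}
stream.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        final long newTimestamp = extractTimestamp(value);
        // forward with an explicit timestamp; only possible from a Transformer today
        context.forward(key, value, To.all().withTimestamp(newTimestamp));
        return null; // the record was already forwarded with the new timestamp
    }

    @Override
    public void close() {}
});
{code}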



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Build failed in Jenkins: kafka-2.0-jdk8 #291

2019-09-04 Thread Apache Jenkins Server
See 


Changes:

[bill] KAFKA-8861 Fix flaky

--
Started by an SCM change
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H24 (ubuntu xenial) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/2.0^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/2.0^{commit} # timeout=10
Checking out Revision e6eda88180b843020725082a9bcfd08985e154c0 
(refs/remotes/origin/2.0)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f e6eda88180b843020725082a9bcfd08985e154c0
Commit message: "KAFKA-8861 Fix flaky 
RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic 
(#7281)"
 > git rev-list --no-walk 3aa9f99eef44d50fb9971ebca3df5aa4213357ca # timeout=10
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
[kafka-2.0-jdk8] $ /bin/bash -xe /tmp/jenkins8120365312901149033.sh
+ rm -rf 
+ /home/jenkins/tools/gradle/4.8.1/bin/gradle
/tmp/jenkins8120365312901149033.sh: line 4: 
/home/jenkins/tools/gradle/4.8.1/bin/gradle: No such file or directory
Build step 'Execute shell' marked build as failure
[FINDBUGS] Collecting findbugs analysis files...
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
[FINDBUGS] Searching for all files in 
 that match the pattern 
**/build/reports/findbugs/*.xml
[FINDBUGS] No files found. Configuration error?
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
No credentials specified
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
 Using GitBlamer to create author and commit information for all 
warnings.
 GIT_COMMIT=e6eda88180b843020725082a9bcfd08985e154c0, 
workspace=
[FINDBUGS] Computing warning deltas based on reference build #290
Recording test results
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
Not sending mail to unregistered user b...@confluent.io


[jira] [Created] (KAFKA-8870) Prevent dirty reads of Streams state store from Interactive queries

2019-09-04 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-8870:
-

 Summary: Prevent dirty reads of Streams state store from 
Interactive queries
 Key: KAFKA-8870
 URL: https://issues.apache.org/jira/browse/KAFKA-8870
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Vinoth Chandar


Today, Interactive Queries (IQ) against a Streams state store can see
uncommitted data, even with EOS processing guarantees (these are actually
orthogonal, but clarifying since EOS may give the impression that everything is
dandy). This is caused primarily because state updates in RocksDB are visible
even before the Kafka transaction is committed. Thus, if the instance fails,
the failed-over instance will redo the uncommitted old transaction, and the
following could happen during recovery.

The value for key K can go from *V0 → V1 → V2* on active instance A, IQ reads V1,
instance A fails, and any failure/rebalancing will leave the standby instance B
rewinding offsets and reprocessing, during which time IQ can again see V0 or V1
or any number of previous values for the same key.

In this issue, we will plan work towards providing consistency for IQ for a
single row in a single state store, i.e., once a query sees V1, it can only see
either V1 or V2.

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Build failed in Jenkins: kafka-2.3-jdk8 #100

2019-09-04 Thread Apache Jenkins Server
See 


Changes:

[bill] KAFKA-8861 Fix flaky

--
Started by an SCM change
Started by an SCM change
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H22 (ubuntu xenial) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/2.3^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/2.3^{commit} # timeout=10
Checking out Revision c7b4595f3af966426b4d46216a7ba893a3c86de1 
(refs/remotes/origin/2.3)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f c7b4595f3af966426b4d46216a7ba893a3c86de1
Commit message: "KAFKA-8861 Fix flaky 
RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic 
(#7281)"
 > git rev-list --no-walk b9450a49ec0b7e10eca1e4413bdd6956f1a18677 # timeout=10
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
[kafka-2.3-jdk8] $ /bin/bash -xe /tmp/jenkins9149281496775566872.sh
+ rm -rf 
+ /home/jenkins/tools/gradle/4.8.1/bin/gradle
/tmp/jenkins9149281496775566872.sh: line 4: 
/home/jenkins/tools/gradle/4.8.1/bin/gradle: No such file or directory
Build step 'Execute shell' marked build as failure
[FINDBUGS] Collecting findbugs analysis files...
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
[FINDBUGS] Searching for all files in 
 that match the pattern 
**/build/reports/findbugs/*.xml
[FINDBUGS] No files found. Configuration error?
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
No credentials specified
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
 Using GitBlamer to create author and commit information for all 
warnings.
 GIT_COMMIT=c7b4595f3af966426b4d46216a7ba893a3c86de1, 
workspace=
[FINDBUGS] Computing warning deltas based on reference build #94
Recording test results
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
Not sending mail to unregistered user g...@confluent.io
Not sending mail to unregistered user b...@confluent.io
Not sending mail to unregistered user wangg...@gmail.com


[jira] [Created] (KAFKA-8869) Map taskConfigs in KafkaConfigBackingStore grows monotonically despite of task removals

2019-09-04 Thread Konstantine Karantasis (Jira)
Konstantine Karantasis created KAFKA-8869:
-

 Summary: Map taskConfigs in KafkaConfigBackingStore grows 
monotonically despite of task removals
 Key: KAFKA-8869
 URL: https://issues.apache.org/jira/browse/KAFKA-8869
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.3.0
Reporter: Konstantine Karantasis
Assignee: Konstantine Karantasis
 Fix For: 2.3.1



Investigation of https://issues.apache.org/jira/browse/KAFKA-8676 revealed 
another issue: 
a map in {{KafkaConfigBackingStore}} keeps growing despite connectors and
tasks eventually getting removed.

This bug does not directly affect the rebalancing protocols, but it'd be good
to resolve it and use the map in a way similar to how {{connectorConfigs}} is used.




--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8868) Consider auto-generating Streams binary protocol messages

2019-09-04 Thread John Roesler (Jira)
John Roesler created KAFKA-8868:
---

 Summary: Consider auto-generating Streams binary protocol messages
 Key: KAFKA-8868
 URL: https://issues.apache.org/jira/browse/KAFKA-8868
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


Rather than maintaining hand-coded protocol serialization code, Streams could use
the same code-generation framework as Clients/Core.

There isn't a perfect match, since the code generation framework includes an 
assumption that you're generating "protocol messages", rather than just 
arbitrary blobs, but I think it's close enough to justify using it, and 
improving it over time.

Using the code generation allows us to drop a lot of detail-oriented, brittle, 
and hard-to-maintain serialization logic in favor of a schema spec.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread John Roesler
Hey Bruno,

Thanks for taking another look. Some quick responses:

1) It just means the number of offsets in the topic. E.g., the LSO is 100,
but the first offset is 40 due to retention, so there are 60 offsets in the
topic. Further, the lag on that topic would be considered to be 60 for any
task that hadn't previously done any work on it.
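
In code form, the arithmetic from that example is simply (values taken from the
example above, not fetched from any API):

long logEndOffset = 100L;    // the "LSO is 100" in the example
long logStartOffset = 40L;   // earliest offset still retained after retention
long offsetsInTopic = logEndOffset - logStartOffset;   // 60
// a task with no prior progress on this changelog is treated as 60 offsets behind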

2) This is undecidable in general. I.e., there's no way we can know whether
the store is remote or not, and hence whether we can freely assign it to
another instance, or whether we have to keep it on the same instance.
However, there are a couple of reasons to go ahead and assume we have the
freedom to move such tasks.
* We know that nothing can prevent the loss of an instance in a cluster
(I.e., this is true of all cloud environments, as well as any managed
virtualized cluster like mesos or kubernetes), so any Streams program that
makes use of non-remote, non-logged state is doomed to lose its state when
it loses an instance.
* If we take on a restriction that we cannot move such state between
instances, we'd become overconstrained very quickly. Effectively, if you
made use of non-logged stores, and we didn't assume freedom of movement,
then we couldn't make use of any new instances in your cluster.
* On the other hand, if we optimistically assume we can't move state, but
only reassign it when we lose an instance, then we're supporting
non-deterministic logic, because the program would produce different
results, depending on whether you lost a node during the execution or not.
2b) That last point goes along with your side note. I'm not sure if we
should bother dropping such state on every reassignment, though. It seems
to be undefined territory enough that we can just do the simplest thing and
assume people have made their own (external) provisions for durability.
I.e., when we say "non-logged", we mean that it doesn't make use of _our_
durability mechanism. I'm arguing that the only sane assumption is that
such folks have opted to use their own durability measures, and we should
just assume it works with no special considerations in the assignment
algorithm.

3) Good catch! I've fixed it.

Thanks again!
-John

On Wed, Sep 4, 2019 at 6:09 AM Bruno Cadonna  wrote:

> Hi,
>
> 1) What do you mean with "full set of offsets in the topic"? Is this
> the sum of all offsets of the changelog partitions of the task?
>
> 2) I am not sure whether non-logged stateful tasks should be
> effectively treated as stateless tasks during assignment. First we
> need to decide whether a non-logged stateful task should preferably be
> assigned to the same instance on which it just ran in order to
> continue to use its state or not.
>
> 3) In the example, you define stand-by tasks {S1, S2, ...} but never
> use them, because below you use a dedicated row for stand-by tasks.
>
> As a side note to 2) since it is not directly related to this KIP: We
> should decide if we want to avoid the possible non-determinism
> introduced by non-logged stores or not. That is, if an instance hosts
> a task with non-logged stores then we can have two cases after the
> next rebalance: a) the task stays on the same instance and continues
> to use the same state store as used so far or b) the task is assigned
> to another instance and it starts an empty state store. The produced
> results for these two cases might differ. To avoid the nondeterminism,
> non-logged state stores would need to be wiped out before assignment.
> Then the question arises, how the removal of non-logged state stores
> before assignment would affect backward-compatibility.
>
> Best,
> Bruno
>
> On Wed, Aug 21, 2019 at 11:40 PM John Roesler  wrote:
> >
> > Hi Guozhang,
> >
> > > My impression from your previous email is that inside the algorithm
> when
> > we
> > are "filling" them to instances some deterministic logic would be used to
> > avoid the above case, is that correct?
> >
> > Yes, that was my plan, but I didn't formalize it. There was a requirement
> > that the assignment algorithm must not produce a new assignment if the
> > current assignment is already balanced, so at the least, any thrashing
> > would be restricted to the "balancing" phase while tasks are moving
> around
> > the cluster.
> >
> > Anyway, I think it would be good to say that we'll "try to" produce
> stable
> > assignments, so I've added a "should" clause to the assignment spec:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
> >
> > For example, we would sort the stateless tasks and available instances
> > before assigning them, so that the stateless task assignment would mostly
> > stay stable between assignments, modulo the compute capacity of the
> > instances changing a little as active stateful tasks get assigned in more
> > balanced ways.
> >
> > Thanks,
> > -John
> >
> >
> > On Wed, Aug 21, 2019 at 1:55 PM Guozhang Wang 
> wrote:
> >
> > > 

Re: [VOTE] KIP-482: The Kafka Protocol should Support Optional Tagged Fields

2019-09-04 Thread David Arthur
+1 binding.

Thanks for the KIP, Colin!

-David

On Wed, Sep 4, 2019 at 5:40 AM Harsha Chintalapani  wrote:

> LGTM. +1 (binding)
> -Harsha
>
>
> On Wed, Sep 04, 2019 at 1:46 AM, Satish Duggana 
> wrote:
>
> > +1 (non-binding) Thanks for the nice KIP.
> >
> > You may want to update the KIP saying that optional tagged fields do not
> > support complex types(or structs).
> >
> > On Wed, Sep 4, 2019 at 3:43 AM Jose Armando Garcia Sancio
> >  wrote:
> >
> > +1 (non-binding)
> >
> > Looking forward to this improvement.
> >
> > On Tue, Sep 3, 2019 at 12:49 PM David Jacot  wrote:
> >
> > +1 (non-binding)
> >
> > Thank for the KIP. Great addition to the Kafka protocol!
> >
> > Best,
> > David
> >
> > Le mar. 3 sept. 2019 à 19:17, Colin McCabe  a écrit
> :
> >
> > Hi all,
> >
> > I'd like to start the vote for KIP-482: The Kafka Protocol should Support
> > Optional Tagged Fields.
> >
> > KIP:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/
> > KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
> >
> > Discussion thread here:
> >
> > https://lists.apache.org/thread.html/
> > cdc801ae886491b73ef7efecac7ef81b24382f8b6b025899ee343f7a@%3Cdev.kafka.
> > apache.org%3E
> >
> > best,
> > Colin
> >
> > --
> > -Jose
> >
> >
>


-- 
David Arthur


[jira] [Created] (KAFKA-8867) Kafka Connect JDBC fails to create PostgreSQL table with default boolean value

2019-09-04 Thread Tudor-Alexandru Voicu (Jira)
Tudor-Alexandru Voicu created KAFKA-8867:


 Summary: Kafka Connect JDBC fails to create PostgreSQL table with 
default boolean value
 Key: KAFKA-8867
 URL: https://issues.apache.org/jira/browse/KAFKA-8867
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.3.0
Reporter: Tudor-Alexandru Voicu


The `CREATE TABLE ..` statement generated for sink connectors configured
with `auto.table.create=true` contains field declarations that do not conform
to valid PostgreSQL syntax for fields of type boolean with
default values:

Example Avro value schema for the input topic:

 
{code:java}
{
  "namespace": "com.test.avro.schema.v1",
  "type": "record",
  "name": "SomeEvent",
  "fields": [
{
  "name": "boolean_field",
  "type": "boolean",
  "default": false
}
  ]
}
{code}
The connector task fails with: 

 

 
{code:java}
ERROR WorkerSinkTask{id=test-events-sink-0} RetriableException from SinkTask: 
(org.apache.kafka.connect.runtime.WorkerSinkTask:551)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: 
org.postgresql.util.PSQLException: ERROR: column "boolean_field" is of type 
boolean but default expression is of type integer
  Hint: You will need to rewrite or cast the expression.

at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748){code}
 

The generated SQL statement is:

 
{code:java}
CREATE TABLE "test_data" ("boolean_field" BOOLEAN DEFAULT 0){code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Build failed in Jenkins: kafka-trunk-jdk8 #3885

2019-09-04 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Add system configuration to zk security exception messages

[matthias] KAFKA-7849: Fix the warning when using GlobalKTable (#7104)

--
[...truncated 2.62 MB...]

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testCreateConnectorWithHeaderAuthorization STARTED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testCreateConnectorWithHeaderAuthorization PASSED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testCreateConnectorWithoutHeaderAuthorization STARTED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testCreateConnectorWithoutHeaderAuthorization PASSED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testCreateConnectorExists STARTED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testCreateConnectorExists PASSED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testCreateConnectorNameTrimWhitespaces STARTED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testCreateConnectorNameTrimWhitespaces PASSED

org.apache.kafka.connect.runtime.rest.resources.RootResourceTest > testRootGet 
STARTED

org.apache.kafka.connect.runtime.rest.resources.RootResourceTest > testRootGet 
PASSED

org.apache.kafka.connect.runtime.isolation.DelegatingClassLoaderTest > 
testWhiteListedManifestResources STARTED

org.apache.kafka.connect.runtime.isolation.DelegatingClassLoaderTest > 
testWhiteListedManifestResources PASSED

org.apache.kafka.connect.runtime.isolation.DelegatingClassLoaderTest > 
testOtherResources STARTED

org.apache.kafka.connect.runtime.isolation.DelegatingClassLoaderTest > 
testOtherResources PASSED

org.apache.kafka.connect.runtime.isolation.PluginDescTest > 
testPluginDescWithNullVersion STARTED

org.apache.kafka.connect.runtime.isolation.PluginDescTest > 
testPluginDescWithNullVersion PASSED

org.apache.kafka.connect.runtime.isolation.PluginDescTest > 
testPluginDescComparison STARTED

org.apache.kafka.connect.runtime.isolation.PluginDescTest > 
testPluginDescComparison PASSED

org.apache.kafka.connect.runtime.isolation.PluginDescTest > 
testRegularPluginDesc STARTED

org.apache.kafka.connect.runtime.isolation.PluginDescTest > 
testRegularPluginDesc PASSED

org.apache.kafka.connect.runtime.isolation.PluginDescTest > 
testPluginDescEquality STARTED

org.apache.kafka.connect.runtime.isolation.PluginDescTest > 
testPluginDescEquality PASSED

org.apache.kafka.connect.runtime.isolation.PluginDescTest > 
testPluginDescWithSystemClassLoader STARTED

org.apache.kafka.connect.runtime.isolation.PluginDescTest > 
testPluginDescWithSystemClassLoader PASSED

org.apache.kafka.connect.runtime.isolation.PluginsTest > 
shouldInstantiateAndConfigureConnectRestExtension STARTED

org.apache.kafka.connect.runtime.isolation.PluginsTest > 
shouldInstantiateAndConfigureConnectRestExtension PASSED

org.apache.kafka.connect.runtime.isolation.PluginsTest > 
shouldInstantiateAndConfigureConverters STARTED

org.apache.kafka.connect.runtime.isolation.PluginsTest > 
shouldInstantiateAndConfigureConverters PASSED

org.apache.kafka.connect.runtime.isolation.PluginsTest > 
shouldInstantiateAndConfigureInternalConverters STARTED

org.apache.kafka.connect.runtime.isolation.PluginsTest > 
shouldInstantiateAndConfigureInternalConverters PASSED

org.apache.kafka.connect.runtime.isolation.PluginsTest > 
shouldInstantiateAndConfigureDefaultHeaderConverter STARTED

org.apache.kafka.connect.runtime.isolation.PluginsTest > 
shouldInstantiateAndConfigureDefaultHeaderConverter PASSED

org.apache.kafka.connect.runtime.isolation.PluginsTest > 
shouldInstantiateAndConfigureExplicitlySetHeaderConverterWithCurrentClassLoader 
STARTED

org.apache.kafka.connect.runtime.isolation.PluginsTest > 
shouldInstantiateAndConfigureExplicitlySetHeaderConverterWithCurrentClassLoader 
PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithRelativeSymlinkForwards STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithRelativeSymlinkForwards PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testOrderOfPluginUrlsWithJars STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testOrderOfPluginUrlsWithJars PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithClasses STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testPluginUrlsWithClasses PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testConnectFrameworkClasses STARTED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testConnectFrameworkClasses PASSED

org.apache.kafka.connect.runtime.isolation.PluginUtilsTest > 
testThirdPartyClasses STARTED


Re: Request to be added as a contributor

2019-09-04 Thread Bill Bejeck
Soontaek,

You're all set now.  Thanks for your interest in Apache Kafka.

-Bill

On Wed, Sep 4, 2019 at 10:08 AM Soontaek Lim  wrote:

> Hi Team,
>
> Request you to add me to the contributor list.
> Jira username: soontaek.lim
>
> Thank you
>
> Best regards,
> Soontaek Lim
>


Request to be added as a contributor

2019-09-04 Thread Soontaek Lim
Hi Team,

Request you to add me to the contributor list.
Jira username: soontaek.lim

Thank you

Best regards,
Soontaek Lim


[jira] [Resolved] (KAFKA-8676) Avoid Stopping Unnecessary Connectors and Tasks

2019-09-04 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8676.
--
Fix Version/s: (was: 2.3.0)
   2.3.1
   2.4.0
 Reviewer: Konstantine Karantasis
   Resolution: Fixed

Merged the proposed fix (https://github.com/apache/kafka/pull/7097) and 
[~kkonstantine]'s unit test (https://github.com/apache/kafka/pull/7287).

> Avoid Stopping Unnecessary Connectors and Tasks 
> 
>
> Key: KAFKA-8676
> URL: https://issues.apache.org/jira/browse/KAFKA-8676
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.3.0
> Environment: centOS
>Reporter: Luying Liu
>Priority: Major
>  Labels: ready-to-commit
> Fix For: 2.4.0, 2.3.1
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> When adding a new connector or changing a connector configuration, Kafka 
> Connect 2.3.0 will stop all existing tasks and then start all tasks, including 
> both the new and the existing ones. However, this is not necessary: only the 
> new connector and its tasks need to be started, since rebalancing can be 
> applied to both running and suspended tasks. The following patch fixes this 
> problem and starts only the new tasks and connectors.
> The problem lies in the 
> KafkaConfigBackingStore.ConsumeCallback.onCompletion() function (line 623 in 
> KafkaConfigBackingStore.java). When record.key() startsWith "commit-", the 
> tasks are being committed and the deferred tasks are processed. Some new 
> tasks are added to 'updatedTasks' (line 623 in 
> KafkaConfigBackingStore.java), and 'updatedTasks' is sent to the 
> updateListener to complete the task configuration update (line 638 in 
> KafkaConfigBackingStore.java). In the updateListener.onTaskConfigUpdate() 
> function, the 'updatedTasks' are added to the member variable 
> 'taskConfigUpdates' of class DistributedHerder (line 1295 in 
> DistributedHerder.java).
> In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy' 
> in updateConfigsWithIncrementalCooperative() (line 445 in 
> DistributedHerder.java). The 'taskConfigUpdatesCopy' is subsequently used in 
> processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in 
> DistributedHerder.java). This function then uses 'taskConfigUpdatesCopy' to 
> find the connectors to stop (line 492 in DistributedHerder.java) and finally 
> the tasks to stop, which are all the tasks. The worker thread does the actual 
> stopping (line 499 in DistributedHerder.java). 
> In the original code, all the tasks are added to 'updatedTasks' (line 623 
> in KafkaConfigBackingStore.java), which means all the active connectors are 
> in the 'connectorsWhoseTasksToStop' set and all the tasks are in the 
> 'tasksToStop' list. This causes the stops, and of course the subsequent 
> restarts, of all the tasks. 
> So, adding only the 'deferred' tasks to 'updatedTasks' avoids the 
> stops and restarts of unnecessary tasks.
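
A self-contained toy sketch of the idea described above (hypothetical names; it only 
illustrates the "report only the deferred tasks" approach, not the merged patch):

{code:java}
import java.util.HashSet;
import java.util.Set;

public class UpdatedTasksSketch {
    public static void main(String[] args) {
        // All tasks currently owned by active connectors.
        Set<String> allTasks = Set.of("conn-a-0", "conn-a-1", "conn-b-0");
        // Only this task's configuration was actually deferred/changed in the commit.
        Set<String> deferredTaskUpdates = Set.of("conn-b-0");

        // Old behaviour: every task is reported as updated, so every task is stopped and restarted.
        Set<String> updatedTasksBefore = new HashSet<>(allTasks);

        // Fixed behaviour: only the deferred tasks are reported, so only they are stopped and restarted.
        Set<String> updatedTasksAfter = new HashSet<>(deferredTaskUpdates);

        System.out.println("Restarted before the fix: " + updatedTasksBefore);
        System.out.println("Restarted after the fix:  " + updatedTasksAfter);
    }
}
{code}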



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Build failed in Jenkins: kafka-trunk-jdk11 #791

2019-09-04 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Add system configuration to zk security exception messages

[matthias] KAFKA-7849: Fix the warning when using GlobalKTable (#7104)

--
[...truncated 2.63 MB...]

org.apache.kafka.connect.transforms.HoistFieldTest > schemaless STARTED

org.apache.kafka.connect.transforms.HoistFieldTest > schemaless PASSED

org.apache.kafka.connect.transforms.TimestampRouterTest > defaultConfiguration 
STARTED

org.apache.kafka.connect.transforms.TimestampRouterTest > defaultConfiguration 
PASSED

org.apache.kafka.connect.transforms.InsertFieldTest > 
schemalessInsertConfiguredFields STARTED

org.apache.kafka.connect.transforms.InsertFieldTest > 
schemalessInsertConfiguredFields PASSED

org.apache.kafka.connect.transforms.InsertFieldTest > topLevelStructRequired 
STARTED

org.apache.kafka.connect.transforms.InsertFieldTest > topLevelStructRequired 
PASSED

org.apache.kafka.connect.transforms.InsertFieldTest > 
copySchemaAndInsertConfiguredFields STARTED

org.apache.kafka.connect.transforms.InsertFieldTest > 
copySchemaAndInsertConfiguredFields PASSED

org.apache.kafka.connect.transforms.ValueToKeyTest > withSchema STARTED

org.apache.kafka.connect.transforms.ValueToKeyTest > withSchema PASSED

org.apache.kafka.connect.transforms.ValueToKeyTest > schemaless STARTED

org.apache.kafka.connect.transforms.ValueToKeyTest > schemaless PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > schemaNameUpdate 
STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > schemaNameUpdate 
PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
schemaNameAndVersionUpdateWithStruct STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
schemaNameAndVersionUpdateWithStruct PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
updateSchemaOfStruct STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
updateSchemaOfStruct PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
schemaNameAndVersionUpdate STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
schemaNameAndVersionUpdate PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > updateSchemaOfNull 
STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > updateSchemaOfNull 
PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > schemaVersionUpdate 
STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > schemaVersionUpdate 
PASSED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
updateSchemaOfNonStruct STARTED

org.apache.kafka.connect.transforms.SetSchemaMetadataTest > 
updateSchemaOfNonStruct PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessFieldConversion STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessFieldConversion PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigInvalidTargetType STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigInvalidTargetType PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessStringToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessStringToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > testKey STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > testKey PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessDateToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessDateToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToString STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToString PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimeToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimeToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToDate STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToDate PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToTime STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToTime PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToUnix STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToUnix PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaNullValueToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaNullValueToTimestamp PASSED


Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-09-04 Thread Ismael Juma
Hi Jun,

I think it's important to discuss the pluggable API versus the calling
implementation as two separate points.

From an API perspective, are you suggesting that we should tell users that
they cannot block in authorize? Or are you saying that it's OK to block on
authorize on occasion and the recommendation would be for people to
increase the number of threads in the request thread pool? Architecturally,
this feels wrong in my opinion.

From the calling implementation perspective, you don't need a callback. You
can basically say:

authorize(...).map { result =>
  ...
}

And then have a common method take that Future and handle it synchronously
if it's already complete or submit it to the purgatory if not. This is
similar to how many modern async web libraries work.

As Rajini said, this could be done later. The initial implementation could
simply do `toCompletableFuture().get()` to do it synchronously. But it
would mean that we could add this functionality without changing the
pluggable interface.

Ismael

On Wed, Sep 4, 2019 at 5:29 AM Jun Rao  wrote:

> Hi, Rajini,
>
> Thanks for the KIP. I was concerned about #4 too. If we change the handling
> of all requests to use an async authorize() api, will that make the code
> much harder to understand? There are quite a few callbacks already. I am
> not sure that we want to introduce more of those. The benefit from an async
> authorize() api seems limited.
>
> Jun
>
> On Wed, Sep 4, 2019 at 5:38 PM Rajini Sivaram 
> wrote:
>
> > Hi Don,
> >
> > Thanks for your note.
> >
> > 1) The intention is to avoid blocking in the calling thread. We already
> > have several requests that are put into a purgatory when waiting for
> remote
> > communication, for example produce request waiting for replication. Since
> > we have a limited number of request threads in the broker, we want to
> make
> > progress with other requests, while one is waiting on any form of remote
> > communication.
> >
> > 2) Async management calls will be useful with the default authorizer when
> > KIP-500 removes ZK and we rely on Kafka instead. Our current ZK-based
> > implementation as well as any custom implementations that don't want to
> be
> > async will just need to return a sync'ed value. So instead of returning `
> > value`, the code would just return
> > `CompletableFuture.completedFuture(value)
> > `. So it would be just a single line change in the implementation with
> the
> > new API. The caller would treat completedFuture exactly as it does today,
> > processing the request synchronously without using a purgatory.
> >
> > 3) For implementations that return a completedFuture as described in 2),
> > the behaviour would remain exactly the same. No additional threads or
> > purgatory will be used for this case. So there would be no performance
> > penalty. For implementations that return a future that is not complete,
> we
> > prioritise running more requests concurrently. So in a deployment with a
> > large number of clients, we would improve performance by allowing other
> > requests to be processed on the request threads while some are waiting
> for
> > authorization metadata.
> >
> > 4) I was concerned about this too. The goal is to make the API flexible
> > enough to handle large scale deployments in future when caching all
> > authorization metadata in each broker is not viable. Using an async API
> > that returns CompletionStage, the caller has the option to handle the
> > result synchronously or asynchronously, so we don't necessarily need to
> > update the calling code right away. Custom authorizers using the async
> API
> > have full control over whether authorization is performed in-line since
> > completedFuture will always be handled synchronously. We do need to
> update
> > KafkaApis to take advantage of the asynchronous API to improve scale.
> Even
> > though this is a big change, since we will be doing the same for all
> > requests, it shouldn't be too hard to maintain since the same pattern
> will
> > be used for all requests.
> >
> > Regards,
> >
> > Rajini
> >
> > On Tue, Sep 3, 2019 at 11:48 PM Don Bosco Durai 
> wrote:
> >
> > > Hi Rajini
> > >
> > > Help me understand this a bit more.
> > >
> > > 1. For all practical purpose, without authorization you can't go to the
> > > next step. The calling code needs to block anyway. So should we just
> let
> > > the implementation code do the async part?
> > > 2. If you feel management calls need to be async, then we should
> consider
> > > another set of async APIs. I don't feel we should complicate it
> further (
> > > 3. Another concern I have is wrt performance. Kafka has been built to
> > > handle 1000s of requests per second. Not sure whether making it async
> > > will add any unnecessary overhead.
> > > 4. How much complication would this add on the calling side? And is it
> > > worth it?
> > >
> > > Thanks
> > >
> > > Bosco
> > >
> > >
> > > On 9/3/19, 8:50 AM, "Rajini Sivaram"  wrote:
> > >
> > > Hi all,
> 

Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-09-04 Thread Jun Rao
Hi, Rajini,

Thanks for the KIP. I was concerned about #4 too. If we change the handling
of all requests to use an async authorize() api, will that make the code
much harder to understand? There are quite a few callbacks already. I am
not sure that we want to introduce more of those. The benefit from an async
authorize() api seems limited.

Jun

On Wed, Sep 4, 2019 at 5:38 PM Rajini Sivaram 
wrote:

> Hi Don,
>
> Thanks for your note.
>
> 1) The intention is to avoid blocking in the calling thread. We already
> have several requests that are put into a purgatory when waiting for remote
> communication, for example produce request waiting for replication. Since
> we have a limited number of request threads in the broker, we want to make
> progress with other requests, while one is waiting on any form of remote
> communication.
>
> 2) Async management calls will be useful with the default authorizer when
> KIP-500 removes ZK and we rely on Kafka instead. Our current ZK-based
> implementation as well as any custom implementations that don't want to be
> async will just need to return a sync'ed value. So instead of returning `
> value`, the code would just return
> `CompletableFuture.completedFuture(value)
> `. So it would be just a single line change in the implementation with the
> new API. The caller would treat completedFuture exactly as it does today,
> processing the request synchronously without using a purgatory.
>
> 3) For implementations that return a completedFuture as described in 2),
> the behaviour would remain exactly the same. No additional threads or
> purgatory will be used for this case. So there would be no performance
> penalty. For implementations that return a future that is not complete, we
> prioritise running more requests concurrently. So in a deployment with a
> large number of clients, we would improve performance by allowing other
> requests to be processed on the request threads while some are waiting for
> authorization metadata.
>
> 4) I was concerned about this too. The goal is to make the API flexible
> enough to handle large scale deployments in future when caching all
> authorization metadata in each broker is not viable. Using an async API
> that returns CompletionStage, the caller has the option to handle the
> result synchronously or asynchronously, so we don't necessarily need to
> update the calling code right away. Custom authorizers using the async API
> have full control over whether authorization is performed in-line since
> completedFuture will always be handled synchronously. We do need to update
> KafkaApis to take advantage of the asynchronous API to improve scale. Even
> though this is a big change, since we will be doing the same for all
> requests, it shouldn't be too hard to maintain since the same pattern will
> be used for all requests.
>
> Regards,
>
> Rajini
>
> On Tue, Sep 3, 2019 at 11:48 PM Don Bosco Durai  wrote:
>
> > Hi Rajini
> >
> > Help me understand this a bit more.
> >
> > 1. For all practical purpose, without authorization you can't go to the
> > next step. The calling code needs to block anyway. So should we just let
> > the implementation code do the async part?
> > 2. If you feel management calls need to be async, then we should consider
> > another set of async APIs. I don't feel we should complicate it further (
> > 3. Another concern I have is wrt performance. Kafka has been built to
> > handle 1000s of requests per second. Not sure whether making it async
> > will add any unnecessary overhead.
> > 4. How much complication would this add on the calling side? And is it
> > worth it?
> >
> > Thanks
> >
> > Bosco
> >
> >
> > On 9/3/19, 8:50 AM, "Rajini Sivaram"  wrote:
> >
> > Hi all,
> >
> > Ismael brought up a point that it will be good to make the Authorizer
> > interface asynchronous to avoid blocking request threads during
> remote
> > operations.
> >
> > 1) Since we want to support different backends for authorization
> > metadata,
> > making createAcls() and deleteAcls() asynchronous makes sense since
> > these
> > always involve remote operations. When KIP-500 removes ZooKeeper, we
> > would
> > want to move ACLs to Kafka and async updates will avoid unnecessary
> > blocking.
> > 2) For authorize() method, we currently use cached ACLs in the
> built-in
> > authorizers, so synchronous authorise operations work well now. But
> > async
> > authorize() would support this scenario as well as authorizers in
> large
> > organisations where an LRU cache would enable a smaller cache even
> > when the
> > backend holds a large amount of ACLs for infrequently used resources
> or
> > users who don't use the system frequently.
> >
> > For both cases, the built-in authorizer will continue to be
> > synchronous,
> > using CompletableFuture.completedFuture() to return the actual
> result.
> > But
> > async API will make custom authorizer implementations more flexible.
> I
> >  

[jira] [Created] (KAFKA-8866) Make Authorizer create/delete exceptions Optional

2019-09-04 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-8866:
-

 Summary: Make Authorizer create/delete exceptions Optional
 Key: KAFKA-8866
 URL: https://issues.apache.org/jira/browse/KAFKA-8866
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.4.0


From PR review of: https://github.com/apache/kafka/pull/7268

We currently return possibly null ApiException in AclCreateResult and 
AclDeleteResult. It would be better to return Optional.
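
A rough sketch of what the proposed change could look like (hypothetical; the field and 
constructor shown here are assumptions, not the actual class):

{code:java}
import java.util.Optional;
import org.apache.kafka.common.errors.ApiException;

public class AclCreateResult {
    private final ApiException exception; // null when the ACL creation succeeded

    public AclCreateResult(ApiException exception) {
        this.exception = exception;
    }

    // Proposed: expose the failure as an Optional instead of a possibly-null value.
    public Optional<ApiException> exception() {
        return Optional.ofNullable(exception);
    }
}
{code}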



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Resolved] (KAFKA-8760) KIP-504: Add new Java Authorizer API

2019-09-04 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-8760.
---
Resolution: Fixed

> KIP-504: Add new Java Authorizer API 
> -
>
> Key: KAFKA-8760
> URL: https://issues.apache.org/jira/browse/KAFKA-8760
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.4.0
>
>
> See 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-504+-+Add+new+Java+Authorizer+Interface]
>  for details.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8865) KIP-504: New Java Authorizer API

2019-09-04 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-8865:
-

 Summary: KIP-504: New Java Authorizer API
 Key: KAFKA-8865
 URL: https://issues.apache.org/jira/browse/KAFKA-8865
 Project: Kafka
  Issue Type: Improvement
  Components: security
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram


Parent task for sub-tasks related to KIP-504



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-04 Thread Bruno Cadonna
Hi,

1) What do you mean by "full set of offsets in the topic"? Is this
the sum of all offsets of the changelog partitions of the task?

2) I am not sure whether non-logged stateful tasks should be
effectively treated as stateless tasks during assignment. First, we
need to decide whether a non-logged stateful task should preferably be
assigned to the same instance on which it just ran, in order to
continue to use its state, or not.

3) In the example, you define stand-by tasks {S1, S2, ...} but never
use them, because below you use a dedicated row for stand-by tasks.

As a side note to 2) since it is not directly related to this KIP: We
should decide if we want to avoid the possible non-determinism
introduced by non-logged stores or not. That is, if an instance hosts
a task with non-logged stores then we can have two cases after the
next rebalance: a) the task stays on the same instance and continues
to use the same state store as used so far, or b) the task is assigned
to another instance and starts with an empty state store. The produced
results for these two cases might differ. To avoid the nondeterminism,
non-logged state stores would need to be wiped out before assignment.
Then the question arises, how the removal of non-logged state stores
before assignment would affect backward-compatibility.

Best,
Bruno

On Wed, Aug 21, 2019 at 11:40 PM John Roesler  wrote:
>
> Hi Guozhang,
>
> > My impression from your previous email is that inside the algorithm when
> we
> are "filling" them to instances some deterministic logic would be used to
> avoid the above case, is that correct?
>
> Yes, that was my plan, but I didn't formalize it. There was a requirement
> that the assignment algorithm must not produce a new assignment if the
> current assignment is already balanced, so at the least, any thrashing
> would be restricted to the "balancing" phase while tasks are moving around
> the cluster.
>
> Anyway, I think it would be good to say that we'll "try to" produce stable
> assignments, so I've added a "should" clause to the assignment spec:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
>
> For example, we would sort the stateless tasks and available instances
> before assigning them, so that the stateless task assignment would mostly
> stay stable between assignments, modulo the compute capacity of the
> instances changing a little as active stateful tasks get assigned in more
> balanced ways.
>
> Thanks,
> -John
>
>
> On Wed, Aug 21, 2019 at 1:55 PM Guozhang Wang  wrote:
>
> > Hello John,
> >
> > That sounds reasonable. Just double checked the code that with logging
> > disabled the corresponding checkpoint file would not contain any values,
> > just like a stateless task. So I think treating them logically the same is
> > fine.
> >
> > Guozhang
> >
> >
> > On Wed, Aug 21, 2019 at 11:41 AM John Roesler  wrote:
> >
> > > Hi again, Guozhang,
> > >
> > > While writing up the section on stateless tasks (
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statelesstasks
> > > ),
> > > I reconsidered whether stateful, but non-logged, tasks should actually
> > > report a lag of zero, versus not reporting any lag. By the definition of
> > > the "StatefulTasksToRankedCandidates" function, the leader would compute
> > a
> > > lag of zero for these tasks anyway.
> > >
> > > Therefore, I think the same reasoning that I supplied you for stateless
> > > tasks applies, since the member and leader will agree on a lag of zero
> > > anyway, we can avoid adding them to the "Task Lags" map, and save some
> > > bytes in the JoinGroup request. This would be especially beneficial in an
> > > application that uses remote stores for _all_ its state stores, it would
> > > have an extremely lightweight JoinGroup request, with no task lags at
> > all.
> > >
> > > WDYT?
> > > -John
> > >
> > > On Wed, Aug 21, 2019 at 1:17 PM John Roesler  wrote:
> > >
> > > > Thanks, Guozhang.
> > > >
> > > > (Side note: I noticed on another pass over the discussion that I'd
> > missed
> > > > addressing your comment about the potential race condition between
> > state
> > > > cleanup and lag-based assignment. I've added a solution to the
> > proposal:
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Racebetweenassignmentandstatecleanup
> > > > )
> > > >
> > > > In the JoinGroup (SubscriptionInfo) metadata, stateless tasks are not
> > > > represented at all. This should save us some bytes in the request
> > > metadata.
> > > > If we treated them like non-logged stateful tasks and reported a lag of
> > > 0,
> > > > the only difference is that the assignor would be able to tell which
> > > > members previously hosted that stateless task.
> > 

Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-09-04 Thread Rajini Sivaram
Hi Colin & Don,

I have submitted a PR to help with reviewing the new API:
https://github.com/apache/kafka/pull/7293/files.

   - Authorizer.java contains the updated API and includes javadoc
   update including the threading model.
   - AclAuthorizer.scala changes in the PR should give an idea of the
   implementation changes for custom authorizers that want to continue to be
   synchronous.
   - KafkaApis.scala continues to use the API synchronously for now. It can
   be updated later.


Thank you,

Rajini


On Wed, Sep 4, 2019 at 9:38 AM Rajini Sivaram 
wrote:

> Hi Don,
>
> Thanks for your note.
>
> 1) The intention is to avoid blocking in the calling thread. We already
> have several requests that are put into a purgatory when waiting for remote
> communication, for example produce request waiting for replication. Since
> we have a limited number of request threads in the broker, we want to make
> progress with other requests, while one is waiting on any form of remote
> communication.
>
> 2) Async management calls will be useful with the default authorizer when
> KIP-500 removes ZK and we rely on Kafka instead. Our current ZK-based
> implementation as well as any custom implementations that don't want to be
> async will just need to return a sync'ed value. So instead of returning `
> value`, the code would just return `CompletableFuture.completedFuture
> (value)`. So it would be just a single line change in the implementation
> with the new API. The caller would treat completedFuture exactly as it
> does today, processing the request synchronously without using a purgatory.
>
> 3) For implementations that return a completedFuture as described in 2),
> the behaviour would remain exactly the same. No additional threads or
> purgatory will be used for this case. So there would be no performance
> penalty. For implementations that return a future that is not complete, we
> prioritise running more requests concurrently. So in a deployment with a
> large number of clients, we would improve performance by allowing other
> requests to be processed on the request threads while some are waiting for
> authorization metadata.
>
> 4) I was concerned about this too. The goal is to make the API flexible
> enough to handle large scale deployments in future when caching all
> authorization metadata in each broker is not viable. Using an async API
> that returns CompletionStage, the caller has the option to handle the
> result synchronously or asynchronously, so we don't necessarily need to
> update the calling code right away. Custom authorizers using the async API
> have full control over whether authorization is performed in-line since
> completedFuture will always be handled synchronously. We do need to
> update KafkaApis to take advantage of the asynchronous API to improve
> scale. Even though this is a big change, since we will be doing the same
> for all requests, it shouldn't be too hard to maintain since the same
> pattern will be used for all requests.
>
> Regards,
>
> Rajini
>
> On Tue, Sep 3, 2019 at 11:48 PM Don Bosco Durai  wrote:
>
>> Hi Rajini
>>
>> Help me understand this a bit more.
>>
>> 1. For all practical purpose, without authorization you can't go to the
>> next step. The calling code needs to block anyway. So should we just let
>> the implementation code do the async part?
>> 2. If you feel management calls need to be async, then we should consider
>> another set of async APIs. I don't feel we should complicate it further (
>> 3. Another concern I have is wrt performance. Kafka has been built to
>> handle 1000s of requests per second. Not sure whether making it async will add
>> any unnecessary overhead.
>> 4. How much complication would this add on the calling side? And is it
>> worth it?
>>
>> Thanks
>>
>> Bosco
>>
>>
>> On 9/3/19, 8:50 AM, "Rajini Sivaram"  wrote:
>>
>> Hi all,
>>
>> Ismael brought up a point that it will be good to make the Authorizer
>> interface asynchronous to avoid blocking request threads during remote
>> operations.
>>
>> 1) Since we want to support different backends for authorization
>> metadata,
>> making createAcls() and deleteAcls() asynchronous makes sense since
>> these
>> always involve remote operations. When KIP-500 removes ZooKeeper, we
>> would
>> want to move ACLs to Kafka and async updates will avoid unnecessary
>> blocking.
>> 2) For authorize() method, we currently use cached ACLs in the
>> built-in
>> authorizers, so synchronous authorise operations work well now. But
>> async
>> authorize() would support this scenario as well as authorizers in
>> large
>> organisations where an LRU cache would enable a smaller cache even
>> when the
>> backend holds a large amount of ACLs for infrequently used resources
>> or
>> users who don't use the system frequently.
>>
>> For both cases, the built-in authorizer will continue to be
>> synchronous,
>> using CompletableFuture.completedFuture() to 

Current master does not build

2019-09-04 Thread Florian Schmidt
Hello,

I'm trying to compile the current master (commit:
8dc80e22973d89090833cb791d9269e9a0598059),
but the build fails at the tasks :core:compileScala and
:kafka:core:compileScala with the following error:

.../kafka/core/build/classes/scala/main/kafka/api/package.class' is broken
(class java.lang.RuntimeException/error reading Scala signature of
package.class: Scala signature package has wrong version
 expected: 5.0
 found: 5.2 in package.class)

It seems that during compilation the 2.12-compiler uses a 2.13 library!?

Best regards,
Florian


Re: [VOTE] KIP-482: The Kafka Protocol should Support Optional Tagged Fields

2019-09-04 Thread Harsha Chintalapani
LGTM. +1 (binding)
-Harsha


On Wed, Sep 04, 2019 at 1:46 AM, Satish Duggana 
wrote:

> +1 (non-binding) Thanks for the nice KIP.
>
> You may want to update the KIP saying that optional tagged fields do not
> support complex types (or structs).
>
> On Wed, Sep 4, 2019 at 3:43 AM Jose Armando Garcia Sancio
>  wrote:
>
> +1 (non-binding)
>
> Looking forward to this improvement.
>
> On Tue, Sep 3, 2019 at 12:49 PM David Jacot  wrote:
>
> +1 (non-binding)
>
> Thank for the KIP. Great addition to the Kafka protocol!
>
> Best,
> David
>
> Le mar. 3 sept. 2019 à 19:17, Colin McCabe  a écrit :
>
> Hi all,
>
> I'd like to start the vote for KIP-482: The Kafka Protocol should Support
> Optional Tagged Fields.
>
> KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/
> KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
>
> Discussion thread here:
>
> https://lists.apache.org/thread.html/
> cdc801ae886491b73ef7efecac7ef81b24382f8b6b025899ee343f7a@%3Cdev.kafka.
> apache.org%3E
>
> best,
> Colin
>
> --
> -Jose
>
>


Kafka to Kudu

2019-09-04 Thread Sarfraz SMS
Hello,

Greetings people,
I want to move data consumed from Kafka brokers into a Kudu cloud
database. Are there any prerequisites for doing this? Any
suggested do's/don'ts for implementing it?


Thanks,
Sarfraz


Re: [VOTE] KIP-482: The Kafka Protocol should Support Optional Tagged Fields

2019-09-04 Thread Satish Duggana
+1 (non-binding) Thanks for the nice KIP.

You may want to update the KIP saying that optional tagged fields do
not support complex types (or structs).

On Wed, Sep 4, 2019 at 3:43 AM Jose Armando Garcia Sancio
 wrote:
>
> +1 (non-binding)
>
> Looking forward to this improvement.
>
> On Tue, Sep 3, 2019 at 12:49 PM David Jacot  wrote:
>
> > +1 (non-binding)
> >
> > Thank for the KIP. Great addition to the Kafka protocol!
> >
> > Best,
> > David
> >
> > Le mar. 3 sept. 2019 à 19:17, Colin McCabe  a écrit :
> >
> > > Hi all,
> > >
> > > I'd like to start the vote for KIP-482: The Kafka Protocol should Support
> > > Optional Tagged Fields.
> > >
> > > KIP:
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
> > >
> > > Discussion thread here:
> > >
> > https://lists.apache.org/thread.html/cdc801ae886491b73ef7efecac7ef81b24382f8b6b025899ee343f7a@%3Cdev.kafka.apache.org%3E
> > >
> > > best,
> > > Colin
> > >
> >
>
>
> --
> -Jose


Re: [DISCUSS] KIP-504 - Add new Java Authorizer Interface

2019-09-04 Thread Rajini Sivaram
Hi Don,

Thanks for your note.

1) The intention is to avoid blocking in the calling thread. We already
have several requests that are put into a purgatory when waiting for remote
communication, for example produce request waiting for replication. Since
we have a limited number of request threads in the broker, we want to make
progress with other requests, while one is waiting on any form of remote
communication.

2) Async management calls will be useful with the default authorizer when
KIP-500 removes ZK and we rely on Kafka instead. Our current ZK-based
implementation as well as any custom implementations that don't want to be
async will just need to return a sync'ed value. So instead of returning `
value`, the code would just return `CompletableFuture.completedFuture(value)
`. So it would be just a single line change in the implementation with the
new API. The caller would treat completedFuture exactly as it does today,
processing the request synchronously without using a purgatory.
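
For illustration, a synchronous implementation of such an async authorize() could look
roughly like this (a sketch only; authorizeAction() is a made-up name for the existing
synchronous logic):

public List<CompletableFuture<AuthorizationResult>> authorize(AuthorizableRequestContext context,
                                                              List<Action> actions) {
    List<CompletableFuture<AuthorizationResult>> results = new ArrayList<>(actions.size());
    for (Action action : actions) {
        // The existing synchronous result is simply wrapped in an already-completed future.
        results.add(CompletableFuture.completedFuture(authorizeAction(context, action)));
    }
    return results;
}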

3) For implementations that return a completedFuture as described in 2),
the behaviour would remain exactly the same. No additional threads or
purgatory will be used for this case. So there would be no performance
penalty. For implementations that return a future that is not complete, we
prioritise running more requests concurrently. So in a deployment with a
large number of clients, we would improve performance by allowing other
requests to be processed on the request threads while some are waiting for
authorization metadata.

4) I was concerned about this too. The goal is to make the API flexible
enough to handle large scale deployments in future when caching all
authorization metadata in each broker is not viable. Using an async API
that returns CompletionStage, the caller has the option to handle the
result synchronously or asynchronously, so we don't necessarily need to
update the calling code right away. Custom authorizers using the async API
have full control over whether authorization is performed in-line since
completedFuture will always be handled synchronously. We do need to update
KafkaApis to take advantage of the asynchronous API to improve scale. Even
though this is a big change, since we will be doing the same for all
requests, it shouldn't be too hard to maintain since the same pattern will
be used for all requests.

Regards,

Rajini

On Tue, Sep 3, 2019 at 11:48 PM Don Bosco Durai  wrote:

> Hi Rajini
>
> Help me understand this a bit more.
>
> 1. For all practical purpose, without authorization you can't go to the
> next step. The calling code needs to block anyway. So should we just let
> the implementation code do the async part?
> 2. If you feel management calls need to be async, then we should consider
> another set of async APIs. I don't feel we should complicate it further (
> 3. Another concern I have is wrt performance. Kafka has been built to
> handle 1000s of requests per second. Not sure whether making it async will add
> any unnecessary overhead.
> 4. How much complication would this add on the calling side? And is it
> worth it?
>
> Thanks
>
> Bosco
>
>
> On 9/3/19, 8:50 AM, "Rajini Sivaram"  wrote:
>
> Hi all,
>
> Ismael brought up a point that it will be good to make the Authorizer
> interface asynchronous to avoid blocking request threads during remote
> operations.
>
> 1) Since we want to support different backends for authorization
> metadata,
> making createAcls() and deleteAcls() asynchronous makes sense since
> these
> always involve remote operations. When KIP-500 removes ZooKeeper, we
> would
> want to move ACLs to Kafka and async updates will avoid unnecessary
> blocking.
> 2) For authorize() method, we currently use cached ACLs in the built-in
> authorizers, so synchronous authorise operations work well now. But
> async
> authorize() would support this scenario as well as authorizers in large
> organisations where an LRU cache would enable a smaller cache even
> when the
> backend holds a large amount of ACLs for infrequently used resources or
> users who don't use the system frequently.
>
> For both cases, the built-in authorizer will continue to be
> synchronous,
> using CompletableFuture.completedFuture() to return the actual result.
> But
> async API will make custom authorizer implementations more flexible. I
> would like to know if there are any concerns with these changes before
> updating the KIP.
>
> *Proposed API:*
> public interface Authorizer extends Configurable, Closeable {
>
> Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo);
> List<? extends CompletionStage<AuthorizationResult>> authorize(AuthorizableRequestContext requestContext, List<Action> actions);
> List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings);
> List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters);
> CompletionStage<Iterable<AclBinding>> acls(AclBindingFilter
> 

Build failed in Jenkins: kafka-trunk-jdk11 #790

2019-09-04 Thread Apache Jenkins Server
See 


Changes:

[rhauch] Changed for updatedTasks, avoids stopping and starting of unnecessary

[rhauch] MINOR: Add unit test for KAFKA-8676 to guard against unrequired task

--
[...truncated 2.63 MB...]

org.apache.kafka.connect.json.JsonConverterTest > mapToJsonNonStringKeys STARTED

org.apache.kafka.connect.json.JsonConverterTest > mapToJsonNonStringKeys PASSED

org.apache.kafka.connect.json.JsonConverterTest > longToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > longToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > mismatchSchemaJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > mismatchSchemaJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > mapToConnectNonStringKeys 
STARTED

org.apache.kafka.connect.json.JsonConverterTest > mapToConnectNonStringKeys 
PASSED

org.apache.kafka.connect.json.JsonConverterTest > 
testJsonSchemaMetadataTranslation STARTED

org.apache.kafka.connect.json.JsonConverterTest > 
testJsonSchemaMetadataTranslation PASSED

org.apache.kafka.connect.json.JsonConverterTest > bytesToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > bytesToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > shortToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > shortToConnect PASSED

org.apache.kafka.connect.json.JsonConverterTest > intToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > intToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > nullSchemaAndNullValueToJson 
STARTED

org.apache.kafka.connect.json.JsonConverterTest > nullSchemaAndNullValueToJson 
PASSED

org.apache.kafka.connect.json.JsonConverterTest > timestampToConnectOptional 
STARTED

org.apache.kafka.connect.json.JsonConverterTest > timestampToConnectOptional 
PASSED

org.apache.kafka.connect.json.JsonConverterTest > structToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > structToConnect PASSED

org.apache.kafka.connect.json.JsonConverterTest > stringToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > stringToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > nullSchemaAndArrayToJson 
STARTED

org.apache.kafka.connect.json.JsonConverterTest > nullSchemaAndArrayToJson 
PASSED

org.apache.kafka.connect.json.JsonConverterTest > byteToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > byteToConnect PASSED

org.apache.kafka.connect.json.JsonConverterTest > nullSchemaPrimitiveToConnect 
STARTED

org.apache.kafka.connect.json.JsonConverterTest > nullSchemaPrimitiveToConnect 
PASSED

org.apache.kafka.connect.json.JsonConverterTest > byteToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > byteToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > intToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > intToConnect PASSED

org.apache.kafka.connect.json.JsonConverterTest > dateToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > dateToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > noSchemaToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > noSchemaToConnect PASSED

org.apache.kafka.connect.json.JsonConverterTest > noSchemaToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > noSchemaToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > nullSchemaAndPrimitiveToJson 
STARTED

org.apache.kafka.connect.json.JsonConverterTest > nullSchemaAndPrimitiveToJson 
PASSED

org.apache.kafka.connect.json.JsonConverterTest > 
decimalToConnectOptionalWithDefaultValue STARTED

org.apache.kafka.connect.json.JsonConverterTest > 
decimalToConnectOptionalWithDefaultValue PASSED

org.apache.kafka.connect.json.JsonConverterTest > mapToJsonStringKeys STARTED

org.apache.kafka.connect.json.JsonConverterTest > mapToJsonStringKeys PASSED

org.apache.kafka.connect.json.JsonConverterTest > arrayToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > arrayToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > nullToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > nullToConnect PASSED

org.apache.kafka.connect.json.JsonConverterTest > timeToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > timeToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > structToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > structToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > 
testConnectSchemaMetadataTranslation STARTED

org.apache.kafka.connect.json.JsonConverterTest > 
testConnectSchemaMetadataTranslation PASSED

org.apache.kafka.connect.json.JsonConverterTest > shortToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > shortToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > dateToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > dateToConnect 

Re: [VOTE] KIP-496: Administrative API to delete consumer offsets

2019-09-04 Thread David Jacot
Hi all,

While implementing the KIP, I have realized that a new error code and
exception are required to notify the caller that the offsets of a topic cannot
be deleted because the group is actively subscribed to the topic.

I would like to know if there are any concerns with these changes before
updating the KIP.

*Proposed API:*
GROUP_SUBSCRIBED_TO_TOPIC(86, "The consumer group is actively subscribed to the topic",
    GroupSubscribedToTopicException::new);

public class GroupSubscribedToTopicException extends ApiException {
    public GroupSubscribedToTopicException(String message) {
        super(message);
    }
}
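
For illustration, a caller using the new admin API might handle it along these lines
(a sketch only; the exact admin client method and result types are assumptions on my side):

try {
    admin.deleteConsumerGroupOffsets("my-group",
        Collections.singleton(new TopicPartition("my-topic", 0))).all().get();
} catch (ExecutionException e) {
    if (e.getCause() instanceof GroupSubscribedToTopicException) {
        // The group is still subscribed to the topic, so its offsets cannot be deleted.
    }
}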

Best,
David

On Fri, Aug 16, 2019 at 10:58 AM Mickael Maison 
wrote:

> +1 (non binding)
> Thanks!
>
> On Thu, Aug 15, 2019 at 11:53 PM Colin McCabe  wrote:
> >
> > On Thu, Aug 15, 2019, at 11:47, Jason Gustafson wrote:
> > > Hey Colin, I think deleting all offsets is equivalent to deleting the
> > > group, which can be done with the `deleteConsumerGroups` api. I debated
> > > whether there should be a way to delete partitions for all unsubscribed
> > > topics, but I decided to start with a simple API.
> >
> > That's a fair point-- deleting the group covers the main use-case for
> deleting all offsets.  So we might as well keep it simple for now.
> >
> > cheers,
> > Colin
> >
> > >
> > > I'm going to close this vote. The final result is +3 with myself,
> Guozhang,
> > > and Colin voting.
> > >
> > > -Jason
> > >
> > > On Tue, Aug 13, 2019 at 9:21 AM Colin McCabe 
> wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > Thanks for the KIP.
> > > >
> > > > Is there ever a desire to delete all the offsets for a given group?
> > > > Should the protocol and tools support this?
> > > >
> > > > +1 (binding)
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Mon, Aug 12, 2019, at 10:57, Guozhang Wang wrote:
> > > > > +1 (binding).
> > > > >
> > > > > Thanks Jason!
> > > > >
> > > > > On Wed, Aug 7, 2019 at 11:18 AM Jason Gustafson <
> ja...@confluent.io>
> > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I'd like to start a vote on KIP-496:
> > > > > >
> > > > > >
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets
> > > > > > .
> > > > > > +1
> > > > > > from me of course.
> > > > > >
> > > > > > -Jason
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
>


Re: [DISCUSS] KIP-511: Collect and Expose Client's Name and Version in the Brokers

2019-09-04 Thread David Jacot
Hi Colin,

Thanks for your input. Please, find my comments below:

>> Currently, we don't parse the contents of ApiVersionsRequest at all,
since it's an empty message.  KIP-511 proposes adding some fields here,
which will clearly change that situation.  In the future, any changes to
ApiVersionsRequest will have to only add stuff at the end, rather than
changing fields that already exist.  This will significantly limit our
ability to add new things over time.
>> Therefore, I think we should make sure that KIP-511 is implemented after
KIP-482, so that the version we freeze into place can be one that includes
the ability to add tagged fields, and includes the more efficient string
and array serialization specified in KIP-482.  It's probably worth spelling
that out here.

I agree with you. It makes sense to bump the version once for the two
cases.

>> On another topic, when the client sends an unsupported version N of
ApiVersionsRequest, and the broker only supports versions up to M, the
broker currently falls back to sending a version 0 response.  Why can't the
broker fall back to version M instead?  Previously, we only had versions 1
and 0 of ApiVersionsRequest, so the two behaviors were identical.  But
going forward, once we have version 2 and later of ApiVersionsRequest, it
doesn't make sense to fall back all the way to 0 if the broker supports 1
(for example).
>> If you agree, it would be good to spell this out in the KIP, so that if
we want to add more things to the response, we can, without losing them
each time the client's version of ApiVersionsRequest exceeds the broker's.

I fully agree with you, and I have already outlined this in the proposal;
see "ApiVersions Request/Response Handling". The idea is to use version M in
the broker so that it can leverage the provided information it knows about.
The broker can then send back the response with version M as well. On the
client side, it is a bit more tricky. As there is no version in the
response, the client will use the version it used for the request to parse
the response. If fields have been added to the response schema in version N,
parsing a version M response with it won't work. As the client doesn't know
the version used, we have two options: 1) use version 0, which is a prefix of
all the others but means losing information; or 2) try versions in descending
order from N to 0. I guess that N-1 would be the one in most cases.
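
A rough sketch of option 2), just to make it concrete (the parse method shown is an
assumption, not necessarily the real client code):

ApiVersionsResponse parseWithFallback(ByteBuffer buffer, short requestVersion) {
    // The client does not know which version the broker used for the response,
    // so it tries to parse with descending versions until one of the schemas matches.
    for (short version = requestVersion; version >= 0; version--) {
        try {
            return ApiVersionsResponse.parse(buffer.duplicate(), version);
        } catch (RuntimeException e) {
            // Schema mismatch; try the next lower version.
        }
    }
    throw new IllegalStateException("Unable to parse ApiVersionsResponse with any known version");
}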

What is your opinion regarding the client side?

Best,
David

On Wed, Sep 4, 2019 at 12:13 AM Colin McCabe  wrote:

> Hi David,
>
> Thanks again for the KIP.
>
> Currently, we don't parse the contents of ApiVersionsRequest at all, since
> it's an empty message.  KIP-511 proposes adding some fields here, which
> will clearly change that situation.  In the future, any changes to
> ApiVersionsRequest will have to only add stuff at the end, rather than
> changing fields that already exist.  This will significantly limit our
> ability to add new things over time.
>
> Therefore, I think we should make sure that KIP-511 is implemented after
> KIP-482, so that the version we freeze into place can be one that includes
> the ability to add tagged fields, and includes the more efficient string
> and array serialization specified in KIP-482.  It's probably worth spelling
> that out here.
>
> On another topic, when the client sends an unsupported version N of
> ApiVersionsRequest, and the broker only supports versions up to M, the
> broker currently falls back to sending a version 0 response.  Why can't the
> broker fall back to version M instead?  Previously, we only had versions 1
> and 0 of ApiVersionsRequest, so the two behaviors were identical.  But
> going forward, once we have version 2 and later of ApiVersionsRequest, it
> doesn't make sense to fall back all the way to 0 if the broker supports 1
> (for example).
>
> If you agree, it would be good to spell this out in the KIP, so that if we
> want to add more things to the response, we can, without losing them each
> time the client's version of ApiVersionsRequest exceeds the broker's.
>
> best,
> Colin
>
>
> On Tue, Sep 3, 2019, at 01:26, David Jacot wrote:
> > Hi all,
> >
> > I have updated the KIP to address the various comments. I have also
> added a
> > section about the handling of the ApiVersionsRequest/Response in the
> broker.
> >
> > Please, let me know what you think. I would like to make it for the next
> > release if possible.
> >
> > Best,
> > David
> >
> > On Fri, Aug 30, 2019 at 10:31 AM David Jacot 
> > wrote:
> >
> > > Hi Magnus,
> > >
> > > Thank you for your feedback. Please, find my comments below.
> > >
> > > 1. I thought that the clientId was meant for this purpose (providing
> > > information about the application). Is there a gap I don't see?
> > >
> > > 2. I have put two fields to avoid requiring deep validation and
> > > parsing on the broker. I believe that it will be easier to use the
> > > metadata downstream like this.
> > >
> > > 3. Good point. I think that the broker 

Re: [VOTE] KIP-512: Adding headers to RecordMetaData

2019-09-04 Thread Maulin Vasavada
+1 (non-binding)

On Tue, Sep 3, 2019 at 3:38 PM Renuka M  wrote:

> Hi All,
>
> After good discussion for KIP-512
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-512%3AAdding+headers+to+RecordMetaData
> ,
> am starting thread for voting.
>
> Thanks
> Renuka M
>


Build failed in Jenkins: kafka-trunk-jdk8 #3884

2019-09-04 Thread Apache Jenkins Server
See 


Changes:

[rhauch] Changed for updatedTasks, avoids stopping and starting of unnecessary

[rhauch] MINOR: Add unit test for KAFKA-8676 to guard against unrequired task

--
[...truncated 1.98 MB...]

org.apache.kafka.streams.integration.MetricsIntegrationTest > 
shouldAddMetricsForWindowStore PASSED

org.apache.kafka.streams.integration.MetricsIntegrationTest > 
shouldAddMetricsForSessionStore STARTED

org.apache.kafka.streams.integration.MetricsIntegrationTest > 
shouldAddMetricsForSessionStore PASSED

org.apache.kafka.streams.integration.MetricsIntegrationTest > 
shouldNotAddRocksDBMetricsIfRecordingLevelIsInfo STARTED

org.apache.kafka.streams.integration.MetricsIntegrationTest > 
shouldNotAddRocksDBMetricsIfRecordingLevelIsInfo PASSED

org.apache.kafka.streams.integration.RepartitionOptimizingIntegrationTest > 
shouldSendCorrectRecords_OPTIMIZED STARTED

org.apache.kafka.streams.integration.RepartitionOptimizingIntegrationTest > 
shouldSendCorrectRecords_OPTIMIZED PASSED

org.apache.kafka.streams.integration.RepartitionOptimizingIntegrationTest > 
shouldSendCorrectResults_NO_OPTIMIZATION STARTED

org.apache.kafka.streams.integration.RepartitionOptimizingIntegrationTest > 
shouldSendCorrectResults_NO_OPTIMIZATION PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryMapValuesState STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryMapValuesState PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldAllowToQueryAfterThreadDied STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldAllowToQueryAfterThreadDied PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryStateWithZeroSizedCache STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryStateWithZeroSizedCache PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryMapValuesAfterFilterState STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryMapValuesAfterFilterState PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryFilterState STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryFilterState PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryStateWithNonZeroSizedCache STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryStateWithNonZeroSizedCache PASSED

org.apache.kafka.streams.integration.GlobalThreadShutDownOrderTest > 
shouldFinishGlobalStoreOperationOnShutDown STARTED

org.apache.kafka.streams.integration.GlobalThreadShutDownOrderTest > 
shouldFinishGlobalStoreOperationOnShutDown PASSED

org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
shouldCompactTopicsForKeyValueStoreChangelogs STARTED

org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
shouldCompactTopicsForKeyValueStoreChangelogs PASSED

org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
shouldCompactAndDeleteTopicsForWindowStoreChangelogs STARTED

org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
shouldCompactAndDeleteTopicsForWindowStoreChangelogs PASSED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduce STARTED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduce PASSED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldGroupByKey STARTED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldGroupByKey PASSED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduceWindowed STARTED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduceWindowed PASSED

org.apache.kafka.streams.integration.StateRestorationIntegrationTest > 
shouldRestoreNullRecord STARTED
ERROR: Could not install GRADLE_4_10_3_HOME
java.lang.NullPointerException