Jenkins build is back to normal : kafka-2.1-jdk8 #179

2019-05-02 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-419 Safely notify Kafka Connect SourceTask is stopped

2019-05-02 Thread Vahid Hashemian
Hi Andrew,

Thanks for the KIP. I'm not too familiar with the internals of KC so I hope
you can clarify a couple of things:

   - It seems the KIP is proposing a new interface because the existing
   "stop()" interface doesn't fully perform what it should ideally be doing.
   Is that a fair statement?
   - You mentioned the "stop()" interface can be called multiple times.
   Would the same be true for the proposed interface? Does it matter, or is
   there a guard against that? (See the sketch below.)
   - I also agree with Ryan that using a verb sounds more intuitive for an
   interface that's supposed to trigger some action.
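For illustration, a minimal sketch of the kind of guard I'm asking about; the
callback name stopped() and the JDBC-based session are assumptions for the example,
not what the KIP specifies:

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class MySourceTask extends SourceTask {

    private final AtomicBoolean fullyStopped = new AtomicBoolean(false);
    private volatile Connection session;  // session to the source system (example only)

    @Override
    public void start(Map<String, String> props) {
        try {
            session = DriverManager.getConnection(props.get("connection.url"));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // would use the session; returning null tells the framework there is nothing new
        return null;
    }

    @Override
    public void stop() {
        // only a request to stop polling; commit()/commitRecord() may still arrive,
        // so the session cannot be released safely here
    }

    // Proposed "all work has stopped" callback (name assumed). The guard keeps it
    // safe even if the framework were ever to invoke it more than once.
    public void stopped() {
        if (fullyStopped.compareAndSet(false, true) && session != null) {
            try {
                session.close();
            } catch (Exception e) {
                // log and ignore; the task is finished anyway
            }
        }
    }

    @Override
    public String version() {
        return "1.0";
    }
}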

Regards,
--Vahid


On Thu, Jan 24, 2019 at 9:23 AM Ryanne Dolan  wrote:

> Ah, I'm sorta wrong -- in the current implementation, restartTask()
> stops the task and starts a *new* task instance with the same task ID.
> (I'm not certain that is clear from the documentation or interfaces,
> or if that may change in the future.)
>
> Ryanne
>
> On Thu, Jan 24, 2019 at 10:25 AM Ryanne Dolan 
> wrote:
> >
> > Andrew, I believe the task can be started again with start() during the
> stopping and stopped states in your diagram.
> >
> > Ryanne
> >
> > On Thu, Jan 24, 2019, 10:20 AM Andrew Schofield <
> andrew_schofi...@live.com wrote:
> >>
> >> I've now added a diagram to illustrate the states of a SourceTask. The
> KIP is essentially trying to give a clear signal to SourceTask when all
> work has stopped. In particular, if a SourceTask has a session to the
> source system that it uses in poll() and commit(), it now has a safe way to
> release this.
> >>
> >> Andrew Schofield
> >> IBM Event Streams
> >>
> >> On 21/01/2019, 10:13, "Andrew Schofield" 
> wrote:
> >>
> >> Ryanne,
> >> Thanks for your comments. I think my overarching point is that the
> various states of a SourceTask and the transitions between them seem a bit
> loose and that makes it difficult to figure out when the resources held by
> a SourceTask can be safely released. Your "I can't tell from the
> documentation" comment is key here __ Neither could I.
> >>
> >> The problem is that stop() is a signal to stop polling. It's
> basically a request from the framework to the task and it doesn't tell the
> task that it's actually finished. One of the purposes of the KC framework
> is to make life easy for a connector developer and a nice clean "all done
> now" method would help.
> >>
> >> I think I'll add a diagram to illustrate to the KIP.
> >>
> >> Andrew Schofield
> >> IBM Event Streams
> >>
> >> On 18/01/2019, 19:02, "Ryanne Dolan"  wrote:
> >>
> >> Andrew, do we know whether the SourceTask may be start()ed
> again? If this
> >> is the last call to a SourceTask I suggest we call it close().
> I can't tell
> >> from the documentation.
> >>
> >> Also, do we need this if a SourceTask can keep track of whether
> it was
> >> start()ed since the last stop()?
> >>
> >> Ryanne
> >>
> >>
> >> On Fri, Jan 18, 2019, 12:02 PM Andrew Schofield <
> andrew_schofi...@live.com
> >> wrote:
> >>
> >> > Hi,
> >> > I’ve created a new KIP to enhance the SourceTask interface in
> Kafka
> >> > Connect.
> >> >
> >> >
> >> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped
> >> >
> >> > Comments welcome.
> >> >
> >> > Andrew Schofield
> >> > IBM Event Streams
> >> >
> >> >
> >>
> >>
> >>
> >>
>


-- 

Thanks!
--Vahid


Re: [DISCUSS] 2.2.1 Bug Fix Release

2019-05-02 Thread Vahid Hashemian
If there are no objections on the proposed plan, I'll start preparing the
first release candidate.

Thanks,
--Vahid

On Thu, Apr 25, 2019 at 6:27 AM Ismael Juma  wrote:

> Thanks Vahid!
>
> On Wed, Apr 24, 2019 at 10:44 PM Vahid Hashemian <
> vahid.hashem...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I'd like to volunteer for the release manager of the 2.2.1 bug fix
> release.
> > Kafka 2.2.0 was released on March 22, 2019.
> >
> > At this point, there are 29 resolved JIRA issues scheduled for inclusion
> in
> > 2.2.1:
> >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%202.2.1
> >
> > The release plan is documented here:
> > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+2.2.1
> >
> > Thanks!
> > --Vahid
> >
>


-- 

Thanks!
--Vahid


Re: [VOTE] KIP-454: Expansion of the ConnectClusterState interface

2019-05-02 Thread Magesh Nandakumar
Thanks a lot for the work on this KIP Chris.

+1(non-binding)

On Thu, May 2, 2019, 4:56 PM Chris Egerton  wrote:

> Hi all,
>
> I'd like to start a vote for KIP-454:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface
>
> The discussion thread can be found at
> https://www.mail-archive.com/dev@kafka.apache.org/msg96911.html
>
> Thanks to Konstantine Karantasis and Magesh Nandakumar for their thoughtful
> feedback!
>
> Cheers,
>
> Chris
>


Jenkins build is back to normal : kafka-trunk-jdk8 #3596

2019-05-02 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk11 #481

2019-05-02 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-8289: Fix Session Expiration and Suppression (#6654)

[github] KAFKA-7601; Clear leader epoch cache on downgraded format in append

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H28 (ubuntu xenial) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision 3ba4686d4d650f0f9155b2e22dddb192a5a56a6c 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 3ba4686d4d650f0f9155b2e22dddb192a5a56a6c
Commit message: "KAFKA-7601; Clear leader epoch cache on downgraded format in 
append (#6568)"
 > git rev-list --no-walk a4f7675db1a928e73c7a69eb906dd1e9ecd4a22a # timeout=10
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins6617242328958590534.sh
+ rm -rf 
+ /home/jenkins/tools/gradle/4.10.2/bin/gradle
/tmp/jenkins6617242328958590534.sh: line 4: 
/home/jenkins/tools/gradle/4.10.2/bin/gradle: No such file or directory
Build step 'Execute shell' marked build as failure
Recording test results
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
Not sending mail to unregistered user ism...@juma.me.uk
Not sending mail to unregistered user nore...@github.com
Not sending mail to unregistered user wangg...@gmail.com


Jenkins build is back to normal : kafka-2.2-jdk8 #99

2019-05-02 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-7601) Handle message format downgrades during upgrade of message format version

2019-05-02 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7601.

   Resolution: Fixed
Fix Version/s: 2.2.1
   2.1.2

The patch fixes the main problem of ensuring the safety of leader election in 
the presence of unexpected regressions of the message format. It does not solve 
the problem of down-conversion with mixed message formats. 

> Handle message format downgrades during upgrade of message format version
> -
>
> Key: KAFKA-7601
> URL: https://issues.apache.org/jira/browse/KAFKA-7601
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
> Fix For: 2.1.2, 2.2.1
>
>
> During an upgrade of the message format, there is a short time during which 
> the configured message format version is not consistent across all replicas 
> of a partition. As long as all brokers are using the same binary version 
> (i.e. all have been updated to the latest code), this typically does not 
> cause any problems. Followers will take whatever message format is used by 
> the leader. However, it is possible for leadership to change several times 
> between brokers which support the new format and those which support the old 
> format. This can cause the version used in the log to flap between the 
> different formats until the upgrade is complete. 
> For example, suppose broker 1 has been updated to use v2 and broker 2 is 
> still on v1. When broker 1 is the leader, all new messages will be written in 
> the v2 format. When broker 2 is leader, v1 will be used. If there is any 
> instability in the cluster or if completion of the update is delayed, then 
> the log will be seen to switch back and forth between v1 and v2. Once the 
> update is completed and broker 2 begins using v2, then the message format 
> will stabilize and everything will generally be ok.
> Downgrades of the message format are problematic, even if they are just 
> temporary. There are basically two issues:
> 1. We use the configured message format version to tell whether 
> down-conversion is needed. We assume that this is always the maximum 
> version used in the log, but that assumption fails in the case of a 
> downgrade. In the worst case, old clients will see the new format and likely 
> fail.
> 2. The logic we use for finding the truncation offset during the become 
> follower transition does not handle flapping between message formats. When 
> the new format is used by the leader, then the epoch cache will be updated 
> correctly. When the old format is in use, the epoch cache won't be updated. 
> This can lead to an incorrect result to OffsetsForLeaderEpoch queries.
> We have actually observed the second problem. The scenario went something 
> like this. Broker 1 is the leader of epoch 0 and writes some messages to the 
> log using the v2 message format. Broker 2 then becomes the leader for epoch 1 
> and writes some messages in the v1 format. On broker 2, the last entry in the 
> epoch cache is epoch 0. No entry is written for epoch 1 because it uses the 
> old format. When broker 1 became a follower, it sent an OffsetsForLeaderEpoch 
> query to broker 2 for epoch 0. Since epoch 0 was the last entry in the cache, 
> the log end offset was returned. This resulted in localized log divergence.
> There are a few options to fix this problem. From a high level, we can either 
> be stricter about preventing downgrades of the message format, or we can add 
> additional logic to make downgrades safe. 
> (Disallow downgrades): As an example of the first approach, the leader could 
> always use the maximum of the last version written to the log and the 
> configured message format version. 
> (Allow downgrades): If we want to allow downgrades, then it makes sense 
> to invalidate and remove all entries in the epoch cache following the message 
> format downgrade. This would basically force us to revert to truncation to 
> the high watermark, which is what you'd expect from a downgrade.  We would 
> also need a solution for the problem of detecting when down-conversion is 
> needed for a fetch request. One option I've been thinking about is enforcing 
> the invariant that each segment uses only one message format version. 
> Whenever the message format changes, we need to roll a new segment. Then we 
> can simply remember which format is in use by each segment to tell whether 
> down-conversion is needed for a given fetch request.
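As a rough illustration of the "disallow downgrades" option above (a sketch, not the
actual patch): the leader would pin the effective format to the maximum of the
configured version and whatever has already been written, so a lagging config can
never downgrade the log.

class MessageFormatGuard {
    // configuredVersion: the broker's message.format.version config
    // lastVersionInLog:  the highest magic value already appended to the log
    static byte effectiveMessageFormat(byte configuredVersion, byte lastVersionInLog) {
        return (byte) Math.max(configuredVersion, lastVersionInLog);
    }
}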



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-02 Thread Paul Whalen
Ivan, I’ll definitely forfeit my point on the clumsiness of the
branch(predicate, consumer) solution; I don’t see any real drawbacks for the
dynamic case.

IMO the one trade off to consider at this point is the scope question. I don’t 
know if I totally agree that “we rarely need them in the same scope” since 
merging the branches back together later seems like a perfectly plausible use 
case that can be a lot nicer when the branched streams are in the same scope. 
That being said, for the reasons Ivan listed, I think it is overall the better 
solution - working around the scope thing is easy enough if you need to. 

> On May 2, 2019, at 7:00 PM, Ivan Ponomarev  wrote:
> 
> Hello everyone, thank you all for joining the discussion!
> 
> Well, I don't think the idea of named branches, be it a LinkedHashMap (no 
> other Map will do, because order of definition matters) or `branch` method  
> taking name and Consumer has more advantages than drawbacks.
> 
> In my opinion, the only real positive outcome from Michael's proposal is that 
> all the returned branches are in the same scope. But 1) we rarely need them 
> in the same scope 2) there is a workaround for the scope problem, described 
> in the KIP.
> 
> 'Inlining the complex logic' is not a problem, because we can use method 
> references instead of lambdas. In real-world scenarios you tend to split the 
> complex logic into methods anyway, so the code is going to be clean.
> 
> The drawbacks are strong. The cohesion between predicates and handlers is 
> lost. We have to define predicates in one place, and handlers in another. 
> This opens the door for bugs:
> 
> - what if we forget to define a handler for a name? or a name for a handler?
> - what if we misspell a name?
> - what if we copy-paste and duplicate a name?
> 
> What Michael proposes would have been totally OK if we had been writing the 
> API in Lua, Ruby or Python. In those languages the "dynamic naming" approach 
> would have looked most concise and beautiful. But in Java we expect all the 
> problems related to identifiers to be eliminated in compile time.
> 
> Do we have to invent duck-typing for the Java API?
> 
> And if we do, what advantage are we supposed to get besides having all the 
> branches in the same scope? Michael, maybe I'm missing your point?
> 
> ---
> 
> Earlier in this discussion John Roesler also proposed to do without "start 
> branching" operator, and later Paul mentioned that in the case when we have 
> to add a dynamic number of branches, the current KIP is 'clumsier' compared 
> to Michael's 'Map' solution. Let me address both comments here.
> 
> 1) "Start branching" operator (I think that *split* is a good name for it 
> indeed) is critical when we need to do a dynamic branching, see example below.
> 
> 2) No, dynamic branching in current KIP is not clumsy at all. Imagine a 
> real-world scenario when you need one branch per enum value (say, 
> RecordType). You can have something like this:
> 
> /*John:if we had to start with stream.branch(...) here, it would have been 
> much messier.*/
> KBranchedStream branched = stream.split();
> 
> /*Not clumsy at all :-)*/
> for (RecordType recordType : RecordType.values())
> branched = branched.branch((k, v) -> v.getRecType() == recordType,
> recordType::processRecords);
> 
> Regards,
> 
> Ivan
> 
> 
> 02.05.2019 14:40, Matthias J. Sax wrote:
>> I also agree with Michael's observation about the core problem of
>> current `branch()` implementation.
>> 
>> However, I also don't like to pass in a clumsy Map object. My thinking
>> was more aligned with Paul's proposal to just add a name to each
>> `branch()` statement and return a `Map`.
>> 
>> It makes the code easier to read, and also makes the order of
>> `Predicates` (which is essential) easier to grasp.
>> 
>> Map<String, KStream<K, V>> branches = stream.split()
>>    .branch("branchOne", Predicate)
>>    .branch("branchTwo", Predicate)
>>    .defaultBranch("defaultBranch");
>> An open question is the case for which no defaultBranch() should be
>> specified. Atm, `split()` and `branch()` would return `BranchedKStream`
>> and the call to `defaultBranch()` that returns the `Map` is mandatory
>> (which is not the case atm). Or is this actually not a real problem,
>> because users can just ignore the branch returned by `defaultBranch()`
>> in the result `Map` ?
>> 
>> 
>> About "inlining": So far, it seems to be a matter of personal
>> preference. I can see arguments for both, but no "killer argument" yet
>> that clearly makes the case for one or the other.
>> 
>> 
>> -Matthias
>> 
>>> On 5/1/19 6:26 PM, Paul Whalen wrote:
>>> Perhaps inlining is the wrong terminology. It doesn’t require that a lambda 
>>> with the full downstream topology be defined inline - it can be a method 
>>> reference as with Ivan’s original suggestion.  The advantage of putting the 
>>> predicate and its downstream logic (Consumer) together in branch() is that 
>>> they are 

Re: [VOTE] KIP-411: Make default Kafka Connect worker task client IDs distinct

2019-05-02 Thread Arjun Satish
Paul,

You might want to make a note on the KIP regarding the impact on quotas.
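For reference, the per-task client ids follow the pattern quoted below; a tiny
illustrative sketch (the helper and names are examples only):

class ConnectClientIds {
    // e.g. taskClientId("consumer", "my-sink", 0) -> "connector-consumer-my-sink-0"
    static String taskClientId(String role, String connectorName, int taskId) {
        return "connector-" + role + "-" + connectorName + "-" + taskId;
    }
}

Any client-id quota configured on the broker would then key on these per-task ids,
which is the quota impact worth calling out in the KIP.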

Thanks,

On Thu, May 2, 2019 at 9:48 AM Paul Davidson
 wrote:

> Thanks for the votes everyone! KIP-411 is now accepted with:
>
> +3 binding votes (Randall, Jason, Gwen) , and
> +3 non-binding votes (Ryanne, Arjun, Magesh)
>
> Regards,
>
> Paul
>
> On Wed, May 1, 2019 at 10:07 PM Arjun Satish 
> wrote:
>
> > Good point, Gwen. We always set a non empty value for client id:
> >
> >
> https://github.com/apache/kafka/blob/2.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L668
> > .
> >
> > But more importantly, connect client ids (for consumers, for example)
> were
> > already of the form "consumer-[0-9]+", and from now on they will be
> > "connector-consumer-[connector_name]-[0-9]+". So, at least for connect
> > consumers/producers, we would have already been hitting the default quota
> > limits and nothing changes for them. You can correct me if I'm missing
> > something, but seems like this doesn't *break* backward compatibility?
> >
> > I suppose this change only gives us a better way to manage that quota
> > limit.
> >
> > Best,
> >
> > On Wed, May 1, 2019 at 9:16 PM Gwen Shapira  wrote:
> >
> > > I'm confused. Surely the default quota applies on empty client IDs too?
> > > otherwise it will be very difficult to enforce?
> > > So setting the client name will only change something if there's
> already
> > a
> > > quota for that client?
> > >
> > > On the other hand, I fully support switching to "easy-to-wildcard"
> > template
> > > for the client id.
> > >
> > > On Wed, May 1, 2019 at 8:50 PM Arjun Satish 
> > > wrote:
> > >
> > > I just realized that setting the client.id will now trigger any
> > > quota restrictions (
> > > > https://kafka.apache.org/documentation/#design_quotasconfig) on the
> > > > broker.
> > > > It seems like this PR will enforce quota policies that will either
> > > require
> > > > admins to set limits for each task (since the chosen format is
> > > > connector-*-id), or fallback to some default value.
> > > >
> > > > Maybe we should mention this in the backward compatibility section
> for
> > > the
> > > > KIP. At the same time, since there is no way atm to turn off this
> > > feature,
> > > > should this feature be merged and released in the upcoming v2.3? This
> > is
> > > > something the committers can comment better.
> > > >
> > > > Best,
> > > >
> > > >
> > > > On Wed, May 1, 2019 at 5:13 PM Gwen Shapira 
> wrote:
> > > >
> > > > > hell yeah!
> > > > > +1
> > > > >
> > > > >
> > > > > On Fri, Apr 5, 2019 at 9:08 AM Paul Davidson
> > > > >  wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Since we seem to have agreement in the discussion I would like to
> > > start
> > > > > the
> > > > > > vote on KIP-411.
> > > > > >
> > > > > > See:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Make+default+Kafka+Connect+worker+task+client+IDs+distinct
> > > > > >
> > > > > > Also see the related PR:
> https://github.com/apache/kafka/pull/6097
> > > > > >
> > > > > > Thanks to everyone who contributed!
> > > > > >
> > > > > > Paul
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > *Gwen Shapira*
> > > > > Product Manager | Confluent
> > > > > 650.450.2760 | @gwenshap
> > > > > Follow us: Twitter  | blog
> > > > > 
> > > > >
> > > >
> > >
> > >
> > > --
> > > *Gwen Shapira*
> > > Product Manager | Confluent
> > > 650.450.2760 | @gwenshap
> > > Follow us: Twitter  | blog
> > > 
> > >
> >
>


Build failed in Jenkins: kafka-trunk-jdk11 #480

2019-05-02 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-8285: enable localized thread IDs in Kafka Streams (#6632)

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H30 (ubuntu xenial) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision a4f7675db1a928e73c7a69eb906dd1e9ecd4a22a 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f a4f7675db1a928e73c7a69eb906dd1e9ecd4a22a
Commit message: "KAFKA-8285: enable localized thread IDs in Kafka Streams 
(#6632)"
 > git rev-list --no-walk b074173ea249ef028272c2c07358222550917d8c # timeout=10
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins2541469893481688070.sh
+ rm -rf 
+ /home/jenkins/tools/gradle/4.10.2/bin/gradle
/tmp/jenkins2541469893481688070.sh: line 4: 
/home/jenkins/tools/gradle/4.10.2/bin/gradle: No such file or directory
Build step 'Execute shell' marked build as failure
Recording test results
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
Not sending mail to unregistered user ism...@juma.me.uk
Not sending mail to unregistered user nore...@github.com
Not sending mail to unregistered user wangg...@gmail.com


[jira] [Resolved] (KAFKA-8285) Handle thread-id random switch on JVM for KStream

2019-05-02 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8285.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

> Handle thread-id random switch on JVM for KStream
> -
>
> Key: KAFKA-8285
> URL: https://issues.apache.org/jira/browse/KAFKA-8285
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently we are potentially at risk of being bitten by interleaving stream 
> thread ids. This is because two stream instances share the same initial 
> counter when they happen to be scheduled under one JVM. This is a bad 
> scenario because we could end up with different thread-ids across restarts, 
> which invalidates static membership.
> For example, at one point the thread ids assigned were 1,2,3,4 for instance A 
> and 5,6,7,8 for instance B. On the restart of both instances, the same atomic 
> update could be applied as 1,3,5,7 for A and 2,4,6,8 for B, which changes 
> their group.instance.ids.
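A sketch of the failure mode (illustrative only, not the Streams code): thread ids
drawn from a counter shared across all KafkaStreams instances in the JVM interleave
non-deterministically, whereas a per-instance counter (the idea behind #6632) keeps
them stable across restarts.

import java.util.concurrent.atomic.AtomicInteger;

class ThreadIdSketch {
    // shared across every instance in the JVM -> ids interleave between instances
    static final AtomicInteger SHARED_IDS = new AtomicInteger(0);

    // scoped to one KafkaStreams instance -> always 1..N for that instance
    final AtomicInteger localIds = new AtomicInteger(0);

    String sharedThreadName(String clientId) {
        return clientId + "-StreamThread-" + SHARED_IDS.incrementAndGet();
    }

    String localThreadName(String clientId) {
        return clientId + "-StreamThread-" + localIds.incrementAndGet();
    }
}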



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-02 Thread Ivan Ponomarev

Hello everyone, thank you all for joining the discussion!

Well, I don't think the idea of named branches, be it a LinkedHashMap 
(no other Map will do, because order of definition matters) or `branch` 
method  taking name and Consumer has more advantages than drawbacks.


In my opinion, the only real positive outcome from Michael's proposal is 
that all the returned branches are in the same scope. But 1) we rarely 
need them in the same scope 2) there is a workaround for the scope 
problem, described in the KIP.


'Inlining the complex logic' is not a problem, because we can use method 
references instead of lambdas. In real-world scenarios you tend to split 
the complex logic into methods anyway, so the code is going to be clean.


The drawbacks are strong. The cohesion between predicates and handlers 
is lost. We have to define predicates in one place, and handlers in 
another. This opens the door for bugs:


- what if we forget to define a handler for a name? or a name for a handler?
- what if we misspell a name?
- what if we copy-paste and duplicate a name?
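To make the contrast concrete (the name-keyed API below is the one sketched in this
discussion, not an existing API; the branch(predicate, consumer) form is the one in
the KIP; Order is just an illustrative value type):

// name-keyed: a misspelled key only fails at runtime, if it fails at all
Map<String, KStream<String, Order>> branches = stream.split()
    .branch("bigOrders", (k, v) -> v.total() > 1000)
    .defaultBranch("smallOrders");
branches.get("bigOrdrs").to("big-orders");   // typo -> NullPointerException at runtime

// predicate and handler paired: there is simply no name to misspell or forget
stream.split()
    .branch((k, v) -> v.total() > 1000, this::processBigOrders)
    .defaultBranch(this::processSmallOrders);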

What Michael proposes would have been totally OK if we had been writing 
the API in Lua, Ruby or Python. In those languages the "dynamic naming" 
approach would have looked most concise and beautiful. But in Java we 
expect all the problems related to identifiers to be eliminated in 
compile time.


Do we have to invent duck-typing for the Java API?

And if we do, what advantage are we supposed to get besides having all 
the branches in the same scope? Michael, maybe I'm missing your point?


---

Earlier in this discussion John Roesler also proposed to do without 
"start branching" operator, and later Paul mentioned that in the case 
when we have to add a dynamic number of branches, the current KIP is 
'clumsier' compared to Michael's 'Map' solution. Let me address both 
comments here.


1) "Start branching" operator (I think that *split* is a good name for 
it indeed) is critical when we need to do a dynamic branching, see 
example below.


2) No, dynamic branching in current KIP is not clumsy at all. Imagine a 
real-world scenario when you need one branch per enum value (say, 
RecordType). You can have something like this:


/* John: if we had to start with stream.branch(...) here, it would have
   been much messier. */

KBranchedStream branched = stream.split();

/* Not clumsy at all :-) */
for (RecordType recordType : RecordType.values())
    branched = branched.branch((k, v) -> v.getRecType() == recordType,
                               recordType::processRecords);

Regards,

Ivan


02.05.2019 14:40, Matthias J. Sax wrote:

I also agree with Michael's observation about the core problem of
current `branch()` implementation.

However, I also don't like to pass in a clumsy Map object. My thinking
was more aligned with Paul's proposal to just add a name to each
`branch()` statement and return a `Map`.

It makes the code easier to read, and also makes the order of
`Predicates` (which is essential) easier to grasp.


Map<String, KStream<K, V>> branches = stream.split()
    .branch("branchOne", Predicate)
    .branch("branchTwo", Predicate)
    .defaultBranch("defaultBranch");

An open question is the case for which no defaultBranch() should be
specified. Atm, `split()` and `branch()` would return `BranchedKStream`
and the call to `defaultBranch()` that returns the `Map` is mandatory
(which is not the case atm). Or is this actually not a real problem,
because users can just ignore the branch returned by `defaultBranch()`
in the result `Map` ?


About "inlining": So far, it seems to be a matter of personal
preference. I can see arguments for both, but no "killer argument" yet
that clearly makes the case for one or the other.


-Matthias

On 5/1/19 6:26 PM, Paul Whalen wrote:

Perhaps inlining is the wrong terminology. It doesn’t require that a lambda 
with the full downstream topology be defined inline - it can be a method 
reference as with Ivan’s original suggestion.  The advantage of putting the 
predicate and its downstream logic (Consumer) together in branch() is that they 
are required to be near to each other.

Ultimately the downstream code has to live somewhere, and deep branch trees 
will be hard to read regardless.


On May 1, 2019, at 1:07 PM, Michael Drogalis  
wrote:

I'm less enthusiastic about inlining the branch logic with its downstream
functionality. Programs that have deep branch trees will quickly become
harder to read as a single unit.


On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen  wrote:

Also +1 on the issues/goals as Michael outlined them, I think that sets a
great framework for the discussion.

Regarding the SortedMap solution, my understanding is that the current
proposal in the KIP is what is in my PR which (pending naming decisions) is
roughly this:

stream.split()
    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
    .defaultBranch(Consumer<KStream<K, V>>);

Obviously some ordering is necessary, since branching as a construct
doesn't work without 

[VOTE] KIP-454: Expansion of the ConnectClusterState interface

2019-05-02 Thread Chris Egerton
Hi all,

I'd like to start a vote for KIP-454:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface

The discussion thread can be found at
https://www.mail-archive.com/dev@kafka.apache.org/msg96911.html

Thanks to Konstantine Karantasis and Magesh Nandakumar for their thoughtful
feedback!

Cheers,

Chris


RE: [DISCUSS] KIP-465: Add Consolidated Connector Endpoint to Connect REST API

2019-05-02 Thread Alex Liu
Good idea, Dan. One thing I might suggest is to have the query parameters
reflect the fact that there are multiple resources under each connector.
There are `connectors//`, `connectors//config`, and
`connectors//status`.
Each of them returns a slightly different set of information, so it would
be useful to allow the query parameter to be a string instead of a true/false
flag. In this case, `expand=status,config` would specify expanding both the
/status and /config subresources into the response objects.

Other than this detail, I think this is a useful addition to the Connect
REST API.

Alex


Re: [DISCUSS] KIP-424: Allow suppression of intermediate events based on wall clock time

2019-05-02 Thread jonathangordon
I'd like to move this KIP along. To summarize where we've landed so far:

- Should suppress() flush all records (would be _before_ the time elapsed)?
- Or should it preserve buffered records and reload on restart? For
this case, should the record be flushed on reload (elapsed time is
unknown) or should we reset the timer to zero?

John Roesler suggested we preserve the buffered records since suppression is 
backed with a durable store via the changelog. Works for me.

- How strict should the guarantee be about when records are flushed?

John Roesler suggested using the system time we already checked at the start 
of the processing loop. This should incur no extra performance hit from extra 
calls to System.currentTimeMillis(). Works for me.

I'm happy to add these details to the KIP if all agree. Also encourage other 
questions before putting this to a vote.
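For concreteness, the closest existing API today is the stream-time-based
Suppressed.untilTimeLimit; the wall-clock variant discussed in this KIP would look
similar but measure elapsed wall-clock time with the semantics summarized above
(sketch only, not the final API):

import java.time.Duration;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;

class SuppressExample {
    static KTable<String, Long> throttle(KTable<String, Long> counts) {
        // today: waits up to 30s of *stream time* for more updates per key
        return counts.suppress(
            Suppressed.untilTimeLimit(Duration.ofSeconds(30),
                                      Suppressed.BufferConfig.unbounded()));
    }
}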

Thanks!


Build failed in Jenkins: kafka-trunk-jdk11 #479

2019-05-02 Thread Apache Jenkins Server
See 


Changes:

[harsha] KAFKA-8191: Add pluggability of KeyManager to generate the broker

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H32 (ubuntu xenial) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision b074173ea249ef028272c2c07358222550917d8c 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f b074173ea249ef028272c2c07358222550917d8c
Commit message: "KAFKA-8191: Add pluggability of KeyManager to generate the 
broker Private Keys and Certificates"
 > git rev-list --no-walk c34330c5481803c91705cbfcc499021360ff7fdc # timeout=10
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins5785431307927434504.sh
+ rm -rf 
+ /home/jenkins/tools/gradle/4.10.2/bin/gradle
/tmp/jenkins5785431307927434504.sh: line 4: 
/home/jenkins/tools/gradle/4.10.2/bin/gradle: No such file or directory
Build step 'Execute shell' marked build as failure
Recording test results
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
Not sending mail to unregistered user ism...@juma.me.uk
Not sending mail to unregistered user nore...@github.com


Re: [VOTE] KIP-460: Admin Leader Election RPC

2019-05-02 Thread Colin McCabe
+1 (binding)

thanks, Jose.

best,
Colin

On Wed, May 1, 2019, at 14:44, Jose Armando Garcia Sancio wrote:
> Hi all,
> 
> I would like to start the voting for KIP-460:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-460%3A+Admin+Leader+Election+RPC
> 
> The thread discussion is here:
> https://www.mail-archive.com/dev@kafka.apache.org/msg97226.html
> 
> Thanks!
> -Jose
>


Build failed in Jenkins: kafka-trunk-jdk8 #3595

2019-05-02 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-8248; Ensure time updated before sending transactional request

--
[...truncated 2.40 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldDeleteAndReturnPlainValue STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldDeleteAndReturnPlainValue PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutAllWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutAllWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldReturnIsPersistent STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldReturnIsPersistent PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutIfAbsentWithUnknownTimestamp STARTED


Build failed in Jenkins: kafka-trunk-jdk11 #478

2019-05-02 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-8248; Ensure time updated before sending transactional request

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H28 (ubuntu xenial) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision c34330c5481803c91705cbfcc499021360ff7fdc 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f c34330c5481803c91705cbfcc499021360ff7fdc
Commit message: "KAFKA-8248; Ensure time updated before sending transactional 
request (#6613)"
 > git rev-list --no-walk 093a22536fb9e970e12d8b186ea8ae18ad8bcca1 # timeout=10
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins6811076698778988349.sh
+ rm -rf 
+ /home/jenkins/tools/gradle/4.10.2/bin/gradle
/tmp/jenkins6811076698778988349.sh: line 4: 
/home/jenkins/tools/gradle/4.10.2/bin/gradle: No such file or directory
Build step 'Execute shell' marked build as failure
Recording test results
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Setting GRADLE_4_10_2_HOME=/home/jenkins/tools/gradle/4.10.2
Not sending mail to unregistered user ism...@juma.me.uk
Not sending mail to unregistered user nore...@github.com


[jira] [Created] (KAFKA-8317) ClassCastException using KTable.suppress()

2019-05-02 Thread Andrew (JIRA)
Andrew created KAFKA-8317:
-

 Summary: ClassCastException using KTable.suppress()
 Key: KAFKA-8317
 URL: https://issues.apache.org/jira/browse/KAFKA-8317
 Project: Kafka
  Issue Type: Bug
Reporter: Andrew


I am trying to use `KTable.suppress()` and I am getting the following error :

{Code}
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot 
be cast to java.lang.String
    at 
org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at 
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
    at 
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
    at 
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
    at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
{Code}

My code is as follows :

{Code}
    final KTable<Windowed<String>, GenericRecord> groupTable = groupedStream
        .aggregate(lastAggregator, lastAggregator, materialized);

    final KTable<Windowed<String>, GenericRecord> suppressedTable = groupTable
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

    // write the change-log stream to the topic
    suppressedTable.toStream((k, v) -> k.key())
        .mapValues(joinValueMapper::apply)
        .to(props.joinTopic());
{Code}


The code without using `suppressedTable` works... what am I doing wrong?


Someone else has encountered the same issue :

https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380


Slack conversation : 
https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800
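Not confirmed as the fix for this report, but to make the mismatch in the stack trace
concrete: after a windowed aggregation the record keys are Windowed<String>, so any
operator that serializes keys needs an explicit windowed serde rather than the default
String key serde the trace shows being used. For example:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

class SerdeMismatchSketch {
    // what the suppress buffer ended up using, per the stack trace
    static final Serde<String> DEFAULT_KEY_SERDE = Serdes.String();

    // what keys of type Windowed<String> actually require
    static final Serde<Windowed<String>> WINDOWED_KEY_SERDE =
        WindowedSerdes.timeWindowedSerdeFrom(String.class);
}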



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-411: Make default Kafka Connect worker task client IDs distinct

2019-05-02 Thread Paul Davidson
Thanks for the votes everyone! KIP-411 is now accepted with:

+3 binding votes (Randall, Jason, Gwen) , and
+3 non-binding votes (Ryanne, Arjun, Magesh)

Regards,

Paul

On Wed, May 1, 2019 at 10:07 PM Arjun Satish  wrote:

> Good point, Gwen. We always set a non empty value for client id:
>
> https://github.com/apache/kafka/blob/2.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L668
> .
>
> But more importantly, connect client ids (for consumers, for example) were
> already of the form "consumer-[0-9]+", and from now on they will be
> "connector-consumer-[connector_name]-[0-9]+". So, at least for connect
> consumers/producers, we would have already been hitting the default quota
> limits and nothing changes for them. You can correct me if I'm missing
> something, but seems like this doesn't *break* backward compatibility?
>
> I suppose this change only gives us a better way to manage that quota
> limit.
>
> Best,
>
> On Wed, May 1, 2019 at 9:16 PM Gwen Shapira  wrote:
>
> > I'm confused. Surely the default quota applies on empty client IDs too?
> > otherwise it will be very difficult to enforce?
> > So setting the client name will only change something if there's already
> a
> > quota for that client?
> >
> > On the other hand, I fully support switching to "easy-to-wildcard"
> template
> > for the client id.
> >
> > On Wed, May 1, 2019 at 8:50 PM Arjun Satish 
> > wrote:
> >
> > > I just realized that setting the client.id will now trigger any
> > > quota restrictions (
> > > https://kafka.apache.org/documentation/#design_quotasconfig) on the
> > > broker.
> > > It seems like this PR will enforce quota policies that will either
> > require
> > > admins to set limits for each task (since the chosen format is
> > > connector-*-id), or fallback to some default value.
> > >
> > > Maybe we should mention this in the backward compatibility section for
> > the
> > > KIP. At the same time, since there is no way atm to turn off this
> > feature,
> > > should this feature be merged and released in the upcoming v2.3? This
> is
> > > something the committers can comment better.
> > >
> > > Best,
> > >
> > >
> > > On Wed, May 1, 2019 at 5:13 PM Gwen Shapira  wrote:
> > >
> > > > hell yeah!
> > > > +1
> > > >
> > > >
> > > > On Fri, Apr 5, 2019 at 9:08 AM Paul Davidson
> > > >  wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Since we seem to have agreement in the discussion I would like to
> > start
> > > > the
> > > > > vote on KIP-411.
> > > > >
> > > > > See:
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Make+default+Kafka+Connect+worker+task+client+IDs+distinct
> > > > >
> > > > > Also see the related PR: https://github.com/apache/kafka/pull/6097
> > > > >
> > > > > Thanks to everyone who contributed!
> > > > >
> > > > > Paul
> > > > >
> > > >
> > > >
> > > > --
> > > > *Gwen Shapira*
> > > > Product Manager | Confluent
> > > > 650.450.2760 | @gwenshap
> > > > Follow us: Twitter  | blog
> > > > 
> > > >
> > >
> >
> >
> > --
> > *Gwen Shapira*
> > Product Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter  | blog
> > 
> >
>


Re: [DISCUSS] KIP-464 Default Replication Factor for AdminClient#createTopic

2019-05-02 Thread Almog Gavra
Thanks for the input Randall. I'm happy adding it natively to NewTopic
instead of introducing more verbosity - updating the KIP to reflect this
now.
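For illustration, usage under the proposal might look roughly like this (the
Optional-based constructor shape is an assumption from the discussion, not final):

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

class CreateTopicWithDefaults {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // only the partition count is supplied; the broker's
            // default.replication.factor would apply for the replication factor
            NewTopic topic = new NewTopic("my-topic", Optional.of(6), Optional.empty());
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}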

On Thu, May 2, 2019 at 9:28 AM Randall Hauch  wrote:

> I wrote the `NewTopicBuilder` in Connect, and it was simply a convenience
> to more easily set some of the frequently-used properties and the # of
> partitions and replicas for the new topic in the same way. An example is:
>
> NewTopic topicDescription = TopicAdmin.defineTopic(topic).
> compacted().
> partitions(1).
> replicationFactor(3).
> build();
>
> Arguably it should have been added to clients from the beginning. So I'm
> fine with that being moved to clients, as long as Connect is changed to use
> the new clients class. However, even though Connect's `NewTopicBuilder` is
> in the runtime and technically not part of the public API, things like this
> still tend to get reused elsewhere. Let's keep the Connect
> `NewTopicBuilder` but deprecate it and have it extend the one in clients.
> The `TopicAdmin` class in Connect can then refer to the new one in clients.
>
> The KIP now talks about having a constructor for the builder:
>
> NewTopic myTopic = new
>
> NewTopicBuilder(name).compacted().partitions(1).replicationFactor(3).build();
>
> How about adding the builder to the NewTopic class itself:
>
> NewTopic myTopic =
>
> NewTopic.build(name).compacted().partitions(1).replicationFactor(3).build();
>
> This is a bit shorter, a bit easier to read (no "new New..."), and more
> discoverable since anyone looking at the NewTopic source or JavaDoc will
> maybe notice it.
>
> Randall
>
>
> On Thu, May 2, 2019 at 8:56 AM Almog Gavra  wrote:
>
> > Sure thing, added more detail to the KIP! To clarify, the plan is to move
> > an existing API from one package to another (NewTopicBuilder exists in
> the
> > connect.runtime package) leaving the old in place for compatibility and
> > deprecating it.
> >
> > I'm happy to hear thoughts on whether we should (a) move it to the same
> > package in a new module so that we don't need to deprecate it or (b) take
> > this opportunity to change any of the APIs.
> >
> > On Thu, May 2, 2019 at 8:22 AM Ismael Juma  wrote:
> >
> > > If you are adding new API, you need to specify it all in the KIP.
> > >
> > > Ismael
> > >
> > > On Thu, May 2, 2019, 8:04 AM Almog Gavra  wrote:
> > >
> > > > I think that sounds reasonable - I updated the KIP and I will remove
> > the
> > > > constructor that takes in only partitions.
> > > >
> > > > On Thu, May 2, 2019 at 4:44 AM Andy Coates 
> wrote:
> > > >
> > > > > Rather than adding overloaded constructors, which can lead to API
> > > bloat,
> > > > > how about using a builder pattern?
> > > > >
> > > > > I see it’s already got some constructor overloading, but we could
> > add a
> > > > > single new constructor that takes just the name, and support
> > everything
> > > > > else being set via builder methods.
> > > > >
> > > > > This would result in a better long term api as the number of
> options
> > > > > increases.
> > > > >
> > > > > Sent from my iPhone
> > > > >
> > > > > > On 30 Apr 2019, at 16:28, Almog Gavra 
> wrote:
> > > > > >
> > > > > > Hello Everyone,
> > > > > >
> > > > > > I'd like to start a discussion on KIP-464:
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Default+Replication+Factor+for+AdminClient%23createTopic
> > > > > >
> > > > > > It's about allowing users of the AdminClient to supply only a
> > > partition
> > > > > > count and to use the default replication factor configured by the
> > > kafka
> > > > > > cluster. Happy to receive any and all feedback!
> > > > > >
> > > > > > Cheers,
> > > > > > Almog
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-455: Create an Administrative API for Replica Reassignment

2019-05-02 Thread Viktor Somogyi-Vass
Hey Colin & George,

Thinking on George's points, I was wondering whether it's feasible to submit a
big reassignment to the controller and thus ZooKeeper, since frequent writes
are slow because the quorum has to synchronize. Perhaps it should be the
responsibility of KIP-435  but
I'd like to note it here as we're changing the current znode layout in this
KIP.
I think ideally we should add these writes in batches to zookeeper and
otherwise store it in a replicated internal topic
(__partition_reassignments). That would solve the scalability problem as
the failover controller would be able to read it up very quickly and also
we would spread the writes in Zookeeper over time. Just the current,
actively replicated partitions should be present under
/brokers/topics/[topic]/partitions/[partitionId]/state, so those partitions
will know if they have to do reassignment (even in case of a broker
bounce). The controller on the other hand could regain its state by reading
up the last produced message from this __partition_reassignments topic and
reading up the Zookeeper state to figure out which batch it's currently
doing (supposing it goes sequentially in the given reassignment).
I'll think a little bit more about this to fill out any gaps and perhaps add
it to my KIP. That being said, we'll probably need to do some benchmarking
first to see whether this bulk read-write causes a problem at all, to avoid
premature optimisation. I'm not too worried about reading up this new
information, as the controller would read up the assignment anyway in
initializeControllerContext().

A question on SubmitPartitionReassignmentsRequest and its connection with
KIP-435 . Would
the list of topic-partitions have the same ordering on the client side as
well as the broker side? I think it would be an advantage as the user would
know in which order the reassignment would be performed. I think it's
useful when it comes to incrementalization as they'd be able to figure out
what replicas will be in one batch (given they know about the batch size).
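To illustrate the batching idea above (a sketch only, not controller code): keep the
submitted ordering and cut the reassignment into fixed-size batches, so both the
client and the broker can predict which partitions fall into each batch.

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

class ReassignmentBatching {
    static List<List<TopicPartition>> toBatches(List<TopicPartition> submittedOrder,
                                                int batchSize) {
        List<List<TopicPartition>> batches = new ArrayList<>();
        for (int i = 0; i < submittedOrder.size(); i += batchSize) {
            int end = Math.min(i + batchSize, submittedOrder.size());
            batches.add(new ArrayList<>(submittedOrder.subList(i, end)));
        }
        return batches;
    }
}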

Viktor

On Wed, May 1, 2019 at 8:33 AM George Li 
wrote:

>  Hi Colin,
>
> Thanks for KIP-455!  yes. KIP-236, etc. will depend on it.  It is the good
> direction to go for the RPC.
>
> Regarding storing the new reassignments & original replicas at the
> topic/partition level.  I have some concerns when controller is failing
> over, and the scalability of scanning the active reassignments from ZK
> topic/partition level nodes. Please see my reply to Jason in the KIP-236
> thread.
>
> Once the decision is made where new reassignment and original replicas is
> stored, I will modify KIP-236 accordingly for how to cancel/rollback the
> reassignments.
>
> Thanks,
> George
>
>
> On Monday, April 15, 2019, 6:07:44 PM PDT, Colin McCabe <
> cmcc...@apache.org> wrote:
>
>  Hi all,
>
> We've been having discussions on a few different KIPs (KIP-236, KIP-435,
> etc.) about what the Admin Client replica reassignment API should look
> like.  The current API is really hard to extend and maintain, which is a
> big source of problems.  I think it makes sense to have a KIP that
> establishes a clean API that we can use and extend going forward, so I
> posted KIP-455.  Take a look.  :)
>
> best,
> Colin
>


Re: [DISCUSS] KIP-464 Default Replication Factor for AdminClient#createTopic

2019-05-02 Thread Randall Hauch
I wrote the `NewTopicBuilder` in Connect, and it was simply a convenience
to more easily set some of the frequently-used properties and the # of
partitions and replicas for the new topic in the same way. An example is:

NewTopic topicDescription = TopicAdmin.defineTopic(topic).
compacted().
partitions(1).
replicationFactor(3).
build();

Arguably it should have been added to clients from the beginning. So I'm
fine with that being moved to clients, as long as Connect is changed to use
the new clients class. However, even though Connect's `NewTopicBuilder` is
in the runtime and technically not part of the public API, things like this
still tend to get reused elsewhere. Let's keep the Connect
`NewTopicBuilder` but deprecate it and have it extend the one in clients.
The `TopicAdmin` class in Connect can then refer to the new one in clients.

The KIP now talks about having a constructor for the builder:

NewTopic myTopic = new
NewTopicBuilder(name).compacted().partitions(1).replicationFactor(3).build();

How about adding the builder to the NewTopic class itself:

NewTopic myTopic =
NewTopic.build(name).compacted().partitions(1).replicationFactor(3).build();

This is a bit shorter, a bit easier to read (no "new New..."), and more
discoverable, since anyone looking at the NewTopic source or JavaDoc is
likely to notice it. A rough sketch of what such a builder could look like
is below.
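
Purely as an illustration of the shape being discussed (the class and method
names below are assumptions for the example, not the proposed API), a
self-contained sketch of a builder hung off the topic class itself:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative sketch only: a NewTopic-like class exposing a nested builder,
// so that NewTopicSketch.build(name).compacted().partitions(1).replicationFactor(3).build()
// reads naturally. Field names and the cleanup.policy handling are assumptions.
public class NewTopicSketch {
    private final String name;
    private final Optional<Integer> partitions;
    private final Optional<Short> replicationFactor;
    private final Map<String, String> configs;

    private NewTopicSketch(String name, Optional<Integer> partitions,
                           Optional<Short> replicationFactor, Map<String, String> configs) {
        this.name = name;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
        this.configs = configs;
    }

    public static Builder build(String name) {
        return new Builder(name);
    }

    public static final class Builder {
        private final String name;
        private Optional<Integer> partitions = Optional.empty();
        private Optional<Short> replicationFactor = Optional.empty();
        private final Map<String, String> configs = new HashMap<>();

        private Builder(String name) {
            this.name = name;
        }

        public Builder partitions(int partitions) {
            this.partitions = Optional.of(partitions);
            return this;
        }

        public Builder replicationFactor(int replicationFactor) {
            this.replicationFactor = Optional.of((short) replicationFactor);
            return this;
        }

        public Builder compacted() {
            configs.put("cleanup.policy", "compact");
            return this;
        }

        public NewTopicSketch build() {
            return new NewTopicSketch(name, partitions, replicationFactor, configs);
        }
    }
}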

Randall


On Thu, May 2, 2019 at 8:56 AM Almog Gavra  wrote:

> Sure thing, added more detail to the KIP! To clarify, the plan is to move
> an existing API from one package to another (NewTopicBuilder exists in the
> connect.runtime package) leaving the old in place for compatibility and
> deprecating it.
>
> I'm happy to hear thoughts on whether we should (a) move it to the same
> package in a new module so that we don't need to deprecate it or (b) take
> this opportunity to change any of the APIs.
>
> On Thu, May 2, 2019 at 8:22 AM Ismael Juma  wrote:
>
> > If you are adding new API, you need to specify it all in the KIP.
> >
> > Ismael
> >
> > On Thu, May 2, 2019, 8:04 AM Almog Gavra  wrote:
> >
> > > I think that sounds reasonable - I updated the KIP and I will remove
> the
> > > constructor that takes in only partitions.
> > >
> > > On Thu, May 2, 2019 at 4:44 AM Andy Coates  wrote:
> > >
> > > > Rather than adding overloaded constructors, which can lead to API
> > bloat,
> > > > how about using a builder pattern?
> > > >
> > > > I see it’s already got some constructor overloading, but we could
> add a
> > > > single new constructor that takes just the name, and support
> everything
> > > > else being set via builder methods.
> > > >
> > > > This would result in a better long term api as the number of options
> > > > increases.
> > > >
> > > > Sent from my iPhone
> > > >
> > > > > On 30 Apr 2019, at 16:28, Almog Gavra  wrote:
> > > > >
> > > > > Hello Everyone,
> > > > >
> > > > > I'd like to start a discussion on KIP-464:
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Default+Replication+Factor+for+AdminClient%23createTopic
> > > > >
> > > > > It's about allowing users of the AdminClient to supply only a
> > partition
> > > > > count and to use the default replication factor configured by the
> > kafka
> > > > > cluster. Happy to receive any and all feedback!
> > > > >
> > > > > Cheers,
> > > > > Almog
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-464 Default Replication Factor for AdminClient#createTopic

2019-05-02 Thread Almog Gavra
Sure thing, added more detail to the KIP! To clarify, the plan is to move
an existing API from one package to another (NewTopicBuilder exists in the
connect.runtime package) leaving the old in place for compatibility and
deprecating it.

I'm happy to hear thoughts on whether we should (a) move it to the same
package in a new module so that we don't need to deprecate it or (b) take
this opportunity to change any of the APIs.

On Thu, May 2, 2019 at 8:22 AM Ismael Juma  wrote:

> If you are adding new API, you need to specify it all in the KIP.
>
> Ismael
>
> On Thu, May 2, 2019, 8:04 AM Almog Gavra  wrote:
>
> > I think that sounds reasonable - I updated the KIP and I will remove the
> > constructor that takes in only partitions.
> >
> > On Thu, May 2, 2019 at 4:44 AM Andy Coates  wrote:
> >
> > > Rather than adding overloaded constructors, which can lead to API
> bloat,
> > > how about using a builder pattern?
> > >
> > > I see it’s already got some constructor overloading, but we could add a
> > > single new constructor that takes just the name, and support everything
> > > else being set via builder methods.
> > >
> > > This would result in a better long term api as the number of options
> > > increases.
> > >
> > > Sent from my iPhone
> > >
> > > > On 30 Apr 2019, at 16:28, Almog Gavra  wrote:
> > > >
> > > > Hello Everyone,
> > > >
> > > > I'd like to start a discussion on KIP-464:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Default+Replication+Factor+for+AdminClient%23createTopic
> > > >
> > > > It's about allowing users of the AdminClient to supply only a
> partition
> > > > count and to use the default replication factor configured by the
> kafka
> > > > cluster. Happy to receive any and all feedback!
> > > >
> > > > Cheers,
> > > > Almog
> > >
> >
>


[jira] [Resolved] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2019-05-02 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4304.

Resolution: Duplicate

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>  Labels: kip
>
> Currently, when querying a state store, it is not clear when the key was 
> updated last. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check whether a value was updated but did not change (just compare the 
> update TS)
>  * consider only recently updated keys
> Covered via KIP-258: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-464 Default Replication Factor for AdminClient#createTopic

2019-05-02 Thread Ismael Juma
If you are adding new API, you need to specify it all in the KIP.

Ismael

On Thu, May 2, 2019, 8:04 AM Almog Gavra  wrote:

> I think that sounds reasonable - I updated the KIP and I will remove the
> constructor that takes in only partitions.
>
> On Thu, May 2, 2019 at 4:44 AM Andy Coates  wrote:
>
> > Rather than adding overloaded constructors, which can lead to API bloat,
> > how about using a builder pattern?
> >
> > I see it’s already got some constructor overloading, but we could add a
> > single new constructor that takes just the name, and support everything
> > else being set via builder methods.
> >
> > This would result in a better long term api as the number of options
> > increases.
> >
> > Sent from my iPhone
> >
> > > On 30 Apr 2019, at 16:28, Almog Gavra  wrote:
> > >
> > > Hello Everyone,
> > >
> > > I'd like to start a discussion on KIP-464:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Default+Replication+Factor+for+AdminClient%23createTopic
> > >
> > > It's about allowing users of the AdminClient to supply only a partition
> > > count and to use the default replication factor configured by the kafka
> > > cluster. Happy to receive any and all feedback!
> > >
> > > Cheers,
> > > Almog
> >
>


Re: [DISCUSS] KIP-464 Default Replication Factor for AdminClient#createTopic

2019-05-02 Thread Almog Gavra
I think that sounds reasonable - I updated the KIP and I will remove the
constructor that takes in only partitions.

On Thu, May 2, 2019 at 4:44 AM Andy Coates  wrote:

> Rather than adding overloaded constructors, which can lead to API bloat,
> how about using a builder pattern?
>
> I see it’s already got some constructor overloading, but we could add a
> single new constructor that takes just the name, and support everything
> else being set via builder methods.
>
> This would result in a better long term api as the number of options
> increases.
>
> Sent from my iPhone
>
> > On 30 Apr 2019, at 16:28, Almog Gavra  wrote:
> >
> > Hello Everyone,
> >
> > I'd like to start a discussion on KIP-464:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Default+Replication+Factor+for+AdminClient%23createTopic
> >
> > It's about allowing users of the AdminClient to supply only a partition
> > count and to use the default replication factor configured by the kafka
> > cluster. Happy to receive any and all feedback!
> >
> > Cheers,
> > Almog
>


[jira] [Created] (KAFKA-8316) Remove deprecated usage of Slf4jRequestLog, SslContextFactory

2019-05-02 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-8316:
--

 Summary: Remove deprecated usage of Slf4jRequestLog, 
SslContextFactory
 Key: KAFKA-8316
 URL: https://issues.apache.org/jira/browse/KAFKA-8316
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma


Jetty recently deprecated a few classes we use. The following commit suppresses 
the deprecation warnings:

https://github.com/apache/kafka/commit/e66bc6255b2ee42481b54b7fd1d256b9e4ff5741

We should remove the suppressions and use the suggested alternatives.
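
As a hedged illustration only (the exact Jetty replacement classes named here
are an assumption to be confirmed against the Jetty 9.4 javadocs), the
migration would look roughly like:

// Assumed replacements in Jetty 9.4+; confirm against the Jetty javadocs.
// Deprecated usages being replaced: Slf4jRequestLog and the plain SslContextFactory.
import org.eclipse.jetty.server.CustomRequestLog;
import org.eclipse.jetty.server.Slf4jRequestLogWriter;
import org.eclipse.jetty.util.ssl.SslContextFactory;

public class JettyMigrationSketch {
    static CustomRequestLog requestLog() {
        // CustomRequestLog + Slf4jRequestLogWriter in place of Slf4jRequestLog
        return new CustomRequestLog(new Slf4jRequestLogWriter(), CustomRequestLog.NCSA_FORMAT);
    }

    static SslContextFactory.Server serverSslContextFactory() {
        // SslContextFactory.Server (or .Client) in place of the plain SslContextFactory
        return new SslContextFactory.Server();
    }
}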



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-05-02 Thread Bruno Cadonna
Hi Paul,

I will try to express myself a bit clearer.

Ad 1)
My assumption is that if `StateStoreConnector#stateStores()` returns `null`,
Kafka Streams will throw an NPE, because on purpose no null check is
performed before the loop that calls `StreamsBuilder#addStateStore()`. When
the user finally understands the cause of the NPE, she knows that she has
to override `StateStoreConnector#stateStores()` in her implementation. My
question was: why let the user discover at runtime that she has to override
the method, when you could omit the default implementation of
`StateStoreConnector#stateStores()` and let the compiler tell her that she
needs to override it? Not providing a default implementation without
separating the interfaces implies breaking backward compatibility. That
means, if we choose not to provide a default implementation and let the
compiler signal the necessity to override the method, we have to separate
the interfaces in any case.

Ad 2)
If you check for `null` or an empty list in `process` and do not call
`addStateStore` in those cases, the advantage of returning `null` (being
safer for detecting bugs, as mentioned by Matthias) would be lost. But
maybe I am missing something here.
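
To make the trade-off concrete, a rough sketch of both points (interface and
method names follow the discussion in this thread and are not final API;
whether the default returns null or an empty list is exactly the open
question):

import java.util.Collections;
import java.util.List;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.StoreBuilder;

// Rough sketch of the alternatives discussed above; not the final API.
interface StateStoreConnectorSketch {
    // Alternative (a): ship a default. Backward compatible, but a forgotten
    // override only shows up at runtime (e.g. as an NPE or a silent no-op).
    default List<StoreBuilder<?>> stateStores() {
        return Collections.emptyList(); // or null, which is the open question
    }
    // Alternative (b), not shown: omit the default entirely, which surfaces a
    // missing implementation at compile time but requires a separate interface
    // so that existing Transformer/Processor suppliers keep compiling.
}

class ProcessGuardSketch {
    // Sketch of the guard discussed in 2): only wire up stores when the
    // supplier actually provides some, so existing suppliers keep working.
    static void maybeAddStateStores(StreamsBuilder builder, StateStoreConnectorSketch supplier) {
        List<StoreBuilder<?>> stores = supplier.stateStores();
        if (stores == null || stores.isEmpty()) {
            return; // nothing to add or connect
        }
        for (StoreBuilder<?> store : stores) {
            builder.addStateStore(store);
        }
    }
}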

Best,
Bruno



On Wed, May 1, 2019 at 6:27 AM Paul Whalen  wrote:

> I definitely don't mind anyone jumping, Bruno, thanks for the comments!
>
> 1) I'm not totally sure I'm clear on your point, but I think we're on the
> same page - if we're adding a method to the XSupplier interfaces (by making
> them inherit from a super interface StateStoreConnector) then we definitely
> need a default implementation to maintain compatibility.  Whether the
> default implementation returns null or an empty list is somewhat of a
> detail.
>
> 2) If stream.process() sees that StateStoreConnector#stateStores() returns
> either null or an empty list, it would handle that case specifically and
> not try to call addStateStore at all.  Or is this not what you're asking?
>
> Separately, I'm still hacking away at the details of the PR and will
> continue to get something into a discussable state, but I'll share some
> thoughts I've run into.
>
> A) I'm tentatively going the separate interface route (Matthias's
> suggestion) and naming it ConnectedStoreProvider.  Still don't love the
> name, but there's something nice about the name indicating *why* this thing
> is providing the store, not just that it is providing it.
>
> B) It has occurred to me that topology.addProcessor() could also recognize
> if ProcessorSupplier implements ConnectedStoreProvider and add and connect
> stores appropriately.  This isn't in the KIP and I think the value-add is
> lower (if you're reaching that low level, surely the "auto add/connect
> store" isn't too important to you), but I think it would be a confusing if
> it didn't, and I don't see any real downside.
>
> Paul
>
> On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna  wrote:
>
> > Hi,
> >
> > @Paul: Thank you for the KIP!
> >
> > I hope you do not mind that I jump in.
> >
> > I have the following comments:
> >
> > 1) `null` vs empty list in the default implementation
> > IIUC, returning `null` in the default implementation should basically
> > signal that the method `stateStores` was not overridden. Why then
> provide a
> > default implementation in the first place? Without default implementation
> > you would discover the missing implementation already at compile-time and
> > not only at runtime. If you decide not to provide a default
> implementation,
> > `XSupplier extends StateStoreConnector` would break existing code as
> > Matthias has already pointed out.
> >
> > 2) `process` method adding the StoreBuilders to the topology
> > If the default implementation returned `null` and `XSupplier extends
> > StateStoreConnector`, then existing code would break, because
> > `StreamsBuilder#addStateStore()` would throw a NPE.
> >
> > +1 for opening a WIP PR
> >
> > Best,
> > Bruno
> >
> >
> > On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax 
> > wrote:
> >
> > > Thank Paul!
> > >
> > > I agree with all of that. If we think that the general design is good,
> > > refactoring a PR if we want to pick a different name should not be too
> > > much additional work (hopefully). Thus, if you want to open a WIP PR
> and
> > > we use it to nail the open details, it might help to find a good
> > > conclusion.
> > >
> > > >> 2) Default method vs new interface:
> > >
> > > This seems to be the hardest tradeoff. I see the point about
> > > discoveability... Might be good to get input from others, which version
> > > they would prefer.
> > >
> > > Just to make clear, my suggestion from the last email was, that
> > > `Transformer` etc does not extend the new interface. Instead, a user
> > > that want to use this feature would need to implement both interfaces.
> > >
> > > If `Transformer extends StoreProvider` (just picking a name here)
> > > without default implementation existing code would break and thus it
> not
> > > a an 

[jira] [Created] (KAFKA-8315) Cannot pass Materialized into a join operation

2019-05-02 Thread Andrew (JIRA)
Andrew created KAFKA-8315:
-

 Summary: Cannot pass Materialized into a join operation
 Key: KAFKA-8315
 URL: https://issues.apache.org/jira/browse/KAFKA-8315
 Project: Kafka
  Issue Type: Bug
Reporter: Andrew


The documentation says to use `Materialized` rather than `JoinWindows.until()` 
(https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-),
but there is nowhere to pass a `Materialized` instance to the join operation; 
it seems only the group operation supports it.

 

Slack conversation here : 
https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-464 Default Replication Factor for AdminClient#createTopic

2019-05-02 Thread Andy Coates
Rather than adding overloaded constructors, which can lead to API bloat, how 
about using a builder pattern?

I see it’s already got some constructor overloading, but we could add a single 
new constructor that takes just the name, and support everything else being set 
via builder methods.

This would result in a better long term api as the number of options increases.

Sent from my iPhone

> On 30 Apr 2019, at 16:28, Almog Gavra  wrote:
> 
> Hello Everyone,
> 
> I'd like to start a discussion on KIP-464:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Default+Replication+Factor+for+AdminClient%23createTopic
> 
> It's about allowing users of the AdminClient to supply only a partition
> count and to use the default replication factor configured by the kafka
> cluster. Happy to receive any and all feedback!
> 
> Cheers,
> Almog


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-02 Thread Matthias J. Sax
I also agree with Michael's observation about the core problem of
current `branch()` implementation.

However, I also don't like passing in a clumsy Map object. My thinking
was more aligned with Paul's proposal to just add a name to each
`branch()` statement and return a `Map`.

It makes the code easier to read, and also makes the order of the
`Predicates` (which is essential) easier to grasp.

Map<String, KStream<K, V>> branches = stream.split()
    .branch("branchOne", Predicate)
    .branch("branchTwo", Predicate)
    .defaultBranch("defaultBranch");

An open question is the case in which no defaultBranch() should be
specified. As proposed, `split()` and `branch()` would return
`BranchedKStream`, and the call to `defaultBranch()` that returns the `Map`
would be mandatory (which is not the case today). Or is this actually not a
real problem, because users can just ignore the branch returned by
`defaultBranch()` in the result `Map`?


About "inlining": So far, it seems to be a matter of personal
preference. I can see arguments for both, but no "killer argument" yet
that clearly make the case for one or the other.
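
For reference, a fragment-only sketch contrasting the two styles, mirroring
the branch(Predicate, Consumer<KStream<K, V>>) shape shown later in this
thread (a proposed, unreleased API; isUrgent, handleUrgent, and the topic
names are made up for illustration):

// (1) downstream logic defined inline as lambdas
stream.split()
      .branch(isUrgent, branch -> branch.to("urgent-events"))
      .defaultBranch(branch -> branch.to("other-events"));

// (2) the same topology via method references, keeping the branch() calls compact
stream.split()
      .branch(isUrgent, this::handleUrgent)
      .defaultBranch(this::handleOther);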


-Matthias

On 5/1/19 6:26 PM, Paul Whalen wrote:
> Perhaps inlining is the wrong terminology. It doesn’t require that a lambda 
> with the full downstream topology be defined inline - it can be a method 
> reference as with Ivan’s original suggestion.  The advantage of putting the 
> predicate and its downstream logic (Consumer) together in branch() is that 
> they are required to be near to each other. 
> 
> Ultimately the downstream code has to live somewhere, and deep branch trees 
> will be hard to read regardless.
> 
>> On May 1, 2019, at 1:07 PM, Michael Drogalis  
>> wrote:
>>
>> I'm less enthusiastic about inlining the branch logic with its downstream
>> functionality. Programs that have deep branch trees will quickly become
>> harder to read as a single unit.
>>
>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen  wrote:
>>>
>>> Also +1 on the issues/goals as Michael outlined them, I think that sets a
>>> great framework for the discussion.
>>>
>>> Regarding the SortedMap solution, my understanding is that the current
>>> proposal in the KIP is what is in my PR which (pending naming decisions) is
>>> roughly this:
>>>
>>> stream.split()
>>>     .branch(Predicate, Consumer<KStream<K, V>>)
>>>     .branch(Predicate, Consumer<KStream<K, V>>)
>>>     .defaultBranch(Consumer<KStream<K, V>>);
>>>
>>> Obviously some ordering is necessary, since branching as a construct
>>> doesn't work without it, but this solution seems like it provides as much
>>> associativity as the SortedMap solution, because each branch() call
>>> directly associates the "conditional" with the "code block."  The value it
>>> provides over the KIP solution is the accessing of streams in the same
>>> scope.
>>>
>>> The KIP solution is less "dynamic" than the SortedMap solution in the sense
>>> that it is slightly clumsier to add a dynamic number of branches, but it is
>>> certainly possible.  It seems to me like the API should favor the "static"
>>> case anyway, and should make it simple and readable to fluently declare and
>>> access your branches in-line.  It also makes it impossible to ignore a
>>> branch, and it is possible to build an (almost) identical SortedMap
>>> solution on top of it.
>>>
>>> I could also see a middle ground where instead of a raw SortedMap being
>>> taken in, branch() takes a name and not a Consumer.  Something like this:
>>>
>>> Map<String, KStream<K, V>> branches = stream.split()
>>>     .branch("branchOne", Predicate)
>>>     .branch("branchTwo", Predicate)
>>>     .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>>>
>>> Pros for that solution:
>>> - accessing branched KStreams in same scope
>>> - no double brace initialization, hopefully slightly more readable than
>>> SortedMap
>>>
>>> Cons
>>> - downstream branch logic cannot be specified inline which makes it harder
>>> to read top to bottom (like existing API and SortedMap, but unlike the KIP)
>>> - you can forget to "handle" one of the branched streams (like existing
>>> API and SortedMap, but unlike the KIP)
>>>
>>> (KBranchedStreams could even work *both* ways but perhaps that's overdoing
>>> it).
>>>
>>> Overall I'm curious how important it is to be able to easily access the
>>> branched KStream in the same scope as the original.  It's possible that it
>>> doesn't need to be handled directly by the API, but instead left up to the
>>> user.  I'm sort of in the middle on it.
>>>
>>> Paul
>>>
>>>
>>>
>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman 
>>> wrote:
>>>
 I'd like to +1 what Michael said about the issues with the existing
>>> branch
 method, I agree with what he's outlined and I think we should proceed by
 trying to alleviate these problems. Specifically it seems important to be
 able to cleanly access the individual branches (eg by mapping
 name->stream), which I thought was the original intention of this KIP.

 That said, I don't think we should so easily give in to the double brace
 

[jira] [Created] (KAFKA-8314) Managing the doc field in case of schema projection - kafka connect

2019-05-02 Thread kaushik srinivas (JIRA)
kaushik srinivas created KAFKA-8314:
---

 Summary: Managing the doc field in case of schema projection - 
kafka connect
 Key: KAFKA-8314
 URL: https://issues.apache.org/jira/browse/KAFKA-8314
 Project: Kafka
  Issue Type: Bug
Reporter: kaushik srinivas


A doc field change in the schema while writing to HDFS using the HDFS sink 
connector via the Connect framework would cause failures in schema projection.

 

java.lang.RuntimeException: 
org.apache.kafka.connect.errors.SchemaProjectorException: Schema parameters not 
equal. source parameters: \{connect.record.doc=xxx} and target parameters: 
\{connect.record.doc=yyy} 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Kafka Consumer - Random access fetching of partition range

2019-05-02 Thread Nitzan Aviram
Hi Kafka developers,

*The question:* How can I randomly fetch an old chunk of messages given a
range definition of [partition, start offset, end offset]? Ideally this
would cover ranges from multiple partitions at once (one range per
partition), and it needs to work in a concurrent environment too.


*My ideas for a solution so far:* I guess I can use a pool of consumers for
the concurrency and, for each fetch, use Consumer.seek and Consumer.poll
with max.poll.records. But this seems wrong: there is no promise that I
will get the exact same chunk, for example when a message gets deleted (by
log compaction). Overall, this seek + poll approach does not seem like the
right fit for a one-time random fetch. A rough sketch of it is below.
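
For concreteness, a minimal sketch of that seek + poll approach (the helper
name and parameters are made up for illustration; as noted above, log
compaction can remove offsets inside the range, so this cannot guarantee a
byte-identical chunk). It assumes props already contains bootstrap.servers:

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Fetch the records of one partition between startOffset (inclusive) and
// endOffset (exclusive) using assign + seek + poll.
public class RangeFetchSketch {
    public static List<ConsumerRecord<byte[], byte[]>> fetchRange(
            Properties props, TopicPartition tp, long startOffset, long endOffset) {
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, startOffset);
            List<ConsumerRecord<byte[], byte[]>> result = new ArrayList<>();
            int emptyPolls = 0;
            while (consumer.position(tp) < endOffset && emptyPolls < 3) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    emptyPolls++; // give up after a few empty polls, e.g. if the range is past the log end
                    continue;
                }
                emptyPolls = 0;
                for (ConsumerRecord<byte[], byte[]> record : records.records(tp)) {
                    if (record.offset() >= endOffset) {
                        return result; // reached the end of the requested range
                    }
                    result.add(record);
                }
            }
            return result;
        }
    }
}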

*My use case:* Like a typical consumer, mine reads 10MB chunks of messages
and processes them. To process a chunk, I push 3-20 jobs to different
topics, in some kind of workflow. My goal is to avoid pushing the same
chunk into the other topics again and again; it seems better to push a
reference to that chunk, e.g. [topic X / partition Y, start offset, end
offset]. Then, when processing the jobs, the exact chunk is fetched again.
I also posted a question in SO -
https://stackoverflow.com/q/55950565/1265306

-- 
Thanks,
Nitzan


Re: [VOTE] KIP-464: Defaults for AdminClient#createTopic

2019-05-02 Thread Mickael Maison
I was planning to write a KIP for the exact same feature!
+1 (non binding)

Thanks for the KIP

On Wed, May 1, 2019 at 7:24 PM Almog Gavra  wrote:
>
> Hello Everyone!
>
> Kicking off the voting for
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic
>
>
> You can see discussion thread here (please respond with suggestions on that
> thread):
> https://lists.apache.org/thread.html/c0adfd2457e5984be7471fe6ade8a94d52c647356c81c039445d6b34@%3Cdev.kafka.apache.org%3E
>
>
> Cheers,
> Almog


Jenkins build is back to normal : kafka-trunk-jdk8 #3594

2019-05-02 Thread Apache Jenkins Server
See