Re: [DISCUSS] KIP-1000: List Client Metrics Configuration Resources

2023-11-07 Thread Adam Bellemare
Let's take this moment to celebrate the creation of KIP-1000!

On Tue, Nov 7, 2023 at 3:49 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi,
> I would like to start discussion of a small KIP which fills a gap in the
> administration of client metrics configuration.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1000%3A+List+Client+Metrics+Configuration+Resources
>
> Thanks,
> Andrew


Re: [ANNOUNCE] New PMC chair: Mickael Maison

2023-04-21 Thread Adam Bellemare
Thank you for all your hard work Jun - that's a decade-long legacy!
And congratulations to you Mickael!

On Fri, Apr 21, 2023 at 11:20 AM Josep Prat 
wrote:

> Thanks Jun for your work as Chair all these years!
> Congratulations Mickael!
>
> Best,
>
> ———
> Josep Prat
>
> Aiven Deutschland GmbH
>
> Alexanderufer 3-7, 10117 Berlin
>
> Amtsgericht Charlottenburg, HRB 209739 B
>
> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>
> m: +491715557497
>
> w: aiven.io
>
> e: josep.p...@aiven.io
>
> On Fri, Apr 21, 2023, 17:10 Jun Rao  wrote:
>
> > Hi, everyone,
> >
> > After more than 10 years, I am stepping down as the PMC chair of Apache
> > Kafka. We now have a new chair Mickael Maison, who has been a PMC member
> > since 2020. I plan to continue to contribute to Apache Kafka myself.
> >
> > Congratulations, Mickael!
> >
> > Jun
> >
>


Re: [ANNOUNCE] New Kafka PMC Member: A. Sophie Blee-Goldman

2022-08-01 Thread Adam Bellemare
Congratulations Sophie! I’m glad to see you made it as a PMC member! Well earned.

> On Aug 1, 2022, at 6:42 PM, Guozhang Wang  wrote:
> 
> Hi everyone,
> 
> I'd like to introduce our new Kafka PMC member, Sophie. She has been a
> committer since Oct. 2020 and has been contributing to the community
> consistently, especially around Kafka Streams and Kafka java consumer. She
> has also presented about Kafka Streams at Kafka Summit London this year. It
> is my pleasure to announce that Sophie agreed to join the Kafka PMC.
> 
> Congratulations, Sophie!
> 
> -- Guozhang Wang, on behalf of Apache Kafka PMC


Re: Support regarding the connection of Redshift to Kafka

2022-06-10 Thread Adam Bellemare
Hi Israa

I suspect you could use the JDBC Source Connector to connect to Redshift.
You may also need to play around with the Redshift JDBC driver to get it
working: https://github.com/aws/amazon-redshift-jdbc-driver
For clarity, I haven't done this myself but I suspect this would be a
reasonable approach.
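
To make the suggestion a bit more concrete, here is a rough sketch of what registering such a connector might look like. It assumes the Confluent JDBC source connector (`io.confluent.connect.jdbc.JdbcSourceConnector`) with the Redshift JDBC driver available on the Connect worker. The host, database, table, `updated_at` column, and topic prefix are hypothetical placeholders, and like the advice above this has not been tested against a real Redshift cluster.

```python
import json


def redshift_source_payload(name, host, database, table, timestamp_column):
    """Build a Kafka Connect REST payload for a JDBC source reading one table."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            # Redshift JDBC URL; 5439 is Redshift's default port.
            "connection.url": f"jdbc:redshift://{host}:5439/{database}",
            "connection.user": "<user>",          # placeholder, supply your own
            "connection.password": "<password>",  # placeholder, supply your own
            # Assumption: rows carry a last-modified column we can poll on.
            "mode": "timestamp",
            "timestamp.column.name": timestamp_column,
            "table.whitelist": table,
            "topic.prefix": "redshift-",
            "tasks.max": "1",
        },
    }


# All argument values below are hypothetical.
payload = redshift_source_payload(
    name="redshift-orders-source",
    host="example-cluster.example.com",
    database="analytics",
    table="orders",
    timestamp_column="updated_at",
)
print(json.dumps(payload, indent=2))
```

The printed JSON is what would be POSTed to the Connect REST API's `/connectors` endpoint. `timestamp` mode is only one option; `bulk` or `incrementing` may fit the tables better.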



On Fri, Jun 10, 2022 at 10:08 AM Israa Jaffery  wrote:

> Good Afternoon,
> There is a requirement wherein we need to select data from few tables in
> AWS redshift fit them into a data model and then transfer it via a
> datapipeline to GCP Bigquery.
> We are planning to use Apache Kafka and Kafka Connectors as the data
> modelling/pipe-lining interface between redshift and bigquery.
> Can you please let us know how we can transfer data from Redshift to Kafka
> as mostly we got information on how to transfer data from Kafka to Redshift
> but not the other way around?
>
>
> Regards,
>
> Israa Jaffery
> Big Data Engineer
> 91-7030562393
>
>
>
>
>
>


Re: newbie: PR 11955 review

2022-03-31 Thread Adam Bellemare
Thanks Bruno :)

On Thu, Mar 31, 2022 at 10:09 AM Bruno Cadonna  wrote:

> Hi Rajani and Adam,
>
> I do not think "retest this" or any other comment works for
> non-committers anymore.
>
> Rajani, the failures seem to be flaky tests since they are unrelated to
> your change. Could you please look into JIRA for the flaky tests? You
> should find tickets that start with "Flaky test" or similar. If you find
> tickets, comment on the tickets that the test failed again. I think you
> will find other comments that you can use as an example.
>
> I restarted the builds.
>
> Best,
> Bruno
>
> On 31.03.22 15:51, Adam Bellemare wrote:
> > It could be an intermittent error. I think there is a way to issue a retest
> > via a comment in github ("retest this") but I am not sure if that's the
> > precise command, nor if you will have the ability to kick it off.
> >
> >
> >
> > On Thu, Mar 31, 2022 at 9:12 AM Rajani Karuturi wrote:
> >
> >> Hi All,
> >> I am new here. To get started, I took one of the "newbie" issues and made
> >> changes for the same.
> >>
> >> Issue: https://issues.apache.org/jira/browse/KAFKA-12380
> >> PR: https://github.com/apache/kafka/pull/11955
> >>
> >> PR build shows some test failures but the same pass on my local.
> >> The change is very isolated and should not cause any failures ideally.
> >> Can someone please help me on the next steps?
> >>
> >> Thanks,
> >> ~ Rajani
> >>
> >
>


Re: newbie: PR 11955 review

2022-03-31 Thread Adam Bellemare
It could be an intermittent error. I think there is a way to issue a retest
via a comment in github ("retest this") but I am not sure if that's the
precise command, nor if you will have the ability to kick it off.



On Thu, Mar 31, 2022 at 9:12 AM Rajani Karuturi  wrote:

> Hi All,
> I am new here. To get started, I took one of the "newbie" issues and made
> changes for the same.
>
> Issue: https://issues.apache.org/jira/browse/KAFKA-12380
> PR: https://github.com/apache/kafka/pull/11955
>
> PR build shows some test failures but the same pass on my local.
> The change is very isolated and should not cause any failures ideally.
> Can someone please help me on the next steps?
>
> Thanks,
> ~ Rajani
>


Re: Gigantic list of Avro spec issues

2022-02-15 Thread Adam Bellemare
Hi Askar

This is certainly an extensive list. I wanted to email you just to let you
know that *someone* has seen it. Though I can't speak to all of it, I do
have a few of my own impressions:

> How it is happened that such good format has so bad spec? How it is
> happened that *best* format for this task happened to be so bad? What this
> says about our industry?

I think you're hitting on some very good points here. I contributed to Avro
a ways back, and after not having much in the way of traction or responses,
Doug Cutting himself stepped in to help me out and get my PR pushed
through. I think it took another year until the official version with the
fix I needed was released. I suspect, though I don't want to accuse in any
way, that there are not that many contributors to Avro. Part of this may be
that it has predominantly been Java-centric, and part of this may be that
many folks may have considered it "solved". I am not sure. These are only my
thoughts.

I am equally concerned about the seeming lack of involvement in this space.
I don't have an answer. I do know that many open source software projects
suffer a similar sort of "high-use, low-contribution" problem. I think it's
important that you voiced this as a concern. But I don't have a solution
for you at the moment. Perhaps someone with more knowledge or experience
working on or with Avro can chime in - again, I hope it is clear that I am
not finger pointing - I know maintaining OSS is *hard*, and I suspect that
we may simply have run out of people willing to put the effort in to make
Avro better than it currently is...

Adam



On Sat, Feb 12, 2022 at 6:22 PM Askar Safin 
wrote:

> Hi. I'm writing my own Avro implementation in Rust for personal use.
> During this work I found a lot of issues in Avro
> spec (the list follows).
>
> I send this mail not only to user and dev mailing lists of Avro, but also
> to Apache community list, Kafka list and 3
> semi-randomly chosen Materialize employees. Because I want to draw
> attention to these problems. I hope this wider
> community helps Avro fix their issues and possibly will give necessary
> resources.
>
> As well as I understand Avro is used in Kafka. And Kafka, according to
> their site, is used in "80% of all Fortune 100
> companies". So Avro is critical piece of infrastructure of humanity. It
> should be absolutely perfect (and so I list
> even very small bugs). But it is not perfect.
>
> Some of items in this list are (small and big) bugs, some are typos, some
> are my objections to the design. Some are
> fixable while keeping compatibility, some are not. I don't want to spend
> my time to report them as separate bugs, but
> you can try to convince me to do so.
>
> I created this list simply by reading the spec from end to end (I skipped
> sections on RPC and logical types). And I
> even didn't look at implementations!
>
> I write this in the hope of helping Avro.
>
> I think big audit of spec and its implementations should be done.
>
> All line numbers apply to spec.xml from tag release-1.11.0 (i. e.
>
> https://github.com/apache/avro/blob/release-1.11.0/doc/src/content/xdocs/spec.xml
> ). As well as I understand this tag
> corresponds to currently published version at
> https://avro.apache.org/docs/current/spec.html .
>
> So, here we go.
>
> * [Opinion] [No line]. In Avro one have to define named records inside
> each other like so:
>
> { "type": "record", "name": "a", "fields":
> [{"name":"b","type":{"type":"record","name":"c",...}}] }
>
> This is very unnatural. In popular programming languages one usually
> define named record next to each other, not one
> inside other. Such representation is not handy to deal programmatically.
> In my implementation I have to convert this
> representation to usual form "root type + list of named types" right after
> reading JSON and convert back just before
> writing.
>
> * [Opinion] [No line]. In this list you will see a lot of questions on
> Avro schema (encoded as JSON). Good JSON schema
> ( https://json-schema.org/ ) would resolve many of them
>
> * [Seems to be bug] [Line 49]. "derived type name" is vague term. In fact,
> in whole spec phrase "type name" is used
> very vaguely. Sometimes it means strings like "record" and sometimes it
> means names of named types. I propose to define
> in very beginning of the spec terms for primitive types, terms for strings
> like "record" and terms for names of defined
> types. Here is one possible way to do this: name strings like "record" and
> "fixed" "type kinds" and never name them
> type names, thus reserving term "type name" to named types only (and
> possibly to primitive types).
>
> This issue already caused problems: look, for example, to this problems
> with {"type":"record","name":"record",...}:
> https://lists.apache.org/thread/0wmgyx6z69gy07lvj9ndko75752b8cn2 .
>
> * [Opinion] [Line 58]. There is no primitive type for unsigned 64 bit
> integers. Such type is present in languages such
> as C and Rust
>
> * [Very 

Re: [VOTE] KIP-775: Custom partitioners in foreign key joins

2021-09-28 Thread Adam Bellemare
+1 (non-binding)

Glad to see this in here :)

On Tue, Sep 28, 2021 at 5:11 PM Bill Bejeck  wrote:

> +1 (binding)
>
> On Tue, Sep 28, 2021 at 12:59 PM Matthias J. Sax  wrote:
>
> > +1 (binding)
> >
> > On 9/28/21 8:29 AM, Victoria Xia wrote:
> > > Hi all,
> > >
> > > > I'd like to start a vote for KIP-775 for adding Kafka Streams support for
> > > > foreign key joins on tables with custom partitioners:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins
> > >
> > > Thanks,
> > > Victoria
> > >
> >
>


Re: Kafka Issue

2021-08-19 Thread Adam Bellemare
Hi folks

You’ll need to host the images externally and send a link. The email server 
doesn’t send images due to the sheer number of recipients.
Try using something like imgur 

Adam 

> On Aug 19, 2021, at 2:18 AM, Roel Angeles  
> wrote:
> 
> 
> Hi Luke,
>  
> Resending the screenshot.
> 
> 
>  
>  
> Changes from
> 
>  
> to
> 
>  
>  
>  
> All the best,
>  
> 
> Roel Angeles
> Consultant
>  
> +63 9157163557
> roel.ange...@lingarogroup.com
> www.lingarogroup.com
>  
>  
> Lingaro Philippines Inc.
> 41F Philamlife Tower
> 8767 Paseo de Roxas
> Makati City, 1226
> +63 2753 8865
>  
> Next OOO planned:  
>  
>  
> From: Luke Chen  
> Sent: Thursday, 19 August 2021 2:44 pm
> To: dev 
> Cc: Roel Angeles ; Errol Pontillas 
> ; Jhunabelle Santos 
> ; Andrew Astrologo 
> ; yminag...@coca-cola.com; coke.support 
> 
> Subject: Re: Kafka Issue
>  
> Hi Roel,
> I'm not sure if it's just me or not, but I can't see your screenshots, so I 
> don't know what happened.
>  
> Thanks.
> Luke
>  
> On Wed, Aug 18, 2021 at 10:12 PM Jason Kamdon  
> wrote:
> Hi Team,
>  
> Can we have an update regarding below concern.
>  
>  
> Thanks!
>  
> Regards,
> Jason Kamdon
> Service Level Manager
>  +639178544422
> jason.kam...@lingarogroup.com
> www.lingarogroup.com
>  
>  
>  
> Lingaro Philippines Inc.
> 41F Philamlife Tower
> 8767 Paseo de Roxas
> Makati City, 1226
> +63 2753 8865
>  
>  
> Next OOO planned:
> 
>  
>  
>  
>  
> From: Roel Angeles  
> Sent: Monday, August 16, 2021 2:25 PM
> To: dev@kafka.apache.org
> Cc: Jason Kamdon ; Errol Pontillas 
> ; Jhunabelle Santos 
> ; Andrew Astrologo 
> 
> Subject: Kafka Issue
>  
> Hi Kafka Dev Team,
>  
> We are requesting your help with an issue we encountered with Kafka.
> Please see the screenshot of the error below. It says the partition does not
> exist, but on our event hub it is active.
>  
>  
>  
> Hoping for your kind response; it would be a big help for our team. Thanks.
>  
> All the best,
>  
> Roel Angeles
> Consultant
>  
> +63 9157163557
> roel.ange...@lingarogroup.com
> www.lingarogroup.com
>  
>  
>  
> Lingaro Philippines Inc.
> 41F Philamlife Tower
> 8767 Paseo de Roxas
> Makati City, 1226
> +63 2753 8865
>  
>  
> Next OOO planned:  
>  
>  


Re: [ANNOUNCE] New Kafka PMC Member: Konstantine Karantasis

2021-06-22 Thread Adam Bellemare
Congratulations Konstantine!!!

On Tue, Jun 22, 2021 at 10:34 AM Jeremy Custenborder <
jcustenbor...@gmail.com> wrote:

> NICE work buddy!
>
> On Tue, Jun 22, 2021 at 9:33 AM Rankesh Kumar wrote:
> >
> > Yay! Congratulations, KK!
> >
> > Best regards,
> > Rankesh Kumar
> > Partner Solutions Engineer
> > +91 (701)913-0147
> > Follow us:  Blog • Slack • Twitter • YouTube
> >
> > > On 21-Jun-2021, at 8:58 PM, Mickael Maison wrote:
> > >
> > > Hi,
> > >
> > > It's my pleasure to announce that Konstantine Karantasis is now a
> > > member of the Kafka PMC.
> > >
> > > Konstantine has been a Kafka committer since Feb 2020. He has remained
> > > active in the community since becoming a committer.
> > >
> > > Congratulations Konstantine!
> > >
> > > Mickael, on behalf of the Apache Kafka PMC
> >
>


Re: [kafka-clients] Re: [ANNOUNCE] Apache Kafka 2.8.0

2021-04-19 Thread Adam Bellemare
Wow, 2.8! I remember 0.8.2.0 :)

Thanks for all the work everyone, and thank you John for putting together
the release!

On Mon, Apr 19, 2021 at 3:48 PM Israel Ekpo  wrote:

> This is fantastic news!
>
> Thanks everyone for contributing and thanks John for managing the release.
>
> On Mon, Apr 19, 2021 at 1:10 PM Guozhang Wang  wrote:
>
> > This is great! Thanks to everyone who has contributed to the release.
> >
> > On Mon, Apr 19, 2021 at 9:36 AM John Roesler wrote:
> >
> >> The Apache Kafka community is pleased to announce the
> >> release for Apache Kafka 2.8.0
> >>
> >> Kafka 2.8.0 includes a number of significant new features.
> >> Here is a summary of some notable changes:
> >>
> >> * Early access to replacing ZooKeeper with a self-managed
> >> quorum
> >> * Add Describe Cluster API
> >> * Support mutual TLS authentication on SASL_SSL listeners
> >> * JSON request/response debug logs
> >> * Limit broker connection creation rate
> >> * Topic identifiers
> >> * Expose task configurations in Connect REST API
> >> * Update Streams FSM to clarify ERROR state meaning
> >> * Extend StreamJoined to allow more store configs
> >> * More convenient TopologyTestDriver constructors
> >> * Introduce Kafka-Streams-specific uncaught exception
> >> handler
> >> * API to start and shut down Streams threads
> >> * Improve TimeWindowedDeserializer and TimeWindowedSerde to
> >> handle window size
> >> * Improve timeouts and retries in Kafka Streams
> >>
> >> All of the changes in this release can be found in the
> >> release notes:
> >> https://www.apache.org/dist/kafka/2.8.0/RELEASE_NOTES.html
> >>
> >>
> >> You can download the source and binary release (Scala 2.12
> >> and 2.13) from:
> >> https://kafka.apache.org/downloads#2.8.0
> >>
> >> 
> >> ---
> >>
> >>
> >> Apache Kafka is a distributed streaming platform with four
> >> core APIs:
> >>
> >>
> >> ** The Producer API allows an application to publish a
> >> stream of records to one or more Kafka topics.
> >>
> >> ** The Consumer API allows an application to subscribe to
> >> one or more topics and process the stream of records
> >> produced to them.
> >>
> >> ** The Streams API allows an application to act as a stream
> >> processor, consuming an input stream from one or more topics
> >> and producing an output stream to one or more output topics,
> >> effectively transforming the input streams to output
> >> streams.
> >>
> >> ** The Connector API allows building and running reusable
> >> producers or consumers that connect Kafka topics to existing
> >> applications or data systems. For example, a connector to a
> >> relational database might capture every change to a table.
> >>
> >>
> >> With these APIs, Kafka can be used for two broad classes of
> >> application:
> >>
> >> ** Building real-time streaming data pipelines that reliably
> >> get data between systems or applications.
> >>
> >> ** Building real-time streaming applications that transform
> >> or react to the streams of data.
> >>
> >>
> >> Apache Kafka is in use at large and small companies
> >> worldwide, including Capital One, Goldman Sachs, ING,
> >> LinkedIn, Netflix, Pinterest, Rabobank, Target, The New York
> >> Times, Uber, Yelp, and Zalando, among others.
> >>
> >> A big thank you for the following 128 contributors to this
> >> release!
> >>
> >> 17hao, abc863377, Adem Efe Gencer, Alexander Iskuskov, Alok
> >> Nikhil, Anastasia Vela, Andrew Lee, Andrey Bozhko, Andrey
> >> Falko, Andy Coates, Andy Wilkinson, Ankit Kumar, APaMio,
> >> Arjun Satish, ArunParthiban-ST, A. Sophie Blee-Goldman,
> >> Attila Sasvari, Benoit Maggi, bertber, bill, Bill Bejeck,
> >> Bob Barrett, Boyang Chen, Brajesh Kumar, Bruno Cadonna,
> >> Cheng Tan, Chia-Ping Tsai, Chris Egerton, CHUN-HAO TANG,
> >> Colin Patrick McCabe, Colin P. Mccabe, Cyrus Vafadari, David
> >> Arthur, David Jacot, David Mao, dengziming, Dhruvil Shah,
> >> Dima Reznik, Dongjoon Hyun, Dongxu Wang, Emre Hasegeli,
> >> feyman2016, fml2, Gardner Vickers, Geordie, Govinda Sakhare,
> >> Greg Harris, Guozhang Wang, Gwen Shapira, Hamza Slama,
> >> high.lee, huxi, Igor Soarez, Ilya Ganelin, Ismael Juma, Ivan
> >> Ponomarev, Ivan Yurchenko, jackyoh, James Cheng, James
> >> Yuzawa, Jason Gustafson, Jesse Gorzinski, Jim Galasyn, John
> >> Roesler, Jorge Esteban Quilcate Otoya, José Armando García
> >> Sancio, Julien Chanaud, Julien Jean Paul Sirocchi, Justine
> >> Olshan, Kengo Seki, Kowshik Prakasam, leah, Lee Dongjin,
> >> Levani Kokhreidze, Lev Zemlyanov, Liju John, Lincong Li,
> >> Lucas Bradstreet, Luke Chen, Manikumar Reddy, Marco Aurelio
> >> Lotz, mathieu, Matthew Wong, Matthias J. Sax, Matthias
> >> Merdes, Michael Bingham, Michael G. Noll, Mickael Maison,
> >> Montyleo, mowczare, Nikolay, Nikolay Izhikov, Ning Zhang,
> >> Nitesh Mor, Okada Haruki, panguncle, parafiend, Patrick
> >> Dignan, Prateek Agarwal, Prithvi, Rajini Sivaram, Raman
> >> Verma, Ramesh 

Re: [ANNOUNCE] New Kafka PMC Member: Randall Hauch

2021-04-17 Thread Adam Bellemare
Congratulations Randall !

> On Apr 17, 2021, at 6:24 PM, Randall Hauch  wrote:
> 
> Thanks for the kind responses, everyone!
> 
> Best regards,
> 
> Randall
> 
>> On Sat, Apr 17, 2021 at 4:00 PM Guozhang Wang  wrote:
>> 
>> Congratulations Randall ! Well deserved.
>> 
>> Guozhang
>> 
>>> On Fri, Apr 16, 2021 at 4:45 PM Matthias J. Sax  wrote:
>>> 
>>> Hi,
>>> 
>>> It's my pleasure to announce that Randall Hauch is now a member of the
>>> Kafka PMC.
>>> 
>>> Randall has been a Kafka committer since Feb 2019. He has remained
>>> active in the community since becoming a committer.
>>> 
>>> 
>>> 
>>> Congratulations Randall!
>>> 
>>> -Matthias, on behalf of Apache Kafka PMC
>>> 
>> 
>> 
>> --
>> -- Guozhang
>> 


Re: [ANNOUNCE] New Kafka PMC Member: Bill Bejeck

2021-04-17 Thread Adam Bellemare
Congratulations Bill!!

> On Apr 17, 2021, at 5:20 PM, Kowshik Prakasam 
>  wrote:
> 
> Congrats Bill!
> 
> 
> Cheers,
> Kowshik
> 
>> On Mon, Apr 12, 2021, 11:15 AM Randall Hauch  wrote:
>> 
>> Congratulations, Bill!
>> 
>>> On Mon, Apr 12, 2021 at 11:02 AM Guozhang Wang  wrote:
>>> 
>>> Congratulations Bill !
>>> 
>>> Guozhang
>>> 
>>>> On Wed, Apr 7, 2021 at 6:16 PM Matthias J. Sax  wrote:
>>> 
>>>> Hi,
>>>> 
>>>> It's my pleasure to announce that Bill Bejeck is now a member of the
>>>> Kafka PMC.
>>>> 
>>>> Bill has been a Kafka committer since Feb 2019. He has remained
>>>> active in the community since becoming a committer.
>>>> 
>>>> 
>>>> 
>>>> Congratulations Bill!
>>>> 
>>>> -Matthias, on behalf of Apache Kafka PMC
>>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
>>> 
>> 


Re: Known issues with time stepping

2021-04-05 Thread Adam Bellemare
Did you check the JIRAs?

On Mon, Apr 5, 2021 at 6:58 PM Tirtha Chatterjee <
tirtha.p.chatter...@gmail.com> wrote:

> Hi team
>
> Are there any known issues with Kafka that can happen because of the system
> time getting stepped forward or backward in one shot? This can happen
> because of NTP time syncs.
>
> --
> Regards
> Tirtha Chatterjee
>


Re: [DISCUSS] Please review 2.8.0 blog post

2021-04-05 Thread Adam Bellemare
Read it all. It looks good to me in terms of structure and content. I am
not sufficiently up to date on all the features that are otherwise included
in 2.8.0, but the ones listed seem very prominent!



On Thu, Apr 1, 2021 at 4:39 PM John Roesler  wrote:

> Hello all,
>
> In the steady march toward the Apache Kafka 2.8.0 release, I
> have prepared a draft of the release announcement post:
>
> https://blogs.apache.org/preview/kafka/?previewEntry=what-s-new-in-apache5
>
> If you have a moment, I would greatly appreciate your
> reviews.
>
> Thank you,
> -John
>
>


[jira] [Created] (KAFKA-12323) Record timestamps not populated in event

2021-02-11 Thread Adam Bellemare (Jira)
Adam Bellemare created KAFKA-12323:
--

         Summary: Record timestamps not populated in event
             Key: KAFKA-12323
             URL: https://issues.apache.org/jira/browse/KAFKA-12323
         Project: Kafka
      Issue Type: Bug
Affects Versions: 2.7.0
        Reporter: Adam Bellemare


Upgraded a Kafka Streams application from 2.6.0 to 2.7.0. Noticed that the 
events being produced had a "CreatedAt" timestamp = 0, causing downstream 
failures as we depend on those timestamps. Reverting back to 2.6.0/2.6.1 fixed 
this issue. This was the only change to the Kafka Streams application.


Consuming the event stream produced by 2.6.0 with the 
`kafka-avro-console-consumer` and `--property print.timestamp=true` yields 
events prepended with their event times, such as:
```
CreateTime:1613072202271  
CreateTime:1613072203412  
CreateTime:1613072205431  
```

etc.

However, when those events are produced by the Kafka Streams app using 2.7.0, 
we get:

```
CreateTime:0  
CreateTime:0  
CreateTime:0  
```

I don't know if there is a default value somewhere that changed, but this is 
actually a blocker for our use-cases as we now need to work around this 
limitation (or roll back to 2.6.1, though there are other issues we must deal 
with then). I am not sure which unit tests in the code base to look at to 
validate this, but I wanted to log this bug now in case someone else has 
already seen this or an open one exists (I didn't see one though).
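
For anyone wanting to check whether their own output is affected, here is a minimal sketch of a filter over the console consumer output shown above. It assumes each line begins with the `CreateTime:<epoch millis>` prefix that `print.timestamp=true` produces; the helper name `find_unset_timestamps` and the `<value>` payload placeholder are illustrative only.

```python
import re

# Matches the "CreateTime:<epoch millis>" prefix printed by the console
# consumer when print.timestamp=true is set.
CREATE_TIME = re.compile(r"^CreateTime:(-?\d+)")


def find_unset_timestamps(lines):
    """Return (line_number, timestamp) pairs whose CreateTime is zero or negative."""
    suspicious = []
    for number, line in enumerate(lines, start=1):
        match = CREATE_TIME.match(line)
        if match and int(match.group(1)) <= 0:
            suspicious.append((number, int(match.group(1))))
    return suspicious


# Timestamps taken from the ticket; "<value>" stands in for the elided payload.
sample_output = [
    "CreateTime:1613072202271\t<value>",
    "CreateTime:1613072203412\t<value>",
    "CreateTime:0\t<value>",
]

for number, timestamp in find_unset_timestamps(sample_output):
    print(f"line {number}: CreateTime={timestamp}")
```

Piping real consumer output through a check like this only detects the symptom; it says nothing about where in the 2.7.0 code path the timestamp is lost.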



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Why many "Load Bug xxx" JIRA bug by Tim?

2021-01-07 Thread Adam Bellemare
If we do look to enable Captchas, I think it would be important that we
avoid corporate offerings (eg: Google's).


On Thu, Jan 7, 2021 at 12:12 PM Govinda Sakhare 
wrote:

> Hi,
>
> If it is possible, we should configure/enable Captcha to prevent automated
> spamming attacks.
>
> Thanks
> Govinda
>
> On Wed, Jan 6, 2021 at 11:30 PM Matthias J. Sax  wrote:
>
> > This was a spamming attack.
> >
> > The user was blocked and the corresponding tickets were deleted. (Cf.
> > https://issues.apache.org/jira/browse/INFRA-21268)
> >
> > The "problem" is, that anybody can create a Jira account and create
> > tickets. It's in the spirit of open source and the ASF to not lock down
> > Jira, to make it easy for people to report issues.
> >
> > The drawback is, that stuff like this can happen. It's easy to write a
> > bot to spam the Jira board...
> >
> > Because Jira is managed by the ASF infra-team, Kafka committers/PMC
> > cannot block users and thus it takes a little longer to react to an
> > issue like this, as we need to wait for the infra team to help out.
> >
> >
> > -Matthias
> >
> >
> > On 1/6/21 1:14 AM, M. Manna wrote:
> > > > I had to register this as spam and block them. I couldn’t disable it from
> > > > ASF Jira.
> > > >
> > > > I’m also curious to know how/why such a surge occurred.
> > >
> > > Regards,
> > >
> > > On Wed, 6 Jan 2021 at 03:45, Luke Chen  wrote:
> > >
> > >> Hi,
> > >> I received a lot of JIRA notification emails today, and they are all
> > >> titled: "Load Bug xxx" by Tim.
> > > >> The bug content doesn't look like a real bug, they are like generated by
> > > >> automation.
> > >> I'm wondering why that could happen?
> > >> Do we have any way to delete them all?
> > >>
> > >> Thanks.
> > >> Luke
> > >>
> > >
> >
>
>
> --
> Thanks  & Regards,
> Govinda Sakhare.
>


Serious Java Consumer performance issues / expectations vs. librdkafka

2020-09-17 Thread Adam Bellemare
Hi

I am trying to use a plain Java consumer (over SSL) to consume a very large
amount of historic data (20+TB across 20+ partitions). Consumption
performance is very low when fully parallelized.

We are seeing about *200k rec/s* with the Java consumer versus *950k rec/s*
with librdkafka.
We are seeing about *1 gigabit/s* with the Java consumer versus *5.3 gigabit/s*
with librdkafka.
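
To put those numbers in perspective, here is a back-of-the-envelope calculation of what the gap means for the full backlog. It assumes exactly 20 TB (decimal) of data, since the figure above is "20+TB", and that both rates are sustained aggregate throughput across all partitions; the function name is illustrative.

```python
def hours_to_consume(total_bytes, gigabits_per_second):
    """Hours needed to read total_bytes at a sustained network rate."""
    bits = total_bytes * 8
    seconds = bits / (gigabits_per_second * 1e9)
    return seconds / 3600


TOTAL_BYTES = 20 * 10**12  # assumption: exactly 20 TB; the real backlog is larger

java_hours = hours_to_consume(TOTAL_BYTES, 1.0)
librdkafka_hours = hours_to_consume(TOTAL_BYTES, 5.3)

# Ratios between the two clients, from the measured figures above.
record_ratio = 950_000 / 200_000
bandwidth_ratio = 5.3 / 1.0

print(f"Java consumer: {java_hours:.1f} h")        # about 44.4 h
print(f"librdkafka:    {librdkafka_hours:.1f} h")  # about 8.4 h
print(f"records/s ratio: {record_ratio:.2f}x, bandwidth ratio: {bandwidth_ratio:.1f}x")
```

Under these assumptions the difference is roughly two days of wall-clock time versus a single working day.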

Both applications are doing no-ops (eg: consume, deserialize as byte
arrays, print a line for every 100 events). Both applications are using
defaults (including the same fetch sizes, maximums, batch sizes, etc). The
java processes do not appear to be starved for resources, CPU, memory, etc,
nor do the kafkacat instances. Everything is being run in exactly the same
environments with the same resources, but the Java Kafka client is just
incredibly slow.

Java Kafka Client version 2.4.x
JDK 11 (I think there was an SSL performance issue that required upgrading
to at least JDK 11).

Am I doing something wrong here? The last time I tested the performance difference
between these two libraries was years ago, and it was something like
librdkafka was a bit faster in most cases, but certainly not 5x faster in a
no-op scenario. Is this in line with expectations?

Any thoughts or suggestions would be very much appreciated.

Thanks
Adam


Re: virtual KIP meeting for KIP-405

2020-08-20 Thread Adam Bellemare
Hello

I am interested in attending, mostly just to listen and observe.

Thanks ! 

> On Aug 20, 2020, at 6:20 PM, Jun Rao  wrote:
> 
> Hi, everyone,
> 
> We plan to have weekly virtual meetings for KIP-405 to discuss progress and
> outstanding issues, starting from this coming Tuesday at 9am PT. If you are
> interested in attending, please let Harsha or me know.
> 
> The recording of the meeting will be posted in
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> .
> 
> Thanks,
> 
> Jun


Re: [VOTE] KIP-657: Add Customized Kafka Streams Logo

2020-08-15 Thread Adam Bellemare
I prefer Design B, but given that I missed the discussion thread, I think
it would be better without the Otter obscuring any part of the Kafka logo.

On Thu, Aug 13, 2020 at 6:31 PM Boyang Chen 
wrote:

> Hello everyone,
>
> I would like to start a vote thread for KIP-657:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-657%3A+Add+Customized+Kafka+Streams+Logo
>
> This KIP is aiming to add a new logo for the Kafka Streams library. And we
> prepared two candidates with a cute otter. You could look up the KIP to
> find those logos.
>
>
> Please post your vote against these two customized logos. For example, I
> would vote for *design-A (binding)*.
>
> This vote thread shall be open for one week to collect enough votes to call
> for a winner. Still, feel free to post any question you may have regarding
> this KIP, thanks!
>


Re: [ANNOUNCE] New Kafka PMC Member: John Roesler

2020-08-10 Thread Adam Bellemare
Congratulations John! You have been an excellent help to me and many others. I 
am pleased to see this!

> On Aug 10, 2020, at 5:54 PM, Bill Bejeck  wrote:
> 
> Congrats!
> 
>> On Mon, Aug 10, 2020 at 4:52 PM Guozhang Wang  wrote:
>> 
>> Congratulations!
>> 
>>> On Mon, Aug 10, 2020 at 1:11 PM Jun Rao  wrote:
>>> 
>>> Hi, Everyone,
>>> 
>>> John Roesler has been a Kafka committer since Nov. 5, 2019. He has remained
>>> active in the community since becoming a committer. It's my pleasure to
>>> announce that John is now a member of Kafka PMC.
>>> 
>>> Congratulations John!
>>> 
>>> Jun
>>> on behalf of Apache Kafka PMC
>>> 
>> 
>> 
>> --
>> -- Guozhang
>> 


Re: [VOTE] KIP-614: Add Prefix Scan support for State Stores

2020-07-19 Thread Adam Bellemare
LGTM
+1 non-binding

On Sun, Jul 19, 2020 at 4:13 AM Sagar  wrote:

> Hi All,
>
> Bumping this thread to see if there is any feedback.
>
> Thanks!
> Sagar.
>
> On Tue, Jul 14, 2020 at 9:49 AM John Roesler  wrote:
>
> > Thanks for the KIP, Sagar!
> >
> > I’m +1 (binding)
> >
> > -John
> >
> > On Sun, Jul 12, 2020, at 02:05, Sagar wrote:
> > > Hi All,
> > >
> > > > I would like to start a new voting thread for the below KIP to add prefix
> > > > scan support to state stores:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> > >
> > > Thanks!
> > > Sagar.
> > >
> >
>


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-07-11 Thread Adam Bellemare
My 2 cents -

I agree with Colin. I think that it's important that the metadata not grow
unbounded without being delegated to external storage. Indefinite long-term
storage of entity data in Kafka can result in extremely large datasets
where the vast majority of data is stored in the external tier. I would be
very disappointed to have the metadata storage be a limiting factor to
exactly how much data I can store in Kafka. Additionally, and for example,
I think it's very reasonable that an AWS metadata store could be
implemented with DynamoDB (key-value store) paired with S3 - faster
random-access metadata lookup than plain S3, but without needing to rebuild
rocksDB state locally.
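
As a rough illustration of the cache-first, fetch-in-the-background lookup Colin describes in the quoted message below, here is a minimal sketch. `ColdMetadataCache` and `fetch_remote` are hypothetical names, not part of KIP-405 or any Kafka API, and the remote store (S3, HDFS, DynamoDB) is stubbed with a plain function.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class ColdMetadataCache:
    """Hypothetical local cache in front of a remote metadata store."""

    def __init__(self, fetch_remote, max_fetchers=2):
        self._fetch_remote = fetch_remote  # callable(key) -> metadata (stub for S3/HDFS/DynamoDB)
        self._cache = {}                   # metadata already held locally
        self._in_flight = {}               # key -> Future for fetches already running
        self._lock = threading.Lock()
        self._pool = ThreadPoolExecutor(max_workers=max_fetchers)

    def lookup(self, key):
        """Return a Future immediately so the calling worker thread never blocks."""
        with self._lock:
            if key in self._cache:
                done = Future()
                done.set_result(self._cache[key])
                return done
            pending = self._in_flight.get(key)
            if pending is None:
                # Cache miss: hand the fetch to a background thread and share
                # the same Future with any later request for the same key.
                pending = self._pool.submit(self._fetch_and_store, key)
                self._in_flight[key] = pending
            return pending

    def _fetch_and_store(self, key):
        try:
            value = self._fetch_remote(key)
            with self._lock:
                self._cache[key] = value
            return value
        finally:
            with self._lock:
                self._in_flight.pop(key, None)


remote_calls = []


def fetch_remote(key):
    """Stand-in for a slow remote read; records which keys were fetched."""
    remote_calls.append(key)
    return f"metadata-for-{key}"


cache = ColdMetadataCache(fetch_remote)
first = cache.lookup("segment-42")   # miss: fetched in the background
print(first.result(timeout=5))
second = cache.lookup("segment-42")  # hit: already resolved, no remote call
print(second.done(), remote_calls)
```

A request handler would attach a callback with `add_done_callback` rather than calling `result()`, which is roughly the role the purgatory plays in the description below.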



On Fri, Jul 10, 2020 at 3:57 PM Colin McCabe  wrote:

> Hi all,
>
> Thanks for the KIP.
>
> I took a look and one thing that stood out to me is that the more metadata
> we have, the more storage we will need on local disk for the rocksDB
> database.  This seems like it contradicts some of the goals of the
> project.  Ideally the space we need on local disk should be related only to
> the size of the hot set, not the size of the cold set.  It also seems like
> it could lead to extremely long rocksdb rebuild times if we somehow lose a
> broker's local storage and have to rebuild it.
>
> Instead, I think it would be more reasonable to store cold metadata in the
> "remote" storage (HDFS, s3, etc.).  Not only does this free up space on the
> local disk and avoid long rebuild times, but it also gives us more control over
> the management of our cache.  With rocksDB we are delegating cache
> management to an external library that doesn't really understand our
> use-case.
>
> To give a concrete example of how this is bad, imagine that we have 10
> worker threads and we get  10 requests for something that requires us to
> fetch cold tiered storage metadata.  Now every worker thread is blocked
> inside rocksDB and the broker can do nothing until it finishes fetching
> from disk.  When accessing a remote service like HDFS or S3, in contrast,
> we would be able to check if the data was in our local cache first.  If it
> wasn't, we could put the request in a purgatory and activate a background
> thread to fetch the needed data, and then release the worker thread to be
> used by some other request.  Having control of our own caching strategy
> increases observability, maintainability, and performance.
>
> I can anticipate a possible counter-argument here: the size of the
> metadata should be small and usually fully resident in memory anyway.
> While this is true today, I don't think it will always be true.  The
> current low limit of a few thousand partitions is not competitive in the
> long term and needs to be lifted.  We'd like to get to at least a million
> partitions with KIP-500, and much more later.  Also, when you give people
> the ability to have unlimited retention, they will want to make use of it.
> That means lots of historical log segments to track.  This scenario is by
> no means hypothetical.  Even with the current software, it's easy to think
> of cases where someone misconfigured the log segment roll settings and
> overwhelmed the system with segments.  So overall, I'd like to understand why
> we want to store metadata on local disk rather than remote, and what the
> options are for the future.
>
> best,
> Colin
>
>
> On Thu, Jul 9, 2020, at 09:55, Harsha Chintalapani wrote:
> > Hi Jun,
> >   Thanks for the replies and feedback on design and giving input.
> > We are close to finishing the implementation.
> > We also ran several perf tests at our peak production loads, and with
> > tiered storage we didn't see any degradation in write throughput or
> > latency.
> > Ying already added some of the perf test results in the KIP itself.
> >  It will be great if we can get design and code reviews from you
> > and others in the community as we make progress.
> > Thanks,
> > Harsha
> >
> > On Tue, Jul 7, 2020 at 10:34 AM Jun Rao  wrote:
> >
> > > Hi, Ying,
> > >
> > > Thanks for the update. It's good to see the progress on this. Please
> let
> > > us know when you are done updating the KIP wiki.
> > >
> > > Jun
> > >
> > > On Tue, Jul 7, 2020 at 10:13 AM Ying Zheng 
> wrote:
> > >
> > >> Hi Jun,
> > >>
> > >> Satish and I have added more design details in the KIP, including how to
> > >> keep consistency between replicas (especially when there are leadership
> > >> changes / log truncations) and new metrics. We also made some other minor
> > >> changes in the doc. We will finish the KIP changes in the next couple of
> > >> days. We will let you know when we are done. Most of the changes are
> > >> already updated to the wiki KIP. You can take a look. But it's not the
> > >> final version yet.
> > >>
> > >> As for the implementation, the code is mostly done and we already had
> some
> > >> feature tests / system tests. I have added the performance test
> results in
> > >> the KIP. However the recent 

Re: Feedback: Print schemaId using bin/kafka-dump-log.sh

2020-07-07 Thread Adam Bellemare
Hi Mohanraj

While I see the usefulness of your suggestion, the main issue is that
you're using the Confluent schema registry's conventions and hardwiring
them into Kafka core. Given that Confluent's standards are not part of
Kafka's official standards, I do not think you will get approval to submit
this code into Kafka core.

There may be Confluent tools that are available that already do this, or
perhaps they have their own custom tools available where this may be more
suitable for submission.
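
For reference, the convention in question is the Confluent serializer
framing: one magic byte (0), then a 4-byte big-endian schema ID, then the
encoded payload. Below is a minimal sketch of reading that ID in a standalone
tool without touching Kafka core. The class and method names are
hypothetical. Note that a leading zero byte is only a heuristic, since any
payload can start with 0, which is part of why this logic does not belong in
a generic tool.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.OptionalInt;

// Hypothetical helper, not part of Kafka or of any Confluent library.
final class SchemaIdPeek {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_BYTES = 5; // 1 magic byte + 4-byte schema ID

    // Returns the schema ID if the payload looks Confluent-framed, else empty.
    static OptionalInt peekSchemaId(ByteBuffer payload) {
        if (payload == null || payload.remaining() < HEADER_BYTES) {
            return OptionalInt.empty();
        }
        // Read through a duplicate so the caller's position is left untouched.
        ByteBuffer view = payload.duplicate().order(ByteOrder.BIG_ENDIAN);
        if (view.get() != MAGIC_BYTE) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(view.getInt());
    }
}
```

Compared with the patch quoted below, this version checks that at least five
bytes remain before reading, and it never advances the buffer it was given.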

Adam



On Mon, Jul 6, 2020 at 11:00 AM Mohanraj Nagasamy 
wrote:

> Does anyone have feedback on this? ☺
>
> From: Mohanraj Nagasamy 
> Date: Wednesday, July 1, 2020 at 6:29 PM
> To: "dev@kafka.apache.org" 
> Subject: Feedback: Print schemaId using bin/kafka-dump-log.sh
>
> When I dump Kafka logs to diagnose or debug a problem, it's handy to see
> whether each Kafka log message has a schemaId or not. If it does, print
> the schemaId.
>
> I'm soliciting feedback as to whether it is worth making this change to
> kafka-core codebase.
>
> I'm new to the kafka-community - forgive me if this wasn't part of the
> process.
>
> This is the change I made:
>
> ```
>  core/src/main/scala/kafka/tools/DumpLogSegments.scala | 21
> +++--
>  1 file changed, 19 insertions(+), 2 deletions(-)
>
> diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
> b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
> index 9e9546a92..a8750ac3d 100755
> --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
> +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
> @@ -35,6 +35,7 @@ object DumpLogSegments {
>
>// visible for testing
>private[tools] val RecordIndent = "|"
> +  private val MAGIC_BYTE = 0x0
>
>def main(args: Array[String]): Unit = {
>  val opts = new DumpLogSegmentsOptions(args)
> @@ -277,8 +278,24 @@ object DumpLogSegments {
>}
>  } else if (printContents) {
>val (key, payload) = parser.parse(record)
> -  key.foreach(key => print(s" key: $key"))
> -  payload.foreach(payload => print(s" payload: $payload"))
> +  key.foreach(key => {
> +val keyBuffer = record.key()
> +if (keyBuffer.get() == MAGIC_BYTE) {
> +  print(s" keySchemaId: ${keyBuffer.getInt} key: $key")
> +}
> +else {
> +  print(s" key: $key")
> +}
> +  })
> +
> +  payload.foreach(payload => {
> +val valueBuffer = record.value()
> +if (valueBuffer.get() == MAGIC_BYTE) {
> +  print(s" payloadSchemaId: ${valueBuffer.getInt}
> payload: $payload")
> +} else {
> +  print(s" payload: $payload")
> +}
> +  })
>  }
>  println()
>}
> (END)
> ```
>
> And this is what the output looks like:
>
> ```
> $ bin/kafka-dump-log.sh --files
> data/kafka/logdir/avro_topic_test-0/.log
> --print-data-log
>
> | offset: 50 CreateTime: 1593570942959 keysize: -1 valuesize: 16 sequence:
> -1 headerKeys: [] payloadSchemaId: 1 payload:
> TracRowe
> baseOffset: 51 lastOffset: 51 count: 1 baseSequence: -1 lastSequence: -1
> producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
> false isControl: false position: 2918 CreateTime: 1593570958044 size: 101
> magic: 2 compresscodec: NONE crc: 1913155179 isvalid: true
> | offset: 51 CreateTime: 1593570958044 keysize: 3 valuesize: 30 sequence:
> -1 headerKeys: [] key: ... payloadSchemaId: 2 payload:
> .iRKoMVeoVVnTmQEuqwSTHZQ
> baseOffset: 52 lastOffset: 52 count: 1 baseSequence: -1 lastSequence: -1
> producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
> false isControl: false position: 3019 CreateTime: 1593570969001 size: 84
> magic: 2 compresscodec: NONE crc: 2188466405 isvalid: true
> ```
>
> -Mohan
>


Re: [ANNOUNCE] New committer: Boyang Chen

2020-06-23 Thread Adam Bellemare
Just adding my congratulations, Boyang! Thank you for all your
contributions and effort!

On Tue, Jun 23, 2020 at 9:14 PM Kowshik Prakasam 
wrote:

> Congrats, Boyang! :)
>
>
> Cheers,
> Kowshik
>
> On Tue, Jun 23, 2020 at 8:43 AM Aparnesh Gaurav 
> wrote:
>
> > Congrats Boyang.
> >
> > On Tue, 23 Jun, 2020, 9:07 PM Vahid Hashemian, <
> vahid.hashem...@gmail.com>
> > wrote:
> >
> > > Congrats Boyang!
> > >
> > > --Vahid
> > >
> > > On Tue, Jun 23, 2020 at 6:41 AM Wang (Leonard) Ge 
> > > wrote:
> > >
> > > > Congrats Boyang! This is a great achievement.
> > > >
> > > > On Tue, Jun 23, 2020 at 10:33 AM Mickael Maison <
> > > mickael.mai...@gmail.com>
> > > > wrote:
> > > >
> > > > > Congrats Boyang! Well deserved
> > > > >
> > > > > On Tue, Jun 23, 2020 at 8:20 AM Tom Bentley 
> > > wrote:
> > > > > >
> > > > > > Congratulations Boyang!
> > > > > >
> > > > > > On Tue, Jun 23, 2020 at 8:11 AM Bruno Cadonna <
> br...@confluent.io>
> > > > > wrote:
> > > > > >
> > > > > > > Congrats, Boyang!
> > > > > > >
> > > > > > > Best,
> > > > > > > Bruno
> > > > > > >
> > > > > > > On Tue, Jun 23, 2020 at 7:50 AM Konstantine Karantasis
> > > > > > >  wrote:
> > > > > > > >
> > > > > > > > Congrats, Boyang!
> > > > > > > >
> > > > > > > > -Konstantine
> > > > > > > >
> > > > > > > > On Mon, Jun 22, 2020 at 9:19 PM Navinder Brar
> > > > > > > >  wrote:
> > > > > > > >
> > > > > > > > > Many Congratulations Boyang. Very well deserved.
> > > > > > > > >
> > > > > > > > > Regards,Navinder
> > > > > > > > >
> > > > > > > > > On Tuesday, 23 June, 2020, 07:21:23 am IST, Matt Wang <
> > > > > > > wang...@163.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >  Congratulations, Boyang!
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Matt Wang
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On 06/23/2020 07:59,Boyang Chen >
> > > > wrote:
> > > > > > > > > Thanks a lot everyone, I really appreciate the recognition,
> > and
> > > > > hope to
> > > > > > > > > make more solid contributions to the community in the
> future!
> > > > > > > > >
> > > > > > > > > On Mon, Jun 22, 2020 at 4:50 PM Matthias J. Sax <
> > > > mj...@apache.org>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Congrats! Well deserved!
> > > > > > > > >
> > > > > > > > > -Matthias
> > > > > > > > >
> > > > > > > > > On 6/22/20 4:38 PM, Bill Bejeck wrote:
> > > > > > > > > Congratulations Boyang! Well deserved.
> > > > > > > > >
> > > > > > > > > -Bill
> > > > > > > > >
> > > > > > > > > On Mon, Jun 22, 2020 at 7:35 PM Colin McCabe <
> > > cmcc...@apache.org
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Congratulations, Boyang!
> > > > > > > > >
> > > > > > > > > cheers,
> > > > > > > > > Colin
> > > > > > > > >
> > > > > > > > > On Mon, Jun 22, 2020, at 16:26, Guozhang Wang wrote:
> > > > > > > > > The PMC for Apache Kafka has invited Boyang Chen as a
> > committer
> > > > > and we
> > > > > > > > > are
> > > > > > > > > pleased to announce that he has accepted!
> > > > > > > > >
> > > > > > > > > > Boyang became active in the Kafka community more than two years ago.
> > > > > > > > > Since then he has presented his experience operating with
> > Kafka
> > > > > Streams
> > > > > > > > > at
> > > > > > > > > Pinterest as well as several feature development including
> > > > > rebalance
> > > > > > > > > improvements (KIP-345) and exactly-once scalability
> > > improvements
> > > > > > > > > (KIP-447)
> > > > > > > > > in various Kafka Summit and Kafka Meetups. More recently
> he's
> > > > also
> > > > > been
> > > > > > > > > participating in Kafka broker development including
> > > > post-Zookeeper
> > > > > > > > > controller design (KIP-500). Besides all the code
> > > contributions,
> > > > > Boyang
> > > > > > > > > has
> > > > > > > > > also helped reviewing even more PRs and KIPs than his own.
> > > > > > > > >
> > > > > > > > > Thanks for all the contributions Boyang! And look forward
> to
> > > more
> > > > > > > > > collaborations with you on Apache Kafka.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > -- Guozhang, on behalf of the Apache Kafka PMC
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Leonard Ge
> > > > Software Engineer Intern - Confluent
> > > >
> > >
> > >
> > > --
> > >
> > > Thanks!
> > > --Vahid
> > >
> >
>


Re: [DISCUSS] KIP-317 - Add end-to-end data encryption functionality to Apache Kafka

2020-05-16 Thread Adam Bellemare
Hi Sönke

I've been giving it more thought over the last few days, and looking into
other systems as well, and I think that I've derailed your proposal a bit
with suggesting that at-rest encryption may be sufficient. I believe that
many of us are lacking the context of the sorts of discussions you have had
with stakeholders concerned about encryption. Anyway, a very brief
summary of my thoughts:

1) We should look to do encryption at-rest, but it should be outside the
scope of this KIP. (Is disk encryption as provided by the OS or cloud
provider sufficient?)

2) For end-to-end encryption, the part that concerns me is the various
roles that the broker may play in this plan. For instance, in Phase 2:

> This phase will concentrate on server-side configuration of encryption.
> Topic settings will be added that allow the specification of encryption
> settings that consumers and producers should use. Producers and Consumers
> will be enabled to fetch these settings and use them for encryption without
> the end-user having to configure anything in addition.
>
> Brokers will be extended with pluggable Key Managers that will allow for
> automatic key rotation later on. A basic, keystore based implementation
> will be created.

Again, I am not a security expert, but it seems to me that if we want
end-to-end encryption on par with the sort of encryption we see in our
RelationalDB cousins, it would require that the broker (which could be
hosted remotely, with a potentially malicious admin) have no knowledge of
any of the keys, nor be responsible for any sort of key rotation. I believe
that all of this would be required to be handled by the clients themselves
(though please correct me if I am misinterpreting this), and that to reduce
attack surface possibilities, we should handle the encryption + decryption
keys in a manner similar to how we handle TLS keys (client must supply
their own).

Ryanne does point out that automatic key-rotation of end-to-end encrypted
data would be an incredibly useful feature to have. However, I am not sure
how to square this against what is done with relational databases, as it
seems that they require that the client perform any updates or changes to
the encryption keys and data and wash their hands completely of that duty
(which makes sense - keep the database out of it, reduce the attack
surface). End-to-end, by definition, requires that the broker be unable to
decrypt any of the data, and having it responsible for rolling keys, while
seemingly useful, does deftly throw end-to-end out the window.
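
As a rough illustration of "the client must supply its own key", here is a
minimal sketch of client-side encryption using only the JDK. It assumes
AES-GCM with a random 12-byte IV prepended to each ciphertext; the class name
and the framing are hypothetical and are not what KIP-317 specifies. The
broker would only ever see the framed bytes.

```java
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical sketch of client-held keys; not an API proposed in KIP-317.
final class ClientSideCrypto {
    private static final int IV_BYTES = 12;   // common IV size for GCM
    private static final int TAG_BITS = 128;  // authentication tag length
    private static final SecureRandom RANDOM = new SecureRandom();

    // In practice the key would come from the client's own keystore or KMS.
    static SecretKey newKey() {
        try {
            KeyGenerator gen = KeyGenerator.getInstance("AES");
            gen.init(128);
            return gen.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Output layout: [12-byte IV][ciphertext + 16-byte tag].
    static byte[] encrypt(SecretKey key, byte[] plaintext) {
        try {
            byte[] iv = new byte[IV_BYTES];
            RANDOM.nextBytes(iv); // a fresh IV for every record
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(plaintext);
            return ByteBuffer.allocate(IV_BYTES + ciphertext.length)
                    .put(iv).put(ciphertext).array();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Fails with IllegalStateException if the key is wrong or the data was altered.
    static byte[] decrypt(SecretKey key, byte[] framed) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key,
                    new GCMParameterSpec(TAG_BITS, framed, 0, IV_BYTES));
            return cipher.doFinal(framed, IV_BYTES, framed.length - IV_BYTES);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In a real client this would most likely live inside a wrapping
Serializer/Deserializer pair, which is also what keeps the broker out of the
key path.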

Final Q:
Would it be reasonable to create a new optional service in the Kafka
project that is strictly responsible for these sorts of encryption matters?
Something like Confluent's schema registry, but as a mechanism for
coordinating key rotations with clients, encryption key registrations per
topic, etc.? KeyManager would plug into here, could use Kafka as the
storage layer for the keys (as we do with schemas, but encrypted themselves
of course) or use the whole thing as just a thin layer over a full blown
remote KeyManager that simply coordinates the producers, consumers, and
keys required for the data per topic. This independent service would give
organizations the ability to host it locally for security purposes, while
farming out the brokers to perhaps less trustworthy sources?
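
A very rough sketch of the bookkeeping such a service might do is below. It
is entirely hypothetical: the registry stores only opaque key references per
topic and version (for example, IDs that resolve in an external key manager),
never key material. Producers would tag each record with the version they
used, and consumers would resolve that version back to a reference.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.TreeMap;

// Hypothetical sketch of a per-topic key registry; no such service exists in Kafka.
final class TopicKeyRegistry {
    // topic -> (key version -> opaque key reference)
    private final Map<String, NavigableMap<Integer, String>> keys = new HashMap<>();

    // Registers a new key reference for the topic and returns its version.
    // Older versions stay resolvable so existing records can still be read.
    synchronized int rotate(String topic, String keyRef) {
        NavigableMap<Integer, String> versions =
                keys.computeIfAbsent(topic, t -> new TreeMap<>());
        int next = versions.isEmpty() ? 1 : versions.lastKey() + 1;
        versions.put(next, keyRef);
        return next;
    }

    // Version that producers should use for new records, if any key is registered.
    synchronized OptionalInt currentVersion(String topic) {
        NavigableMap<Integer, String> versions = keys.get(topic);
        return versions == null || versions.isEmpty()
                ? OptionalInt.empty()
                : OptionalInt.of(versions.lastKey());
    }

    // Reference that consumers need in order to decrypt a record of that version.
    synchronized Optional<String> lookup(String topic, int version) {
        NavigableMap<Integer, String> versions = keys.get(topic);
        return versions == null
                ? Optional.empty()
                : Optional.ofNullable(versions.get(version));
    }
}
```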

Adam






On Sun, May 10, 2020 at 7:52 PM Adam Bellemare 
wrote:

> @Ryanne
> > Seems that could still get us per-topic keys (vs encrypting the entire
> > volume), which would be my main requirement.
>
> Agreed, I think that per-topic separation of keys would be very valuable
> for multi-tenancy.
>
>
> My 2 cents is that if encryption at rest is sufficient to satisfy GDPR +
> other similar data protection measures, then we should aim to do that
> first. The demand is real and privacy laws won't likely be loosening any
> time soon. That being said, I am not sufficiently familiar with the myriad
> of data laws. I will look into it some more though, as I am now curious.
>
>
> On Sat, May 9, 2020 at 6:12 PM Maulin Vasavada 
> wrote:
>
>> Hi Sonke
>>
>> Thanks for bringing this for discussion. There are a lot of considerations
>> even if we assume we have end-to-end encryption done. For example, depending
>> upon company's setup there could be restrictions on how/which encryption
>> keys are shared. Environment could have multiple security and network
>> boundaries beyond which keys are not allowed to be shared. That will mean
>> that consumers may not be able to decrypt the messages at all if the data
>> is moved from one zone to another. If we have mirroring done, are
>> mirror-makers supposed to decrypt and encrypt again OR they would be
>> pretty
>> much bytes-in bytes-out paradigm that it is today? Also having a polyglot
>> Kafka client base will force you to support encryption/decr

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-05-12 Thread Adam Bellemare
Hi Guozhang

For clarity, the issues I was running into were not about the actual
*prefixSeek* function itself, but about exposing it to the same level of
access as the *range* function throughout Kafka Streams. It required a lot
of changes, and also required that most state stores stub it out since it
wasn't clear how they would implement it. It was basically an overreaching
API change that was easily solved (for the specific prefix-scan in FKJ) by
simply using *range*. So to be clear, the blockers were predominantly
around correctly handling the API changes, nothing to do with the
mechanisms of the RocksDB prefix scanning.
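
To show why *range* was enough, here is a minimal sketch of a prefix scan
expressed as a range query over a sorted store. It is not the FKJ code: it
uses a NavigableMap with String keys as a stand-in for a store of serialized
byte keys, and the class and method names are hypothetical.

```java
import java.util.NavigableMap;
import java.util.SortedMap;

// Hypothetical sketch; the real state stores operate on serialized byte keys.
final class PrefixViaRange {
    // Exclusive upper bound for all keys starting with the prefix, or null
    // when no finite bound exists (empty prefix or only max-valued chars).
    static String upperBound(String prefix) {
        int i = prefix.length() - 1;
        while (i >= 0 && prefix.charAt(i) == Character.MAX_VALUE) {
            i--; // a max-valued char cannot be incremented, so drop it
        }
        if (i < 0) {
            return null;
        }
        return prefix.substring(0, i) + (char) (prefix.charAt(i) + 1);
    }

    // A prefix scan is the range [prefix, upperBound(prefix)).
    static <V> SortedMap<String, V> prefixScan(NavigableMap<String, V> store, String prefix) {
        String to = upperBound(prefix);
        return to == null
                ? store.tailMap(prefix, true)
                : store.subMap(prefix, true, to, false);
    }
}
```

One caveat: with variable-length keys and no delimiter, a scan for the prefix
AAA also returns entries whose key is AAA0, which is the interleaving problem
raised under KAFKA-5285.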

As for KAFKA-5285 I'll look into it more to see if I can get a better
handle on the problem!

Hope this helps clear it up.

Adam


On Tue, May 12, 2020 at 7:16 PM Guozhang Wang  wrote:

> Hello Adam,
>
> I'm wondering if you can provide a bit more context on the blockers of
> using prefixSeek of RocksDB (I saw you have a RocksDBPrefixIterator class
> but not used anywhere yet)? I'm currently looking at ways to allow some
> secondary indices with rocksDB following some existing approaches
> from CockroachDB etc so I'm very curious to learn your experience.
>
> 1) Before considering any secondary indices, a quick thought is that for
> (key, timeFrom, timeTo) queries, we can easily replace the current
> `range()` impl with a `prefixRange()` impl via a prefix iterator; though
> for (keyFrom, keyTo, timeFrom, timeTo) it is much more complicated indeed
> and hence existing `range()` impl may still be used.
>
> 2) Another related issue I've been pondering for a while is
> around KAFKA-5285: with the default lexicographic byte comparator, since the
> key length varies, the combo (key, window) would have interleaving byte
> layouts like:
>
> AAA0001   (key AAA, timestamp 0001)
> AAA00011  (key AAA0, timestamp 0011)
> AAA0002   (key AAA, timestamp 0002)
>
> which is challenging for prefix seeks to work efficiently. Although we can
> overwrite the byte-comparator in JNI it is very expensive and the cost of
> JNI overwhelms its benefits. If you've got some ideas around it please lmk
> as well.
>
> Guozhang
>
>
>
>
> On Tue, May 12, 2020 at 6:26 AM Adam Bellemare 
> wrote:
>
> > Hi Sagar
> >
> > I implemented a very similar interface for KIP-213, the foreign-key
> joiner.
> > We pulled it out of the final implementation and used RocksDB range
> > instead. You can see the particular code where we use RocksDB.range(...)
> to
> > get the same iterator result.
> >
> >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L95
> >
> > We pulled it out because there were numerous awkward acrobatics to
> > integrate *prefixSeek()* function into the Kafka Streams code.
> Basically, I
> > wanted to be able to access *prefixSeek()* the same way I can access
> > *range()* for any state store, and in particular use it for storing data
> > with a particular foreign key (as per the previous URL). However, I found
> > out that it required way too many changes to expose the *prefixSeek()*
> > functionality while still being able to leverage all the nice Kafka
> Streams
> > state management + supplier functionality, so we made a decision just to
> > stick with *range()* and pull everything else out.
> >
> > I guess my question here is, how do you anticipate using *prefixSeek()*
> > within the framework of Kafka Streams, or the Processor API?
> >
> > Adam
> >
> >
> >
> > On Tue, May 12, 2020 at 2:52 AM Sagar  wrote:
> >
> > > Hi All,
> > >
> > > I would like to start a discussion on the KIP that I created below to
> add
> > > prefix scan support in State Stores:
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> > >
> > > Thanks!
> > > Sagar.
> > >
> >
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-05-12 Thread Adam Bellemare
Hi Sagar

I implemented a very similar interface for KIP-213, the foreign-key joiner.
We pulled it out of the final implementation and used RocksDB range
instead. You can see the particular code where we use RocksDB.range(...) to
get the same iterator result.

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L95

We pulled it out because there were numerous awkward acrobatics to
integrate the *prefixSeek()* function into the Kafka Streams code. Basically, I
wanted to be able to access *prefixSeek()* the same way I can access
*range()* for any state store, and in particular use it for storing data
with a particular foreign key (as per the previous URL). However, I found
out that it required way too many changes to expose the *prefixSeek()*
functionality while still being able to leverage all the nice Kafka Streams
state management + supplier functionality, so we made a decision just to
stick with *range()* and pull everything else out.

I guess my question here is, how do you anticipate using *prefixSeek()*
within the framework of Kafka Streams, or the Processor API?

Adam



On Tue, May 12, 2020 at 2:52 AM Sagar  wrote:

> Hi All,
>
> I would like to start a discussion on the KIP that I created below to add
> prefix scan support in State Stores:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
>
> Thanks!
> Sagar.
>


Re: [DISCUSS] KIP-317 - Add end-to-end data encryption functionality to Apache Kafka

2020-05-10 Thread Adam Bellemare
@Ryanne
> Seems that could still get us per-topic keys (vs encrypting the entire
> volume), which would be my main requirement.

Agreed, I think that per-topic separation of keys would be very valuable
for multi-tenancy.


My 2 cents is that if encryption at rest is sufficient to satisfy GDPR +
other similar data protection measures, then we should aim to do that
first. The demand is real and privacy laws won't likely be loosening any
time soon. That being said, I am not sufficiently familiar with the myriad
of data laws. I will look into it some more though, as I am now curious.


On Sat, May 9, 2020 at 6:12 PM Maulin Vasavada 
wrote:

> Hi Sonke
>
> Thanks for bringing this for discussion. There are a lot of considerations
> even if we assume we have end-to-end encryption done. For example, depending
> upon company's setup there could be restrictions on how/which encryption
> keys are shared. Environment could have multiple security and network
> boundaries beyond which keys are not allowed to be shared. That will mean
> that consumers may not be able to decrypt the messages at all if the data
> is moved from one zone to another. If we have mirroring done, are
> mirror-makers supposed to decrypt and encrypt again OR they would be pretty
> much bytes-in bytes-out paradigm that it is today? Also having a polyglot
> Kafka client base will force you to support encryption/decryption libraries
> that work for all the languages and that may not work depending upon the
> scope of the team owning Kafka Infrastructure.
>
> Combining disk encryption with TLS+ACLs could be enough instead of having
> end-to-end message level encryption. What is your opinion on that?
>
> We have experimented with end-to-end encryption with custom
> serializers/deserializers and I felt that was good enough because
> other challenges I mentioned before may not be easy to address with a
> generic solution.
>
> Thanks
> Maulin
>
>
>
>
>
> On Sat, May 9, 2020 at 2:05 PM Ryanne Dolan  wrote:
>
> > Adam, I agree, seems reasonable to limit the broker's responsibility to
> > encrypting only data at rest. I guess whole segment files could be
> > encrypted with the same key, and rotating keys would just involve
> > re-encrypting entire segments. Maybe a key rotation would involve closing
> > all affected segments and kicking off a background task to re-encrypt
> them.
> > Certainly that would not impede ingestion of new records, and seems
> > consumers could use the old segments until they are replaced with the
> newly
> > encrypted ones.
> >
> > Seems that could still get us per-topic keys (vs encrypting the entire
> > volume), which would be my main requirement.
> >
> > Not really "end-to-end", but combined with TLS or something, seems
> > reasonable.
> >
> > Ryanne
> >
> > On Sat, May 9, 2020, 11:00 AM Adam Bellemare 
> > wrote:
> >
> > > Hi All
> > >
> > > I typed up a number of replies which I have below, but I have one major
> > > overriding question: Is there a reason we aren't implementing
> > > encryption-at-rest almost exactly the same way that most relational
> > > databases do? ie:
> > > https://wiki.postgresql.org/wiki/Transparent_Data_Encryption
> > >
> > > I ask this because it seems like we're going to end up with something
> > > similar to what they did in terms of requirements, plus...
> > >
> > > "For the *past 16 months*, there has been discussion about whether and
> > how
> > > to implement Transparent Data Encryption (tde) in Postgres. Many other
> > > relational databases support tde, and *some security standards require*
> > it.
> > > However, it is also debatable how much security value tde provides.
> > > The tde *400-email
> > > thread* became difficult for people to follow..."
> > > What still isn't clear to me is the scope that we're trying to cover
> > here.
> > > Encryption at rest suggests that we need to have the data encrypted on
> > the
> > > brokers, and *only* on the brokers, since they're the durable units of
> > > storage. Any encryption over the wire should be covered by TLS.  I
> think
> > > that our goals for this should be (from
> > >
> >
> https://wiki.postgresql.org/wiki/Transparent_Data_Encryption#Threat_models
> > > )
> > >
> > > > TDE protects data from theft when file system access controls are
> > > > compromised:
> > > >
> > > >- Malicious user steals storage devices and reads database files
> > > >dire

Re: [DISCUSS] KIP-317 - Add end-to-end data encryption functionality to Apache Kafka

2020-05-09 Thread Adam Bellemare
Hi All

I typed up a number of replies which I have below, but I have one major
overriding question: Is there a reason we aren't implementing
encryption-at-rest almost exactly the same way that most relational
databases do? ie:
https://wiki.postgresql.org/wiki/Transparent_Data_Encryption

I ask this because it seems like we're going to end up with something
similar to what they did in terms of requirements, plus...

"For the *past 16 months*, there has been discussion about whether and how
to implement Transparent Data Encryption (tde) in Postgres. Many other
relational databases support tde, and *some security standards require* it.
However, it is also debatable how much security value tde provides.
The tde *400-email thread* became difficult for people to follow..."

What still isn't clear to me is the scope that we're trying to cover here.
Encryption at rest suggests that we need to have the data encrypted on the
brokers, and *only* on the brokers, since they're the durable units of
storage. Any encryption over the wire should be covered by TLS.  I think
that our goals for this should be (from
https://wiki.postgresql.org/wiki/Transparent_Data_Encryption#Threat_models)

> TDE protects data from theft when file system access controls are
> compromised:
>
>- Malicious user steals storage devices and reads database files
>directly.
>- Malicious backup operator takes backup.
>- Protecting data at rest (persistent data)
>
> This does not protect from users who can read system memory, e.g., shared
> buffers, which root users can do.
>

I am not a security expert nor am I an expert on relational databases.
However, I can't identify any reason why the approach outlined by
PostgresDB, which is very similar to MySQL/InnoDB and IBM (from my
understanding) wouldn't work for data-at-rest encryption. In addition, we'd
get the added benefit of being consistent with other solutions, which is an
easier sell when discussing security with management (Kafka? Oh yeah, their
encryption solution is just like the one we already have in place for our
Postgres solutions), and may let us avoid reinventing a good part of the
wheel.


--

@Ryanne
One more complicating factor, regarding joins - the foreign key joiner
requires access to the value to extract the foreign key - if it's
encrypted, the FKJ would need to decrypt it to apply the value extractor.

@Sönke re (1)
> When people hear that this is not part of Apache Kafka itself, but that they
> would need to develop something themselves, that more often than not is the
> end of that discussion. Using something that is not "stock" is quite often
> simply not an option.

> I strongly feel that this is a needed feature in Kafka and that there is a
> large number of people out there that would want to use it - but I may very
> well be mistaken, responses to this thread have not exactly been plentiful
> this last year and a half..

I agree with you on the default vs. non-default points made. We must all
note that this mailing list is *not* representative of the typical users of
Kafka, and that many organizations are predominantly looking to use
out-of-the-box solutions. This will only become more common as hosted Kafka
solutions (think AWS hosted Kafka) gain more traction. I think the goal of
this KIP to provide that out-of-the-box experience is extremely important,
especially for all the reasons noted so far (GDPR, privacy, financials,
interest by many parties but no default solution).

re: (4)
>> Regarding plaintext data in RocksDB instances, I am a bit torn to be
>> honest. On the one hand, I feel like this scenario is not something that we
>> can fully control.

I agree with this in principle. I think that our responsibility to encrypt
data at rest ends the moment that data leaves the broker. That being said,
it isn't unreasonable. I am going to think more about this and see if I can
come up with something.





On Fri, May 8, 2020 at 5:05 AM Sönke Liebau
 wrote:

> Hey everybody,
>
> thanks a lot for reading and giving feedback!! I'll try and answer all
> points that I found going through the thread in this mail, but if I miss
> something please feel free to let me know! I've added a running number to
> the discussed topics for ease of reference down the road.
>
> I'll go through the KIP and update it with everything that I have written
> below after sending this mail.
>
> @Tom:
> (1) If I understand your concerns correctly you feel that this
> functionality would have a hard time getting approved into Apache Kafka
> because it can be achieved with custom Serializers in the same way and that
> we should maybe develop this outside of Apache Kafka at first.
> I feel like it is precisely the fact that this is not part of core Apache
> Kafka that makes people think twice about doing end-to-end encryption. I
> may be working in a market (Germany) that is a bit special when compared to
> the rest of the world where encryption and things like that are 

[jira] [Resolved] (KAFKA-9732) Kafka Foreign-Key Joiner has unexpected default value used when a table is created via a stream+groupByKey+reduce

2020-03-23 Thread Adam Bellemare (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Bellemare resolved KAFKA-9732.
---
Resolution: Not A Problem

Issue was with reporter's usage of the API.

> Kafka Foreign-Key Joiner has unexpected default value used when a table is 
> created via a stream+groupByKey+reduce
> -
>
> Key: KAFKA-9732
> URL: https://issues.apache.org/jira/browse/KAFKA-9732
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.1
> Reporter: Adam Bellemare
> Priority: Major
>
> I'm upgrading some internal business code that used to use a prototype 
> version of the FKJoiner, migrating to the 2.4.1 Kafka release. I am running 
> into an issue where the joiner is using the default Serde, despite me clearly 
> specifying NOT to use the default serde (unless I am missing something!). 
> Currently, this is how I generate the left KTable, used in the 
> _*leftTable.join(rightTable, ...)*_ FKJoin.
> Let's call this process 1:
> {code:scala}
> val externalMyKeySerde = ... //Confluent Kafka S.R. serde.
> val externalMyValueSerde = ... //Confluent Kafka S.R. value serde
> val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)
> //For wrapping nulls in mapValues below
> case class OptionalDeletable[T](elem: Option[T])
> //Internal Serdes that do NOT use the SR
> //Same serde logic as externalMyKeySerde, but doesn't register schemas to
> //schema registry.
> val internalMyKeySerde = ...
> //Same serde logic as externalMyValueSerde, but doesn't register schemas to
> //schema registry.
> val internalOptionalDeletableMyValueSerde: Serde[OptionalDeletable[MyValue]] =
>   ...
> val myLeftTable: KTable[MyKey, MyValue] =
>   streamBuilder.stream[MyKey, MyValue]("inputTopic")(myConsumer)
>     .mapValues(
>       v => {
>         //We need the nulls to propagate deletes.
>         //Wrap this in a simple case-class because we can't
>         //groupByKey+reduce null values as they otherwise get filtered out.
>         OptionalDeletable(Some(v))
>       }
>     )
>     .groupByKey(Grouped.`with`(internalMyKeySerde,
>       internalOptionalDeletableMyValueSerde))
>     .reduce((_, x) => x)(
>       Materialized.as("myLeftTable")(internalMyKeySerde,
>         internalOptionalDeletableMyValueSerde))
>     .mapValues(v => v.elem.get) //Unwrap the element
> {code}
> Next, we create the right table and specify the FKjoining logic
> {code:scala}
> //This is created in an identical way to Process 1... I won't show it here for
> //brevity.
> val rightTable: KTable[RightTableKey, RightTableValue] =
>   streamBuilder.table(...)
> //Not showing previous definitions because I don't think they're relevant to
> //this issue...
> val itemMaterialized =
>   Materialized.as[MyKey, JoinedOutput, KeyValueStore[Bytes, Array[Byte]]](
>     "materializedOutputTable")(internalMyKeySerde, internalJoinedOutputSerde)
> val joinedTable = myLeftTable.join[JoinedOutput, RightTableKey, RightTableValue](
>   rightTable, foreignKeyExtractor, joinerFunction, materializedOutputTable)
> //Force evaluation to output some data
> joinedTable.toStream.to("outputStream")
> {code}
> When I execute this with leftTable generated via process 1, I end up somehow 
> losing the leftTable serde along the way and end up falling back onto the 
> default serde. This results in a runtime exception as follows:
> {code:java}
> 
> Caused by: java.lang.ClassCastException: com.bellemare.sample.MyValue cannot be cast to [B
>   at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
>   at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:94)
>   at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>   ... 30 more
> {code}
> Now, if I change process 1 to the following:
> Process 2:
> {code:scala}
> val externalMyKeySerde = ... //Confluent Kafka S.R. serde.
> val externalMyValueSerde = ... //Confluent Kafka S.R. value serde
> val myCons

[jira] [Created] (KAFKA-9732) Kafka Foreign-Key Joiner has unexpected default value used when a table is created via a stream+groupByKey+reduce

2020-03-18 Thread Adam Bellemare (Jira)
Adam Bellemare created KAFKA-9732:
-

 Summary: Kafka Foreign-Key Joiner has unexpected default value 
used when a table is created via a stream+groupByKey+reduce
 Key: KAFKA-9732
 URL: https://issues.apache.org/jira/browse/KAFKA-9732
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.1, 2.4.0
Reporter: Adam Bellemare


I'm upgrading some internal business code that used to use a prototype version 
of the FKJoiner, migrating to the 2.4.1 Kafka release. I am running into an 
issue where the joiner is using the default Serde, despite me clearly 
specifying NOT to use the default serde (unless I am missing something!). 
Currently, this is how I generate the left KTable, used in the 
_*leftTable.join(rightTable, ...)*_ FKJoin.

Let's call this process 1:
{code:scala}
val externalMyKeySerde = ... //Confluent Kafka S.R. serde.
val externalMyValueSerde = ... //Confluent Kafka S.R. value serde
val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)

//For wrapping nulls in mapValues below
case class OptionalDeletable[T](elem: Option[T])

//Internal Serdes that do NOT use the SR
//Same serde logic as externalMyKeySerde, but doesn't register schemas to
//schema registry.
val internalMyKeySerde = ...
//Same serde logic as externalMyValueSerde, but doesn't register schemas to
//schema registry.
val internalOptionalDeletableMyValueSerde: Serde[OptionalDeletable[MyValue]] =
  ...

val myLeftTable: KTable[MyKey, MyValue] =
  streamBuilder.stream[MyKey, MyValue]("inputTopic")(myConsumer)
    .mapValues(
      v => {
        //We need the nulls to propagate deletes.
        //Wrap this in a simple case-class because we can't
        //groupByKey+reduce null values as they otherwise get filtered out.
        OptionalDeletable(Some(v))
      }
    )
    .groupByKey(Grouped.`with`(internalMyKeySerde,
      internalOptionalDeletableMyValueSerde))
    .reduce((_, x) => x)(
      Materialized.as("myLeftTable")(internalMyKeySerde,
        internalOptionalDeletableMyValueSerde))
    .mapValues(v => v.elem.get) //Unwrap the element
{code}

Next, we create the right table and specify the FKjoining logic
{code:scala}
//This is created in an identical way to Process 1... I won't show it here for
//brevity.
val rightTable: KTable[RightTableKey, RightTableValue] =
  streamBuilder.table(...)

//Not showing previous definitions because I don't think they're relevant to
//this issue...
val itemMaterialized =
  Materialized.as[MyKey, JoinedOutput, KeyValueStore[Bytes, Array[Byte]]](
    "materializedOutputTable")(internalMyKeySerde, internalJoinedOutputSerde)

val joinedTable = myLeftTable.join[JoinedOutput, RightTableKey, RightTableValue](
  rightTable, foreignKeyExtractor, joinerFunction, materializedOutputTable)

//Force evaluation to output some data
joinedTable.toStream.to("outputStream")
{code}

When I execute this with leftTable generated via process 1, I end up somehow 
losing the leftTable serde along the way and end up falling back onto the 
default serde. This results in a runtime exception as follows:
{code:java}

Caused by: java.lang.ClassCastException: com.bellemare.sample.MyValue cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:94)
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
    ... 30 more
{code}
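
As a rough illustration of the failure mode in the trace above (a sketch only, not the Kafka Streams code path): when a value is handed to a byte-array serializer through an untyped reference, Java's erased generics let the call compile, and the cast to byte[] ("[B") only fails at runtime. The Serializer, ByteArraySerializer and MyValue types below are hypothetical stand-ins written for this example; they are not the Kafka classes of the same name.

```java
// Sketch only: every type here is a hypothetical stand-in, not a Kafka class.
final class SerdeFallbackDemo {
    /** Minimal stand-in for a serializer interface. */
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    /** Stand-in for a default byte-array serializer: passes the bytes through. */
    static final class ByteArraySerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(byte[] data) {
            return data;
        }
    }

    /** Stand-in for the application's value type. */
    static final class MyValue {
        final String name;

        MyValue(String name) {
            this.name = name;
        }
    }

    /**
     * Mimics a caller that only holds a raw serializer reference, so the
     * compiler cannot check that the value type matches the serializer.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static byte[] send(Serializer serializer, Object value) {
        return serializer.serialize(value);
    }

    public static void main(String[] args) {
        try {
            send(new ByteArraySerializer(), new MyValue("example"));
        } catch (ClassCastException e) {
            // The implicit cast to byte[] fails inside the serializer call.
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

The same call with an actual byte[] value goes through untouched, which is why the problem only shows up once a typed value reaches the fallback serializer.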
Now, if I change process 1 to the following:

Process 2:
{code:scala}
val externalMyKeySerde = ... //Confluent Kafka S.R. serde.
val externalMyValueSerde = ... //Confluent Kafka S.R. value serde
val myConsumer = Consumed.`with`(externalMyKeySerde, externalMyValueSerde)

val myLeftTable: KTable[MyKey, MyValue] =
  streamBuilder.table[MyKey, MyValue]("inputTopic")(myConsumer)
//The downside of this approach is that we end up registering a bunch of
//internal topics to the schema registry (S.R.), significantly increasing the
//clutter in our lookup UI.
{code}
Everything works as expected, and the expected `_*externalMyValueSerde*_` is 
used to serialize the events (though I don't want this, as it registers to the 
SR and clutters it up).

I don't think I'm missing any Serdes inputs anywhere in the DSL, but I'm having 
a hard time figuring out *if this is normal existing behaviour for how a KTable 
is created via* *Process 1* or if I'm stumbling upon a bug somewhere. When I 
tr

Re: Can I ask protocol questions here?

2020-03-09 Thread Adam Bellemare
Hi Chris 

I think it’s fine to ask it here. I’m not aware of any rules against it.

Adam 

> On Mar 9, 2020, at 10:43 AM, Chris Done  wrote:
> 
> Hi all,
> 
> I'm writing a Kafka client at the protocol level and was wondering whether 
> here, or the users@ mailing list was more appropriate for questions of that 
> nature?
> 
> I looked on the web site, but didn't see clarification on this point.
> 
> I'll start a fresh thread if here is indeed the correct place. I have a 
> question to ask about fetch requests not working in specific conditions.
> 
> Thanks,
> 
> Chris


Re: Mistake in official documentation ?

2020-02-06 Thread Adam Bellemare
Screenshot didn't arrive for me - you may need to host it on an image site
(e.g. imgur), as I'm not sure the mailing list allows attachments.

It could very well be a mistake. Highlight it and send it to
dev@kafka.apache.org and if it is, we'll make a ticket and address it.

On Thu, Feb 6, 2020 at 10:53 AM Fares Oueslati 
wrote:

> Hello,
>
> While going through the official docs
> https://kafka.apache.org/documentation/#messageformat
>
> If I'm not wrong, I believe there is a mismatch between description of a
> segment and the diagram illustrating the concept.
>
> I pointed out the issue in the attached screenshot.
>
> Didn't really know where to ask about this, so sorry if it's not the
> appropriate place.
>
> Regards,
> Fares
>


Re: Is there a way to auto scale topic partitions in kafka?

2020-01-21 Thread Adam Bellemare
There is no way to automatically scale it, but you could write a script to
increase the partition count using the command line tools, and trigger it
on certain metrics.

One thing to consider is that any *keyed* events would need to be rewritten
to topics that have their partition count increased. This is to ensure that
keyed data locality is preserved within each partition, such that all
events of a single key stay in a single partition. If you don't care about
data-locality, then you can increase the partition count without concern.
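
To make the keyed-data point concrete, here is a minimal sketch, assuming a partitioner that picks the partition as hash(key) modulo the partition count. Kafka's default partitioner hashes the serialized key bytes with murmur2; String.hashCode() is used below purely as a stand-in, and PartitionCountDemo and its methods are made-up names for this example.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionCountDemo {
    /** Stand-in partitioner (assumption): hash of the key modulo the partition count. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Returns the keys whose partition changes when the count goes from oldCount to newCount. */
    static List<String> movedKeys(List<String> keys, int oldCount, int newCount) {
        List<String> moved = new ArrayList<>();
        for (String key : keys) {
            if (partitionFor(key, oldCount) != partitionFor(key, newCount)) {
                moved.add(key);
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            keys.add("key-" + i);
        }
        // Growing the topic from 4 to 6 partitions sends some keys elsewhere.
        List<String> moved = movedKeys(keys, 4, 6);
        System.out.println(moved.size() + " of " + keys.size()
            + " keys map to a different partition after the increase");
    }
}
```

Any key in the moved list would have its older events in one partition and its newer events in another, which is why the existing keyed data needs to be rewritten if per-key locality matters.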





On Tue, Jan 21, 2020 at 11:35 AM Pushkar Deole  wrote:

> Hello Dev community,
>
> Got no response from user community on below query. Can you respond back on
> this please?
>
> -- Forwarded message -
> From: Pushkar Deole 
> Date: Fri, Jan 17, 2020 at 1:46 PM
> Subject: Is there a way to auto scale topic partitions in kafka?
> To: 
>
>
> Hello,
>
> I am working on developing a microservice based system which uses kafka as
> a messaging infrastructure. The microservices application are mainly kafka
> consumers and kafka streams applications and are deployed as docker
> containers on kubernetes.
>
> The system should be designed to be auto scalable for which we are using
> Horizontal Pod Autoscaler feature of kubernetes which allows to instantiate
> more number of pods if a certain metric (e.g. cpu utilization) touches the
> threshold or reduce the pods in case the metric is way below the threshold.
> However, the problem is number of partitions in kafka are fixed so even if
> load on the system increases and the number of consumer pods are
> autoscaled, it could not be scaled beyond the number of partitions.
> So, after a point where number of pods is equal to number of partitions,
> the system can't be scaled beyond that.
> Is there a way to autoscale number of partitions also in kafka so the
> system can be auto scaled in cloud?
>


Re: [RESULTS] [VOTE] Release Kafka version 2.4.0

2019-12-14 Thread Adam Bellemare
Yes, thank you very much Manikumar!

> On Dec 14, 2019, at 1:02 AM, Ismael Juma  wrote:
> 
> Thanks for driving the release Manikumar!
> 
> Ismael
> 
>> On Fri, Dec 13, 2019, 5:42 PM Manikumar  wrote:
>> 
>> This vote passes with 6 +1 votes (3 bindings) and no 0 or -1 votes.
>> 
>> +1 votes
>> PMC Members:
>> * Gwen Shapira
>> * Jun Rao
>> * Guozhang Wang
>> 
>> Committers:
>> * Mickael Maison
>> 
>> Community:
>> * Adam Bellemare
>> * Israel Ekpo
>> 
>> 0 votes
>> * No votes
>> 
>> -1 votes
>> * No votes
>> 
>> Vote thread:
>> https://markmail.org/message/qlira627sqbmmzz4
>> 
>> I'll continue with the release process and the release announcement will
>> follow in the next few days.
>> 
>> Manikumar
>> 


Re: [VOTE] 2.4.0 RC4

2019-12-10 Thread Adam Bellemare
- All PGP signatures are good
- All md5, sha1sums and sha512sums pass

Initial test results:
1310 tests completed, 2 failed, 17 skipped

> Task :core:integrationTest FAILED

The failed tests:
SaslSslAdminClientIntegrationTest.testElectPreferredLeaders
SslAdminClientIntegrationTest.testSynchronousAuthorizerAclUpdatesBlockRequestThreads

Both failed due to timeout:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.

Reran the tests and both passed.

+1 from me.





On Mon, Dec 9, 2019 at 12:32 PM Manikumar  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the fifth candidate for release of Apache Kafka 2.4.0.
>
> This release includes many new features, including:
> - Allow consumers to fetch from closest replica
> - Support for incremental cooperative rebalancing to the consumer rebalance
> protocol
> - MirrorMaker 2.0 (MM2), a new multi-cluster, cross-datacenter replication
> engine
> - New Java authorizer Interface
> - Support for non-key joining in KTable
> - Administrative API for replica reassignment
> - Sticky partitioner
> - Return topic metadata and configs in CreateTopics response
> - Securing Internal connect REST endpoints
> - API to delete consumer offsets and expose it via the AdminClient.
>
> Release notes for the 2.4.0 release:
> https://home.apache.org/~manikumar/kafka-2.4.0-rc4/RELEASE_NOTES.html
>
> *** Please download, test and vote by Thursday, December 12, 9am PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~manikumar/kafka-2.4.0-rc4/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~manikumar/kafka-2.4.0-rc4/javadoc/
>
> * Tag to be voted upon (off 2.4 branch) is the 2.4.0 tag:
> https://github.com/apache/kafka/releases/tag/2.4.0-rc4
>
> * Documentation:
> https://kafka.apache.org/24/documentation.html
>
> * Protocol:
> https://kafka.apache.org/24/protocol.html
>
> Thanks,
> Manikumar
>


Re: [VOTE] 2.4.0 RC1

2019-11-28 Thread Adam Bellemare
mjsax found an important issue for the foreign-key joiner, which I think
should be a blocker (if it isn't already) since it is functionally
incorrect without the fix:

https://github.com/apache/kafka/pull/7758



On Tue, Nov 26, 2019 at 6:26 PM Sean Glover 
wrote:

> Hi,
>
> I also used Eric's test script.  I had a few issues running it that I
> address below[0][1], otherwise looks good.
>
> - Signing keys all good
> - All md5, sha1sums and sha512sums are good
> - A couple transient test failures that passed on a second run
> (ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker,
> SaslScramSslEndToEndAuthorizationTest.
> testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl)
> - Passes our own test suite for Alpakka Kafka (
> https://travis-ci.org/akka/alpakka-kafka/builds/616861540,
> https://github.com/akka/alpakka-kafka/pull/971)
>
> +1 (non-binding)
>
> ..
>
> Issues while running test script:
>
> [0] Error with Eric test script.  I had an issue running the script with my
> version of bash (TMPDIR was unassigned), which I provided a PR for (
> https://github.com/elalonde/kafka/pull/1)
> [1] Gradle incompatibility. I ran into difficulty running the gradle build
> with the latest version of gradle (6.0.1).  I had to revert to the last
> patch of version 5 (5.6.4):
>
>  ✘ seglo@slice  /tmp/verify-kafka-SP06GE1GpP/10169.out/kafka-2.4.0-src 
> gradle wrapper --warning-mode all
>
> > Configure project :
> The maven plugin has been deprecated. This is scheduled to be removed in
> Gradle 7.0. Please use the maven-publish plugin instead.
>         at build_c0129pbfzzxjolwxmds3lsevz$_run_closure5.doCall(/tmp/verify-kafka-SP06GE1GpP/10169.out/kafka-2.4.0-src/build.gradle:160)
>         (Run with --stacktrace to get the full stack trace of this deprecation warning.)
>
> FAILURE: Build failed with an exception.
>
> * Where:
> Build file '/tmp/verify-kafka-SP06GE1GpP/10169.out/kafka-2.4.0-src/build.gradle' line: 472
>
> * What went wrong:
> A problem occurred evaluating root project 'kafka-2.4.0-src'.
> > Could not create task ':clients:spotbugsMain'.
>    > Could not create task of type 'SpotBugsTask'.
>       > Could not create an instance of type com.github.spotbugs.internal.SpotBugsReportsImpl.
>          > org.gradle.api.reporting.internal.TaskReportContainer.(Ljava/lang/Class;Lorg/gradle/api/Task;)V
>
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or
> --debug option to get more log output. Run with --scan to get full
> insights.
>
> * Get more help at https://help.gradle.org
>
> BUILD FAILED in 699ms
>
> On Tue, Nov 26, 2019 at 1:31 PM Manikumar 
> wrote:
>
> > Hi All,
> >
> > Please download, test and vote the RC1 in order to provide quality
> > assurance for the forthcoming 2.4 release.
> >
> > Thanks.
> >
> > On Tue, Nov 26, 2019 at 8:11 PM Adam Bellemare  >
> > wrote:
> >
> > > Hello,
> > >
> > > Ran Eric's test script:
> > > $ git clone https://github.com/elalonde/kafka
> > > $ ./kafka/bin/verify-kafka-rc.sh 2.4.0 https://home.apache.org/~manikumar/kafka-2.4.0-rc1
> > >
> > > - All PGP signatures are good
> > > - All md5, sha1sums and sha512sums pass
> > > - Had a few intermittent failures in tests that passed upon rerunning.
> > >
> > > +1 (non-binding) from me.
> > >
> > > Adam
> > >
> > > On Wed, Nov 20, 2019 at 10:37 AM Manikumar 
> > > wrote:
> > >
> > > > Hello Kafka users, developers and client-developers,
> > > >
> > > > This is the second candidate for release of Apache Kafka 2.4.0.
> > > >
> > > > This release includes many new features, including:
> > > > - Allow consumers to fetch from closest replica
> > > > - Support for incremental cooperative rebalancing to the consumer
> > > rebalance
> > > > protocol
> > > > - MirrorMaker 2.0 (MM2), a new multi-cluster, cross-datacenter
> > > replication
> > > > engine
> > > > - New Java authorizer Interface
> > > > - Support for  non-key joining in KTable
> > > > - Administrative API for replica reassignment
> > > > - Sticky partitioner
> > > > - Return topic metadata and configs in CreateTopics response
> > > > - Securing Internal connect REST endpoints
> > > > - API to delete consumer offsets and expose it via the AdminClient.
> > >

Re: [VOTE] 2.4.0 RC1

2019-11-26 Thread Adam Bellemare
Hello,

Ran Eric's test script:
$ git clone https://github.com/elalonde/kafka
$ ./kafka/bin/verify-kafka-rc.sh 2.4.0 https://home.apache.org/~manikumar/kafka-2.4.0-rc1


- All PGP signatures are good
- All md5, sha1sums and sha512sums pass
- Had a few intermittent failures in tests that passed upon rerunning.

+1 (non-binding) from me.

Adam

On Wed, Nov 20, 2019 at 10:37 AM Manikumar 
wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the second candidate for release of Apache Kafka 2.4.0.
>
> This release includes many new features, including:
> - Allow consumers to fetch from closest replica
> - Support for incremental cooperative rebalancing to the consumer rebalance
> protocol
> - MirrorMaker 2.0 (MM2), a new multi-cluster, cross-datacenter replication
> engine
> - New Java authorizer Interface
> - Support for  non-key joining in KTable
> - Administrative API for replica reassignment
> - Sticky partitioner
> - Return topic metadata and configs in CreateTopics response
> - Securing Internal connect REST endpoints
> - API to delete consumer offsets and expose it via the AdminClient.
>
> Release notes for the 2.4.0 release:
> https://home.apache.org/~manikumar/kafka-2.4.0-rc1/RELEASE_NOTES.html
>
> ** Please download, test and vote by Tuesday, November 26, 9am PT **
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~manikumar/kafka-2.4.0-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~manikumar/kafka-2.4.0-rc1/javadoc/
>
> * Tag to be voted upon (off 2.4 branch) is the 2.4.0 tag:
> https://github.com/apache/kafka/releases/tag/2.4.0-rc1
>
> * Documentation:
> https://kafka.apache.org/24/documentation.html
>
> * Protocol:
> https://kafka.apache.org/24/protocol.html
>
> Thanks,
> Manikumar
>


Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Adam Bellemare
Congratulations John, and thanks for all your help on KIP-213!

> On Nov 12, 2019, at 6:24 PM, Bill Bejeck  wrote:
> 
> Congratulations John!
> 
> On Tue, Nov 12, 2019 at 6:20 PM Matthias J. Sax 
> wrote:
> 
>> Congrats John!
>> 
>> 
>>> On 11/12/19 2:52 PM, Boyang Chen wrote:
>>> Great work John! Well deserved
>>> 
>>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang wrote:
>>>
>>>> Hi Everyone,
>>>>
>>>> The PMC of Apache Kafka is pleased to announce a new Kafka committer, John
>>>> Roesler.
>>>>
>>>> John has been contributing to Apache Kafka since early 2018. His main
>>>> contributions are primarily around Kafka Streams, but have also included
>>>> improving our test coverage beyond Streams as well. Besides his own code
>>>> contributions, John has also actively participated on community discussions
>>>> and reviews including several other contributors' big proposals like
>>>> foreign-key join in Streams (KIP-213). He has also been writing, presenting
>>>> and evangelizing Apache Kafka in many venues.
>>>>
>>>> Congratulations, John! And look forward to more collaborations with you on
>>>> Apache Kafka.
>>>>
>>>>
>>>> Guozhang, on behalf of the Apache Kafka PMC
>>>>
>>> 
>> 
>> 


Re: [VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-10-17 Thread Adam Bellemare
Awesome. Thanks John for fixing this!

On Thu, Oct 17, 2019 at 3:07 PM John Roesler  wrote:

> Hello all,
>
> While writing some new test cases for foreign key joins (as accepted
> in KIP-213), I realized that there was an oversight in the review
> process: we only proposed to add join methods that take a Materialized
> parameter.
>
> This poses an unnecessary burden on users who don't _need_ to
> materialize the join result, which could be quite a lot of state and
> changelog topic data.
>
> I filed https://issues.apache.org/jira/browse/KAFKA-9058 and then
> https://github.com/apache/kafka/pull/7541 to fix it. I've updated the
> KIP page accordingly as well.
>
> This change consists solely of adding the missing method overloads,
> and doesn't change the behavior or semantics of the operation at all.
>
> Thanks,
> -John
>


Re: server-side message filter

2019-08-28 Thread Adam Bellemare
I do not think this will be implemented in the server. Why not simply create a 
consumer that filters as required and outputs to a new topic? 

> On Aug 28, 2019, at 7:48 AM, Xiangyuan LI  wrote:
> 
> Hi,
> I want to know whether kafka has any plan to filter message on
> server-side.this can reduce network flow obviously when many groups
> subscribe the same one topic and only need part of message.


Re: ACL for group creation?

2019-08-22 Thread Adam Bellemare
I see the consumer group much like the username in relational database
access credentials. We routinely use the consumer group name as the means
of identifying the consumer for operational things, such as alerting based
on consumer group lag, autoscaling based on lag, and tooling around
manipulating the consumer group offset for a particular service. The
consumer group allows us to know, operationally, which offsets we are
observing or manipulating, and ideally we would like to limit the set of
consumer groups possible.

In practice, I regularly see people simply append an incrementing integer
to the end of their consumer group (cg, cg1, cg2, cg1234) when they want to
reset their application, INSTEAD of following offset reset or kafka-streams
application reset procedures. Sure, it would be nice to get everyone to
follow recommended procedures, but people do what they CAN do, not what
they're supposed to do. This has significant impact on the brokers and
surrounding tooling (orphaned internal topics with indefinite retention,
falsely-firing lag monitoring, broken auto-scaling), and instead of us
building out layers of supportive tooling to isolate ourselves from it, why
not simply set up ACLs to enforce which consumer groups an application can
and cannot use?
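
As a sketch of the kind of check I have in mind (GroupAclDemo, PatternType and allows are hypothetical names, and this is not Kafka's authorizer): Kafka ACLs can match a resource name either literally or by prefix, and only the literal form would stop the "cg" to "cg1" habit.

```java
final class GroupAclDemo {
    /** The two resource-name matching modes this sketch distinguishes. */
    enum PatternType { LITERAL, PREFIXED }

    /** Returns true if a rule for the given pattern covers the requested group name. */
    static boolean allows(PatternType type, String pattern, String requestedGroup) {
        switch (type) {
            case LITERAL:
                // Exact match only: "cg" does not cover "cg1".
                return requestedGroup.equals(pattern);
            case PREFIXED:
                // Prefix match: "cg" also covers "cg1", "cg1234", ...
                return requestedGroup.startsWith(pattern);
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        for (String group : new String[] {"cg", "cg1", "cg1234"}) {
            System.out.println(group
                + " literal=" + allows(PatternType.LITERAL, "cg", group)
                + " prefixed=" + allows(PatternType.PREFIXED, "cg", group));
        }
    }
}
```

With a literal rule the application can only ever join "cg", so the monitoring, autoscaling and offset tooling keyed on that name keeps working.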

Does this need a KIP, or is a bug report enough? I am unsure how
compatibility would work, so I am leaning towards a KIP...

Thanks
Adam

On Wed, Aug 21, 2019 at 5:59 PM Colin McCabe  wrote:

> I think it's worth considering  separating out the permissions needed to
> create a consumer group from the permissions needed to join one.  We
> distinguish these permissions for topics, and people generally find it
> useful.  We could start checking CREATE on GROUP, perhaps?  It might be
> hard to do in a compatible way.
>
> cheers,
> Colin
>
>
> On Wed, Aug 21, 2019, at 12:05, Adam Bellemare wrote:
> > +users mailing list
> >
> > David,
> >
> > I don't think I really understand your email. Are you saying that this can
> > already be achieved only using the READ ACL?
> >
> > Thanks
> > Adam
> >
> >
> >
> > On Wed, Aug 21, 2019 at 3:58 AM David Jacot  wrote:
> >
> > > Hello,
> > >
> > > It would be better to ask such question on the user mailing list.
> > >
> > > The reason is that the group is created automatically when a consumer
> > > joins it. It is not created explicitly so it can be restricted.
> > >
> > > In your case, you could set up an ACL to authorize the application to only
> > > use the group you have defined. It would prevent the application from
> > > creating new groups. (READ ACL on Group resource with a specific name).
> > >
> > > Best,
> > > David
> > >
> > > On Mon, Aug 19, 2019 at 9:01 PM Adam Bellemare <adam.bellem...@gmail.com>
> > > wrote:
> > >
> > > > Hi All
> > > >
> > > > I am looking through the Confluent docs and core Kafka docs and don't see
> > > > an ACL for group creation:
> > > > https://docs.confluent.io/current/kafka/authorization.html#acl-format
> > > > and
> > > > https://kafka.apache.org/documentation/#security_authz
> > > >
> > > > My scenario is simple: We use the consumer group as the means of
> > > > identifying a single application, including tooling for managing
> > > > application resets, offset management, lag monitoring, etc. We often have
> > > > situations where someone resets their consumer group by appending an
> > > > incremented integer ("cg" to "cg1"), but it throws the rest of the
> > > > monitoring and management tooling out of whack.
> > > >
> > > > Is there a reason why we do not have ACL-based CREATE restrictions to a
> > > > particular consumer group? I am willing to do the work to implement this
> > > > and test it out, but I wanted to validate that there isn't a reason I am
> > > > missing.
> > > > Thanks
> > > > Adam
> > > >
> > >
> >
>


Re: ACL for group creation?

2019-08-21 Thread Adam Bellemare
+users mailing list

David,

I don't think I really understand your email. Are you saying that this can
already be achieved only using the READ ACL?

Thanks
Adam



On Wed, Aug 21, 2019 at 3:58 AM David Jacot  wrote:

> Hello,
>
> It would be better to ask such question on the user mailing list.
>
> The reason is that the group is created automatically when a consumer
> joins it. It is not created explicitly so it can be restricted.
>
> In your case, you could set up an ACL to authorize the application to only
> use the group you have defined. It would prevent the application from
> creating new groups. (READ ACL on Group resource with a specific name).
>
> Best,
> David
>
> On Mon, Aug 19, 2019 at 9:01 PM Adam Bellemare 
> wrote:
>
> > Hi All
> >
> > I am looking through the Confluent docs and core Kafka docs and don't see
> > an ACL for group creation:
> > https://docs.confluent.io/current/kafka/authorization.html#acl-format
> > and
> > https://kafka.apache.org/documentation/#security_authz
> >
> > My scenario is simple: We use the consumer group as the means of
> > identifying a single application, including tooling for managing
> > application resets, offset management, lag monitoring, etc. We often have
> > situations where someone resets their consumer group by appending an
> > incremented integer ("cg" to "cg1"), but it throws the rest of the
> > monitoring and management tooling out of whack.
> >
> > Is there a reason why we do not have ACL-based CREATE restrictions to a
> > particular consumer group? I am willing to do the work to implement this
> > and test it out, but I wanted to validate that there isn't a reason I am
> > missing.
> >
> > Thanks
> > Adam
> >
>


ACL for group creation?

2019-08-19 Thread Adam Bellemare
Hi All

I am looking through the Confluent docs and core Kafka docs and don't see
an ACL for group creation:
https://docs.confluent.io/current/kafka/authorization.html#acl-format
and
https://kafka.apache.org/documentation/#security_authz

My scenario is simple: We use the consumer group as the means of
identifying a single application, including tooling for managing
application resets, offset management, lag monitoring, etc. We often have
situations where someone resets their consumer group by appending an
incremented integer ("cg" to "cg1"), but it throws the rest of the
monitoring and management tooling out of whack.

Is there a reason why we do not have ACL-based CREATE restrictions to a
particular consumer group? I am willing to do the work to implement this
and test it out, but I wanted to validate that there isn't a reason I am
missing.

Thanks
Adam


Re: KIP-382 + Kafka Streams Question

2019-07-24 Thread Adam Bellemare
Hi Ryanne

> Lemme know if I haven't answered this clearly.

Nope, this was very helpful. Thank you!

> A single "stream" can come from multiple input topics
I overlooked that - I was thinking of simply using the
StreamsBuilder.table() functionality instead, but that function doesn't
support a Collection of topics.

Since the topics would be copartitioned by definition, wouldn't the event
dispatcher in PartitionGroup (priorityQueue and streamtime ordering) ensure
that the topics are processed in incrementing streamtime order?

Alternately, I suppose this could be a case where it is a good idea to have
the timestamp of the event within the event's value payload, such that:
StreamsBuilder.stream(Set("userEntity", "primary.userEntity"))
.groupByKey()
.reduce()
can allow us to materialize the latest state for a given key.
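
To make the reduce step concrete, here is a minimal sketch in plain Java, assuming each value carries its own event timestamp. UserEntity, its fields, and LatestByTimestamp are hypothetical names for illustration only; the map-based materialize() merely stands in for groupByKey().reduce() and is not Kafka Streams code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LatestByTimestamp {
    // Reducer shape: (aggregate so far, incoming value) -> new aggregate.
    // On equal timestamps the incoming value wins, which is an arbitrary choice.
    static UserEntity latest(UserEntity current, UserEntity incoming) {
        return incoming.eventTimeMs >= current.eventTimeMs ? incoming : current;
    }

    // Stand-in for groupByKey().reduce(...) over records from several topics.
    static Map<String, UserEntity> materialize(List<UserEntity> entities) {
        Map<String, UserEntity> table = new HashMap<>();
        for (UserEntity entity : entities) {
            table.merge(entity.userId, entity, LatestByTimestamp::latest);
        }
        return table;
    }

    public static void main(String[] args) {
        // The newer update arrives before the older one; the payload
        // timestamp, not the arrival order, decides which value is kept.
        List<UserEntity> arrivals = Arrays.asList(
                new UserEntity("u1", "new@example.com", 200L),
                new UserEntity("u1", "old@example.com", 100L));
        System.out.println(materialize(arrivals).get("u1").emailAddress);
    }
}

// Hypothetical value type: the event timestamp travels inside the payload.
final class UserEntity {
    final String userId;
    final String emailAddress;
    final long eventTimeMs;

    UserEntity(String userId, String emailAddress, long eventTimeMs) {
        this.userId = userId;
        this.emailAddress = emailAddress;
        this.eventTimeMs = eventTimeMs;
    }
}
```

The same two-argument function could be handed to a reduce() call; two updates with an identical timestamp would still be ambiguous, which is the caveat Ryanne raised.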

Thanks Ryanne, this has been a very helpful discussion for me. We are
prototyping the usage of MM2 internally at the moment in anticipation of
its release in 2.4 and want to ensure we have our replication + recovery
strategies sorted out.

Adam

On Tue, Jul 23, 2019 at 7:26 PM Ryanne Dolan  wrote:

> Adam, I think we are converging :)
>
> > "userEntity"...where I only want the latest emailAddress (basic
> materialization) to send an email on account password update.
>
> Yes, you want all "userEntity" data on both clusters. Each cluster will
> have "userEntity" and the remote counterpart
> "secondary/primary.userEntity", as in my example (1). The send-email part
> can run on either cluster (but not both, to avoid duplicate emails),
> subscribing to both "userEntity" and "secondary/primary.userEntity". For
> DR, you can migrate this app between clusters via offset translation and
> the kafka-streams-application-reset tool.
>
> Then, you want a materialize-email-table app running in _both_ clusters,
> so that the latest emails are readily available in RocksDB from either
> cluster. This also subscribes to both "userEntity" and
> "secondary/primary.userEntity" s.t. records originating from either cluster
> are processed.
>
> (Equivalently, send-email and materialize-email-table could be parts of
> the same Streams app, just configured differently, e.g. with send-email
> short-circuited in all but one cluster.)
>
> Under normal operation, your userEntity events are sent to the primary
> cluster (topic: userEntity), processed there via materialize-email-table
> and send-email, and replicated to the secondary cluster (topic:
> primary.userEntity) via MM2. When primary goes down, your producers
> (whatever is sending userEntity events) can failover to the secondary
> cluster (topic: userEntity). This can happen in real-time, i.e. as soon as
> the producer detects an outage or via a load balancer with healthchecks
> etc. So under normal operation, you have all userEntity events in both
> clusters, and both clusters are available for producing to.
>
> N.B. this is not dual-ingest, which would require you always produce
> directly to both clusters. It's active/active, b/c you can produce to
> either cluster at any point in time, and the effect is the same.
>
> > Q1) Where does the producer write its data to if the primary cluster is
> dead?
>
> With active/active like this, you can send to either cluster.
>
> > Q2) How does a Kafka Streams application materialize state from two
> topics?
>
> A Streams app can subscribe to multiple topics. A single "stream" can come
> from multiple input topics (see:
> https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.Collection-
> )
>
> Likewise, a KTable can be materialized from multiple source topics -- in
> this case, userEntity, primary.userEntity and/or secondary.userEntity. You
> can think of these as parts of a "virtual topic", as you described.
>
> > (loaded question, I know)
>
> There is one caveat I can think of: there is no ordering guarantee across
> different topics in the same stream, so materialization could be
> inconsistent between the two clusters if, say, the same user's email was
> changed to different values at the same millisecond in both clusters. This
> may or may not be a problem.
>
> > Q3) ... recommendations on how to handle replication/producing of
> entity-data (ie: userEntity) across multiple clusters...
>
> Lemme know if I haven't answered this clearly.
>
> Ryanne
>
> On Tue, Jul 23, 2019 at 1:03 PM Adam Bellemare 
> wrote:
>
>> Hi Ryanne
>>
>> Thanks for the clarifications! Here is one of my own, as I think it's the
>> biggest stumbling block in my description:
>>
>> *> What is "tabl

Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Adam Bellemare
table"), and the other cluster (secondary) contains only a
> replication of the data via MM2 ("primary.table").
>
> That, by definition, is not active/active.
>
> >What you seemed to be proposing is that the producer's "table" data is
> sent fully to each cluster, such that the state can be materialized as a
> KTable in each application running on each cluster.
>
> Correct.
>
> > This wouldn't require MM2 at all, so I'm not sure if this is what you
> advocated.
>
> You could use a dual-ingest method and send all your data to both
> clusters, which would not require MM2. There are many issues with this
> approach, primarily wrt consistency and efficiency.
>
> > The trivial solution seems to be to make your producers produce all
> stateful data (topic "table") to each cluster, which makes MM2 unnecessary,
> but can also lead to data inconsistencies so it's not exactly foolproof.
>
> Yes, that's something like "dual ingest", which I would not recommend.
>
> > StreamsAppPrimary is consuming from ("table")
>
> What is "table" exactly? I am interpreting this as a KTable changelog
> topic, in which case "table" is an output topic of some streams app, i.e.
> the app producing the change events. _This_ is the app I mean to suggest
> you run on both clusters. Then, "table" will appear on both clusters (no
> "primary.table").
>
> The app that is creating the "table" changelog would be processing events
> from some other topic, say "events". Then, this is what I recommend:
>
> Primary cluster:
> Topics: events, secondary.events, table-changelog
> App subscription: events, secondary.events
> App output: table-changelog
>
> Secondary cluster:
> Topics: events, primary.events, table-changelog
> App subscription: events, primary.events
> App output: table-changelog
>
> With this arrangement, the app on either cluster will have built up state
> in RocksDB based on events from both clusters.
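
A small sketch of how the "local topic plus remote counterpart" subscription could be written as a single topic-name pattern, assuming remote topics are named <sourceClusterAlias>.<topic> as in the layout above. RemoteTopicPattern and forTopic are hypothetical helper names, and the set of characters allowed in an alias is my assumption.

```java
import java.util.regex.Pattern;

final class RemoteTopicPattern {
    // Matches the local topic ("events") or exactly one alias prefix
    // ("primary.events", "secondary.events"); nothing else.
    static Pattern forTopic(String topic) {
        return Pattern.compile("([A-Za-z0-9_-]+\\.)?" + Pattern.quote(topic));
    }

    public static void main(String[] args) {
        Pattern events = forTopic("events");
        String[] candidates = {"events", "primary.events", "secondary.events",
                "table-changelog", "events-archive"};
        for (String name : candidates) {
            System.out.println(name + " -> " + events.matcher(name).matches());
        }
    }
}
```

A pattern like this could then be passed to a pattern-based subscription, so that the app on either cluster picks up both "events" and the replicated topic without listing cluster aliases by hand.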
>
> Now, it seems you also want a second app to process this changelog. I can
> see a few scenarios:
>
> 1) you want to take some external action based on records in the table
> changelog, e.g. to send an email every time a password is updated. In this
> case, you don't want this app running in both clusters, as you'd get two
> emails. So you could run it in one cluster and use offset translation to
> migrate during failover. The send-email app is stateless, so you just need
> to translate and reset offsets (there is no internal state to rebuild).
>
> 2) you want to use the table changelog in a stateful but non-effecting
> way, e.g. by keeping a running count of records. This app, unlike the first,
> can be run in both clusters.
>
> 3) you want some combination of state and external actions in one big app.
> In this case, I'd consider splitting your app in two so that you can build
> state in both clusters while effecting external actions in only one cluster
> at a time.
>
> Lemme know if that makes sense.
>
> Ryanne
>
> On Tue, Jul 23, 2019 at 10:19 AM Adam Bellemare 
> wrote:
>
>> Hi Ryanne
>>
>> I think we have inconsistent definitions of Active-Active. The producer
>> is only producing to one cluster (primary) and one topic (topic "table"),
>> and the other cluster (secondary) contains only a replication of the data
>> via MM2 ("primary.table"). What you seemed to be proposing is that the
>> producer's "table" data is sent fully to each cluster, such that the state
>> can be materialized as a KTable in each application running on each
>> cluster. This wouldn't require MM2 at all, so I'm not sure if this is what
>> you advocated.
>>
>> You also state that "As with normal consumers, the Streams app should
>> *subscribe to any remote topics*, e.g. with a regex, s.t. the application
>> state will reflect input from either source cluster.". Wouldn't this mean
>> that the stateful "table" topic that we wish to materialize would be
>> replicated by MM2 from Primary, such that we end up with the following:
>>
>> *Replicated Entity/Stateful Data:*
>> *Primary Cluster: (Live)*
>> Topic: "table" (contains data from T = 0 to T = n)
>> StreamsAppPrimary is consuming from ("table")
>>
>> *Secondary Cluster: (Live)*
>> Topic: "primary.table" (contains data from T = 0 to T = n)
>> StreamsAppSecondary is consuming from ("primary.table")
>>
>> What does StreamsAppSecondary do when "primary.table" is no longer
>> replicated because Primary ha

Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Adam Bellemare
Hi Ryanne

I think we have inconsistent definitions of Active-Active. The producer is
only producing to one cluster (primary) and one topic (topic "table"), and
the other cluster (secondary) contains only a replication of the data via
MM2 ("primary.table"). What you seemed to be proposing is that the
producer's "table" data is sent fully to each cluster, such that the state
can be materialized as a KTable in each application running on each
cluster. This wouldn't require MM2 at all, so I'm not sure if this is what
you advocated.

You also state that "As with normal consumers, the Streams app should
*subscribe to any remote topics*, e.g. with a regex, s.t. the application
state will reflect input from either source cluster.". Wouldn't this mean
that the stateful "table" topic that we wish to materialize would be
replicated by MM2 from Primary, such that we end up with the following:

*Replicated Entity/Stateful Data:*
*Primary Cluster: (Live)*
Topic: "table" (contains data from T = 0 to T = n)
StreamsAppPrimary is consuming from ("table")

*Secondary Cluster: (Live)*
Topic: "primary.table" (contains data from T = 0 to T = n)
StreamsAppSecondary is consuming from ("primary.table")

What does StreamsAppSecondary do when "primary.table" is no longer
replicated because Primary has died? Additionally, where should the
producer of topic "table" now write its data to, assuming that Primary
Cluster is irrevocably lost?

I hope this better outlines my scenario. The trivial solution seems to be
to make your producers produce all stateful data (topic "table") to each
cluster, which makes MM2 unnecessary, but can also lead to data
inconsistencies so it's not exactly foolproof.

Thanks

On Mon, Jul 22, 2019 at 6:32 PM Ryanne Dolan  wrote:

> Hello Adam, thanks for the questions. Yes my organization uses Streams,
> and yes you can use Streams with MM2/KIP-382, though perhaps not in the way
> you are describing.
>
> The architecture you mention is more "active/standby" than "active/active"
> IMO. The "secondary" cluster is not being used until a failure, at which
> point you migrate your app and expect the data to already be there. This
> works for normal consumers where you can seek() and --reset-offsets.
> Streams apps can be reset with the kafka-streams-application-reset tool,
> but as you point out, that doesn't help with rebuilding an app's internal
> state, which would be missing on the secondary cluster. (Granted, that may
> be okay depending on your particular application.)
>
> A true "active/active" solution IMO would be to run your same Streams app
> in _both_ clusters (primary, secondary), s.t. the entire application state
> is available and continuously updated in both clusters. As with normal
> consumers, the Streams app should subscribe to any remote topics, e.g. with
> a regex, s.t. the application state will reflect input from either source
> cluster.
>
> This is essentially what Streams' "standby replicas" are -- extra copies
> of application state to support quicker failover. Without these replicas,
> Streams would need to start back at offset 0 and re-process everything in
> order to rebuild state (which you don't want to do during a disaster,
> especially!). The same logic applies to using Streams with MM2. You _could_
> failover by resetting the app and rebuilding all the missing state, or you
> could have a copy of everything sitting there ready when you need it. The
> easiest way to do the latter is to run your app in both clusters.
>
> Hope that helps.
>
> Ryanne
>
> On Mon, Jul 22, 2019 at 3:11 PM Adam Bellemare 
> wrote:
>
>> Hi Ryanne
>>
>> I have a quick question for you about Active+Active replication and Kafka
>> Streams. First, does your org / do you use Kafka Streams? If not then I
>> think this conversation can end here. ;)
>>
>> Secondly, and for the broader Kafka Dev group - what happens if I want to
>> use Active+Active replication with my Kafka Streams app, say, to
>> materialize a simple KTable? Based on my understanding, the topic "table"
>> on the primary cluster will be replicated to the secondary cluster as
>> "primary.table". In the case of a full cluster failure for primary, the
>> producer to topic "table" on the primary switches over to the secondary
>> cluster, creates its own "table" topic and continues to write there. So
>> now, assuming we have had no data loss, we end up with:
>>
>>
>> *Primary Cluster: (Dead)*
>>
>>
>> *Secondary Cluster: (Live)*
>> Topic: "primary.table" (contains data from T = 0 to T = n)
>> To

KIP-382 + Kafka Streams Question

2019-07-22 Thread Adam Bellemare
Hi Ryanne

I have a quick question for you about Active+Active replication and Kafka
Streams. First, does your org / do you use Kafka Streams? If not then I
think this conversation can end here. ;)

Secondly, and for the broader Kafka Dev group - what happens if I want to
use Active+Active replication with my Kafka Streams app, say, to
materialize a simple KTable? Based on my understanding, the topic "table"
on the primary cluster will be replicated to the secondary cluster as
"primary.table". In the case of a full cluster failure for primary, the
producer to topic "table" on the primary switches over to the secondary
cluster, creates its own "table" topic and continues to write there. So
now, assuming we have had no data loss, we end up with:


*Primary Cluster: (Dead)*


*Secondary Cluster: (Live)*
Topic: "primary.table" (contains data from T = 0 to T = n)
Topic: "table" (contains data from T = n+1 to now)

If I want to materialize state using Kafka Streams, obviously I am now
in a bit of a pickle since I need to consume "primary.table" before I
consume "table". Have you encountered rebuilding state in Kafka Streams
using Active-Active? For non-Kafka Streams I can see using a single
consumer for "primary.table" and one for "table", interleaving the
timestamps and performing basic event dispatching based on my own tracked
stream-time, but for Kafka Streams I don't think there exists a solution to
this.
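
For the non-Streams approach described above, a rough sketch of the interleaving step in plain Java. It assumes each input is already ordered by timestamp; TopicRecord and TimestampInterleaver are hypothetical names, and the two lists stand in for what two separate consumers would hand back.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TimestampInterleaver {
    // Two-way merge: always dispatch the head record with the smaller
    // timestamp. Ties go to the first input.
    static List<TopicRecord> interleave(List<TopicRecord> first, List<TopicRecord> second) {
        List<TopicRecord> out = new ArrayList<>(first.size() + second.size());
        int i = 0;
        int j = 0;
        while (i < first.size() && j < second.size()) {
            if (first.get(i).timestampMs <= second.get(j).timestampMs) {
                out.add(first.get(i++));
            } else {
                out.add(second.get(j++));
            }
        }
        while (i < first.size()) {
            out.add(first.get(i++));
        }
        while (j < second.size()) {
            out.add(second.get(j++));
        }
        return out;
    }

    public static void main(String[] args) {
        List<TopicRecord> replicated = Arrays.asList(
                new TopicRecord("primary.table", "k", "v1", 10L),
                new TopicRecord("primary.table", "k", "v2", 30L));
        List<TopicRecord> local = Arrays.asList(
                new TopicRecord("table", "k", "v3", 20L),
                new TopicRecord("table", "k", "v4", 40L));
        for (TopicRecord r : interleave(replicated, local)) {
            System.out.println(r.timestampMs + " " + r.topic + " " + r.value);
        }
    }
}

// Hypothetical record shape: just enough fields to order and dispatch events.
final class TopicRecord {
    final String topic;
    final String key;
    final String value;
    final long timestampMs;

    TopicRecord(String topic, String key, String value, long timestampMs) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.timestampMs = timestampMs;
    }
}
```

With live consumers the merge would have to wait until both inputs have a head record (or a timeout passes) before dispatching, which the sketch leaves out.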

If you have any thoughts on this or some recommendations for Kafka Streams
with Active-Active I would be very appreciative.

Thanks
Adam


Re: Nag!! KAFKA-8629 - Some feedback wanted

2019-07-20 Thread Adam Bellemare
Hey Andy

Any follow up to this? I think this is an important feature and would love
to see it move forward. I think the main thing now is just getting a better
handle on the limitations as questioned by John to get a better idea of
what the terrain looks like. From there we can spec out the necessary
verification stuff, and perhaps others can help contribute to that.

Thanks
Adam

On Tue, Jul 9, 2019 at 5:12 PM John Roesler  wrote:

> Hey Andy,
>
> Thanks for looking into this. I've been curious about it for some
> time. I'm glad to hear that the gap to get there is so small.
>
> You mentioned potentially switching off the JMX stuff with a config
> option. I'm not sure hiding the JMX features behind a config flag
> would be good enough... Can you elaborate on the nature of the
> incompatibility? I.e., is it a compile-time problem, or a run-time one?
>
> It wouldn't really be straightforward to determine if your alternative
> to the reflection code is acceptable until you send a PR... By all
> means, feel free to create a PR and just start the title off with
> `[POC]`, so that everyone knows it's not a final proposal.
>
> FWIW, I think if we're going to really state that Streams works on
> GraalVM, we do need to build some verification of this statement into
> the build and test cycle. So, once you get out of POC phase and start
> making a serious proposal, be sure to consider how we can ensure we
> _remain_ compatible going forward.
>
> Thanks again for considering this!
> -John
>
> On Tue, Jul 9, 2019 at 3:34 PM Ismael Juma  wrote:
> >
> > I think it would be awesome to support GraalVM native images for Kafka
> > Streams and CLI tools.
> >
> > Ismael
> >
> > On Tue, Jul 9, 2019, 12:40 PM Andy Muir 
> > wrote:
> >
> > > Hi
> > >
> > > I hope you can have a look at
> > > > https://issues.apache.org/jira/browse/KAFKA-8629 and perhaps give me
> > > > some feedback? I’d like to decide if this is worth pursuing!
> > >
> > > > I believe that apps built on top of Kafka Streams can benefit
> > > > hugely from the use of GraalVM. I’d like to help as much as I can.
> > >
> > > PS: I can’t change the assignee of the ticket to myself! :(
> > >
> > > Regards
> > >
> > > Andy Muir
> > > muira...@yahoo.co.uk
> > > @andrewcmuir
>


Re: Stopping All Tasks When a New Connector Added

2019-07-19 Thread Adam Bellemare
Hi Luying

Would you be willing to make a PR to address this? It seems that you have
already done most of the work.

Thanks
Adam

On Thu, Jul 18, 2019 at 11:00 PM Liu Luying  wrote:

> Hi all,
> I have noticed that Kafka Connect 2.3.0 will stop all existing tasks and
> then start all the tasks, including the new tasks and the existing ones
> when adding a new connector or changing a connector configuration. However,
> I do not think this is necessary. Only the new connector and tasks need to
> be started, as the rebalancing can be applied to both running and suspended
> tasks.
>
> The problem lies in the
> KafkaConfigBackingStore.ConsumeCallback.onCompletion() function (line 623
> in KafkaConfigBackingStore.java). When record.key() starts with "commit-",
> the tasks are being committed and the deferred tasks are processed. Some
> new tasks are added to the 'updatedTasks' (line 623 in
> KafkaConfigBackingStore.java), and the 'updatedTasks' are sent to
> updateListener to complete the task configuration update (line 638 in
> KafkaConfigBackingStore.java). In the updateListener.onTaskConfigUpdate()
> function, the 'updatedTasks' are added to the member variable
> 'taskConfigUpdates' of class DistributedHerder (line 1295 in
> DistributedHerder.java).
>
> In another thread, 'taskConfigUpdates' is copied to
> 'taskConfigUpdatesCopy' in updateConfigsWithIncrementalCooperative() (line
> 445 in DistributedHerder.java). The 'taskConfigUpdatesCopy' is subsequently
> used in processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in
> DistributedHerder.java). This function then uses 'taskConfigUpdatesCopy'
> to find connectors to stop (line 492 in DistributedHerder.java), and finally
> gets the tasks to stop, which are all the tasks. The worker thread does the
> actual stopping (line 499 in DistributedHerder.java).
>
> In the original code, all the tasks are added to the 'updatedTasks' (line
> 623 in KafkaConfigBackingStore.java), which means all the active connectors
> are in the 'connectorsWhoseTasksToStop' set, and all the tasks are in the
> 'tasksToStop' list. This causes the stops, and of course the subsequent
> restarts, of all the tasks.
>
> So, adding only the 'deferred' tasks to the  'updatedTasks' can avoid the
> stops and restarts of unnecessary tasks.
>
> Best,
> Luying
>
>


Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-07-12 Thread Adam Bellemare
@Matthias J. Sax  - Thoughts on the semantics of
simply leaving it as-is, with the extra tombstones? As John put it: "It may
be unnecessary to "delete" a non-existent record from a view, but it's never
incorrect."

It may not be ideal, but the complexity of eliminating it seems to be high
and frankly I don't have any better ideas at the moment.

Unless you strongly object, I think we'll have to move forward with it
as-is. There is still time to come up with another solution before I
*hopefully* get this into 2.4, but in the meantime I'll look to continue on
otherwise.

Adam


On Thu, Jul 11, 2019 at 9:57 AM Jan Filipiak 
wrote:

>
>
> On 10.07.2019 06:25, Adam Bellemare wrote:
> > In my experience (obviously empirical) it seems that many people just
> > want the ability to join on foreign keys for the sake of handling all the
> > relational data in their event streams and extra tombstones don't matter
> > at all. This has been my own experience from our usage of our internal
> > implementation at my company, and that of many others who have reached
> > out to me.
>
> backing this.
>
>


Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-07-10 Thread Adam Bellemare
 is the case, then the
> probability we get a FK change from one broken reference to another
> broken reference before the first reference gets repaired should be
> quite rare for "normal" data sets.
>
> Caveat: https://www.xkcd.com/2167/
>
> -
>
> So, under the assumption that these unnecessary tombstones are rare,
> and with the understanding that they're semantically ok to emit, it
> really doesn't seem worthwhile to take on all the extra complexity
> proposed earlier in this conversation.
>
> A much simpler solution, if someone is really struggling with extra
> tombstones, would just be to slap an LRU cache on the result table
> and drop unnecessary tombstones that way.
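
As a rough illustration of the LRU idea, here is a plain-Java sketch of a bounded filter that remembers, per key, whether the last forwarded record was a tombstone. TombstoneFilter and shouldForward are hypothetical names and nothing here is Kafka Streams API. If a key has been evicted, the filter simply forwards the tombstone again, which is unnecessary but not incorrect.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TombstoneFilter<K> {
    private final Map<K, Boolean> lastWasTombstone;

    TombstoneFilter(final int maxEntries) {
        // accessOrder=true keeps the least-recently-used key first, so
        // removeEldestEntry evicts it once the bound is exceeded.
        this.lastWasTombstone = new LinkedHashMap<K, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns true if the record should be forwarded downstream.
    // A null value is treated as a tombstone.
    boolean shouldForward(K key, Object value) {
        boolean isTombstone = (value == null);
        Boolean previous = lastWasTombstone.put(key, isTombstone);
        // Drop only when we positively remember a tombstone for this key.
        return !(isTombstone && Boolean.TRUE.equals(previous));
    }

    public static void main(String[] args) {
        TombstoneFilter<String> filter = new TombstoneFilter<String>(2);
        System.out.println(filter.shouldForward("k", null)); // first tombstone
        System.out.println(filter.shouldForward("k", null)); // repeated tombstone
        System.out.println(filter.shouldForward("k", "v"));  // real value
    }
}
```

The bound keeps memory fixed, at the cost of occasionally letting a redundant tombstone through for keys that have fallen out of the cache.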
>
> Again, just my personal thoughts, FWIW...
> -John
>
> On Tue, Jul 9, 2019 at 11:25 PM Adam Bellemare 
> wrote:
> >
> > I know what I posted was a bit of a wall of text, but three follow up
> > thoughts to this:
> >
> > 1) Is it possible to enforce exactly-once for a portion of the topology?
> > I was trying to think about how to process my proposal with at-least-once
> > processing (or at-most-once processing) and I came up empty-handed.
> >
> > 2) A very deft solution is to also just support left-joins but not
> > inner-joins. Practically speaking, either INNER or LEFT join as it
> > currently is would support all of my use-cases.
> >
> > 3) Accept that there may be some null tombstones (though this makes me
> > want to just go with LEFT only instead of LEFT and PSEUDO-INNER).
> >
> > In my experience (obviously empirical) it seems that many people just
> > want the ability to join on foreign keys for the sake of handling all the
> > relational data in their event streams and extra tombstones don't matter
> > at all. This has been my own experience from our usage of our internal
> > implementation at my company, and that of many others who have reached
> > out to me.
> >
> > What would help most at this point is if someone can come up with a
> > scenario where sending unnecessary tombstones actually poses a downstream
> > problem beyond that of confusing behaviour, as I cannot think of one
> > myself.  With that being said, I am far more inclined to actually then
> > support just option #2 above and only have LEFT joins, forgoing INNER
> > completely since it would not be a true inner join.
> >
> > Adam
> >
> >
> >
> >
> >
> >
> >
> >
> > On Thu, Jul 4, 2019 at 8:50 AM Adam Bellemare 
> > wrote:
> >
> > > Hi Matthias
> > >
> > > A thought about a variation of S1 that may work - it has a few moving
> > > parts, so I hope I explained it clearly enough.
> > >
> > > When we change keys on the LHS:
> > > (k,a) -> (k,b)
> > > (k,a,hashOf(b),PROPAGATE_OLD_VALUE) goes to RHS-0
> > > (k,b,PROPAGATE_NEW_VALUE) goes to RHS-1
> > >
> > > A) When the (k,a,hashOf(b),PROPAGATE_OLD_VALUE) hits RHS-0, the
> > > following occurs:
> > >   1) Store the current (CombinedKey, Value=(Hash, ForeignValue)) in
> > > a variable
> > >   2) Delete the key from the store
> > >   3) Publish the event from step A-1 downstream with an instruction:
> > > (eventType = COMPARE_TO_OTHER) (or whatever)
> > > *  (key, (hashOf(b),wasForeignValueNull, eventType))*
> > > //Don't need the old hashOf(b) as it is guaranteed to be out of date
> > > //We do need the hashOf(b) that came with the event to be passed
> > > along. Will be used in resolution.
> > > //Don't need the actual value as we aren't joining or comparing the
> > > values, just using it to determine nulls. Reduces payload size.
> > >
> > > B) When (k,b,PROPAGATE_NEW_VALUE) hits RHS-1, the following occurs:
> > >   1) Store it in the prefix state store (as we currently do)
> > >   2) Get the FK-value (as we currently do)
> > >   3) Return the normal SubscriptionResponse payload (eventType =
> > > UPDATE) (or whatever)
> > > * (key, (hashOf(b), foreignValue, eventType))*
> > >
> > >
> > > C) The Resolver Table is keyed on (as per our example):
> > > key = CombinedKey, value =
> > > NullValueResolution (set
> > > by RHS-1)>
> > >
> > > Resolution Steps per event:
> > >
> > > When one of either the output events from A (eventType ==
> > > COMPARE_TO_OTHER) or B (eventType == UPDATE) is received
> > > 1) Check if this event matches the current hashOf(

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-07-09 Thread Adam Bellemare
I know what I posted was a bit of a wall of text, but three follow up
thoughts to this:

1) Is it possible to enforce exactly-once for a portion of the topology? I
was trying to think about how to process my proposal with at-least-once
processing (or at-most-once processing) and I came up empty-handed.

2) A very deft solution is to also just support left-joins but not
inner-joins. Practically speaking, either INNER or LEFT join as it
currently is would support all of my use-cases.

3) Accept that there may be some null tombstones (though this makes me want
to just go with LEFT only instead of LEFT and PSEUDO-INNER).

In my experience (obviously empirical) it seems that many people just want
the ability to join on foreign keys for the sake of handling all the
relational data in their event streams and extra tombstones don't matter at
all. This has been my own experience from our usage of our internal
implementation at my company, and that of many others who have reached out
to me.

What would help most at this point is if someone can come up with a
scenario where sending unnecessary tombstones actually poses a downstream
problem beyond that of confusing behaviour, as I cannot think of one
myself.  With that being said, I am far more inclined to actually then
support just option #2 above and only have LEFT joins, forgoing INNER
completely since it would not be a true inner join.

Adam








On Thu, Jul 4, 2019 at 8:50 AM Adam Bellemare 
wrote:

> Hi Matthias
>
> A thought about a variation of S1 that may work - it has a few moving
> parts, so I hope I explained it clearly enough.
>
> When we change keys on the LHS:
> (k,a) -> (k,b)
> (k,a,hashOf(b),PROPAGATE_OLD_VALUE) goes to RHS-0
> (k,b,PROPAGATE_NEW_VALUE) goes to RHS-1
>
> A) When the (k,a,hashOf(b),PROPAGATE_OLD_VALUE) hits RHS-0, the following
> occurs:
>   1) Store the current (CombinedKey, Value=(Hash, ForeignValue)) in
> a variable
>   2) Delete the key from the store
>   3) Publish the event from step A-1 downstream with an instruction:
> (eventType = COMPARE_TO_OTHER) (or whatever)
> *  (key, (hashOf(b),wasForeignValueNull, eventType))*
> //Don't need the old hashOf(b) as it is guaranteed to be out of date
> //We do need the hashOf(b) that came with the event to be passed
> along. Will be used in resolution.
> //Don't need the actual value as we aren't joining or comparing the
> values, just using it to determine nulls. Reduces payload size.
>
> B) When (k,b,PROPAGATE_NEW_VALUE) hits RHS-1, the following occurs:
>   1) Store it in the prefix state store (as we currently do)
>   2) Get the FK-value (as we currently do)
>   3) Return the normal SubscriptionResponse payload (eventType = UPDATE)
> (or whatever)
> * (key, (hashOf(b), foreignValue, eventType))*
>
>
> C) The Resolver Table is keyed on (as per our example):
> key = CombinedKey, value =
> NullValueResolution by RHS-1)>
>
> Resolution Steps per event:
>
> When one of either the output events from A (eventType ==
> COMPARE_TO_OTHER) or B (eventType == UPDATE) is received
> 1) Check if this event matches the current hashOf(b). If not, discard it,
> for it is stale and no longer matters.  Additionally, delete entry
> CombinedKey from the Resolver Table.
>
> 2) Lookup event in table on its CombinedKey:
>   - If it's not in the table, create the  NullValueResolution value,
> populate the field related to the eventType, and add it to the table.
>   - If it already IS in the table, get the existing NullValueResolution
> object and finish populating it:
>
> 3) If the NullValueResolution is fully populated, move on to the
> resolution logic below.
>
> Format:
> (wasForeignValueNull, foreignValue) -> Result
> If:
> ( false  , Null ) -> Send tombstone. Old value was not null, new one is,
> send tombstone.
> (  true  , Null ) -> Do nothing.  See * below for more details.
> ( false  , NewValue ) -> Send the new result
> (  true  , NewValue ) -> Send the new result
>
> * wasForeignValueNull may have been false at some very recent point, but
> only just translated to true (race condition). In this case, the RHS table
> was updated and the value was set to null due to a an RHS update of (a,
> oldVal) -> (a, null). This event on its own will propagate a delete event
> through to the resolver (of a different eventType), so we don't need to
> handle this case from the LHS and doing nothing is OK.
>
> In the case that it's truly (true, Null), we also don't need to send a
> tombstone because wasForeignValueNull == true means that a tombstone was
> previously sent.
>
> 4) Check the hashOf(b) one last time before sending the resolved message
> out. If the hash is old, discard it.
>
> 5) Delete the row from the Resolver Table.
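
The resolution table and the hash checks in steps 1 and 4 can be sketched as one small decision function. This is only my reading of the steps above, assuming a non-null foreign value always yields a new result regardless of wasForeignValueNull; NullValueResolver, Action, and the long-typed hashes are hypothetical stand-ins.

```java
final class NullValueResolver {
    enum Action { SEND_TOMBSTONE, SEND_NEW_RESULT, DO_NOTHING, DISCARD_STALE }

    // eventHash is the hashOf(b) carried by the event; currentHash is the
    // hash of the current LHS value for the same key.
    static Action resolve(long eventHash, long currentHash,
                          boolean wasForeignValueNull, Object foreignValue) {
        if (eventHash != currentHash) {
            return Action.DISCARD_STALE;      // stale event, no longer matters
        }
        if (foreignValue != null) {
            return Action.SEND_NEW_RESULT;    // (false|true, NewValue)
        }
        return wasForeignValueNull
                ? Action.DO_NOTHING           // (true, Null)
                : Action.SEND_TOMBSTONE;      // (false, Null)
    }

    public static void main(String[] args) {
        System.out.println(resolve(1L, 1L, false, null));
        System.out.println(resolve(1L, 1L, true, null));
        System.out.println(resolve(1L, 1L, true, "newValue"));
        System.out.println(resolve(1L, 2L, false, null));
    }
}
```

The Resolver Table bookkeeping (waiting for both events, deleting the row afterwards) is left out; the function only covers the decision once the NullValueResolution is fully populated.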
>

Re: Nag!! KAFKA-8629 - Some feedback wanted

2019-07-09 Thread Adam Bellemare
Hi Andy

Can you elaborate on two things for clarity’s sake?

1) Are there any alternatives we can use for JMX instead of commenting it out?

2) Are there any other limitations with GraalVM that may impede future
development?

The second one is more difficult to answer I suspect because it relies on us 
adopting GraalVM support as a requirement for future releases of Kafka Streams. 
I think we would need a better idea of the impacts of this and if it should be 
scoped out beyond Kafka Streams or just limited as such. 



> On Jul 9, 2019, at 7:39 AM, Andy Muir  wrote:
> 
> Hi
> 
> I hope you can have a look at 
> https://issues.apache.org/jira/browse/KAFKA-8629 
>  and perhaps give me some 
> feedback? I’d like to decide if this is worth pursuing!
> 
> I believe that apps built on top of Kafka Streams can benefit hugely
> from the use of GraalVM. I’d like to help as much as I can.
> 
> PS: I can’t change the assignee of the ticket to myself! :(
> 
> Regards
> 
> Andy Muir
> muira...@yahoo.co.uk
> @andrewcmuir


Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-07-04 Thread Adam Bellemare
ssing load. It's hard to judge, how much the overhead will be, as
> it will depend on the selectivity of the join. But it might be significant?
>
> Also, users have certain expectations on the result and it's unintuitive
> (even if semantically correct) to send those tombstones. From
> experience, we often have a hard time explaining semantics to people and
> I was hoping we could avoid introducing unintuitive behavior.
>
>
> Would be good to get input from others and how they judge the impact. I
> think it might still be worth exploring how complex S-1 would be. If we
> think it's too complex it might be a good argument to just accept the
> unnecessary tombstones?
>
>
> -Matthias
>
>
> On 7/3/19 8:03 AM, Adam Bellemare wrote:
> > Hi Matthias
> >
> > Do you happen to recall what the impact was of having unnecessary
> > tombstones? I am wondering if the negative impact is still relevant
> today,
> > and if so, if you can recall the PRs or KIPs related to it.
> >
> > That being said, I think that S-1 is too complex in terms of
> > synchronization. It seems to me that the processor would need to block
> > while it waits for the unsubscribe to propagate and return, which would
> > cause throughput to drop significantly. Alternately, we would need to
> > maintain state anyways about which events were sent and which responses
> > returned, while being sure to respect the offset order in which they're
> > emitted. I think this would only reduce blocking slightly while
> increasing
> > complexity. If I am wrong in understanding this, please let me know where
> > my thinking is erroneous.
> >
> > S-2 could probably be simplified to "for a given key, was the previous
> > propagated result a null/tombstone or not?"
> > It would act very similarly to the hash value mechanism, where we discard
> > any events that are not of the correct hash. In this case, we simply
> store
> > (key, wasLastOutputATombstone) right before the event is output
> downstream
> > of the Join + Resolver. This ignores all the complexities of which event
> is
> > propagating over which wire and simply squelches any extra tombstones
> from
> > being sent.
> >
> > For storage, we need to use the full primary key and a boolean. However,
> > the table will grow indefinitely large as we can never remove keys from
> it.
> > If we delete key=k from the table and propagate a tombstone, but later
> (say
> > 3 weeks, 3 months, etc) we publish (k, baz), but baz does not exist on
> the
> > RHS, we will end up publishing an extra tombstone because we have no idea
> > what the previously sent record was for k. For this reason I think it's
> > worth asking if we really can maintain state, and if it's even necessary
> > (again, a full understanding of the impact of extra tombstones may help
> us
> > figure out a better solution).
> >
> > As it stands, I don't think either of these will work well. That being
> > said, I myself do not have any better ideas at the moment, but I would
> > still like to better understand circumstances where it has a negative
> > impact downstream as that may provide some insights.
> >
> >
> > Thanks
> >
> > Adam
> >
> >
> >
> >
> >
> > On Tue, Jul 2, 2019 at 11:18 PM Matthias J. Sax 
> > wrote:
> >
> >> Thanks for the example. I was thinking about the problem a little bit,
> >> and I believe we should look at it in some more details.
> >>
> >> Basically, there are 3 cases:
> >>
> >> a) insert new record LHS
> >> b) delete record LHS
> >> c) update existing record LHS
> >>
> >> For those cases we want different things to happen:
> >>
> >> a-1) send subscribe message to RHS
> >> a-2) RHS lookup and send result back if there is one
> >> a-3) emit result on LHS if any is returned
> >>
> >> b-1) delete subscription from RHS
> >> b-2) if there was a previous result (can easily be decided by looking up
> >> RHS table for an existing key), send tombstone back
> >> b-3) emit tombstone on LHS if any is returned
> >>
> >> c-1) delete old subscription from RHS
> >> c-2) send new subscription to RHS
> >> c-3) if there was no previous result and there is no new result emit
> >> nothing
> >> c-4) if there was a previous result and there is no new result emit a
> >> tombstone LHS
> >> c-5) if there is a new result (old result may or may not exist), emit
> >> only new result LHS (don't emit a tombstone

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-07-03 Thread Adam Bellemare
scribe" does not know if it needs to send a tombstone back for
> the case that there was an old result but there is no new result.
> Similarly, the "new subscribe" cannot know if it needs to send a
> tombstone or not (as it does not know if there was a previous result) if
> it does not match.
>
> To really solve the issue, I see two possible solutions (both are not
> great, but I wanted to discuss them anyway):
>
> S-1: First unsubscribe, and send new subscription after result comes
> back. For this case, the RHS must always send something back to the LHS
> on unsubscribe. The answer if "previous result exists/not-exist" can be
> added to the new-subscription and hence RHS can either return nothing, a
> tombstone, or a new result. The LHS can blindly emit whatever RHS
> returns. This would also cover (a) and (b) cases. However, the overall
> time to emit the join result is doubled for the (common) update case...
> (we need two consecutive round-trips to the RHS).
>
> S-2: Remember/store if a previous result exists on LHS: for this case,
> (a) is handled straightforwardly, (b) is handled by telling RHS to send a
> tombstone if previous result exists, and (c) can send both requests in
> parallel letting the unsubscribe never return anything, and subscribe is
> handled as in (b). However, we need a second store on the LHS to
> remember if there was a previous result. (Also not sure how
> interleaving/inflight computation might affect the algorithm...)
>
> I think, sending unnecessary tombstones is quite bad (in very old
> releases we had a similar issue and fixed it). However, I am also not
> 100% sure if the solutions I came up with are good enough to justify
> them. (Personally, I slightly tend to prefer S-2 because I think that
> the additional store is less of an issue than the increased processing
> time).
>
> Would love to hear your thoughts.
>
>
> -Matthias
>
>
> On 6/28/19 6:19 AM, Adam Bellemare wrote:
> > Hi Matthias
> >
> > Yes, thanks for the questions - I know it's hard to keep up with all of
> the
> > various KIPs and everything.
> >
> > The instructions are not stored anywhere, but are simply a way of letting
> > the RHS know how to handle the subscription and reply accordingly.
> >
> > The only case where we send an unnecessary tombstone is (that I can
> > tell...) when we do the following:
> > RHS:
> > (1, bar)
> >
> > LHS
> > (K,1)  -> Results in (K, 1, bar) being output
> > (K,1) -> (K,2) ->  Results in (K, null) being output for INNER (no
> matching
> > element on RHS)
> > (K,2) -> (K,3) ->  Results in (K, null) being output for INNER (because
> we
> > don't maintain state to know we already output the tombstone on the
> > previous transition).
> > (K,2) -> (K,9000) ->  Results in (K, null)... etc.
> >
> > Byte versioning is going in today, then I hope to get back to addressing
> a
> > number of John's previous questions in the PR.
> >
> > Adam
> >
> >
> > On Thu, Jun 27, 2019 at 5:47 PM Matthias J. Sax 
> > wrote:
> >
> >> Thanks for bringing this issue to our attention. Great find @Joe!
> >>
> >> Adding the instruction field to the `subscription` sounds like a good
> >> solution. What I don't understand atm: for which case would we need to
> >> send unnecessary tombstone? I thought that the `instruction` field helps
> >> to avoid any unnecessary tombstone? Seems I am missing a case?
> >>
> >> Also for my own understanding: the `instruction` is only part of the
> >> message? It is not necessary to store it in the RHS auxiliary store,
> right?
> >>
> >> About right/full-outer joins. Agreed. Getting left-joins would be
> awesome!
> >>
> >> About upgrading: Good call John! Adding a version byte for subscription
> >> and response is good forward thinking. I personally prefer version
> >> numbers, too, as they carry more information.
> >>
> >> Thanks for all the hard work to everybody involved!
> >>
> >>
> >> -Matthias
> >>
> >> On 6/27/19 1:44 PM, John Roesler wrote:
> >>> Hi Adam,
> >>>
> >>> Hah! Yeah, I felt a headache coming on myself when I realized this
> >>> would be a concern.
> >>>
> >>> For what it's worth, I'd also lean toward versioning. It seems more
> >>> explicit and more likely to keep us all sane in the long run. Since we
> >>> don't _think_ our wire protocol will be subject to a lot of revisions,
> >>> we can just use one byte. The wor

Re: Synchronized consumption + processing based on timestamps?

2019-07-03 Thread Adam Bellemare
Thanks Matthias,

For #1 this is something we have done already, but it can obviously take a
while to catch up for large topics (or may not catch up at all if the
volume is large).

For #2, this is what I am wondering about, if anyone knows of any
implementations that provide an option like this under the hood of the
consumer, some sort of SynchronizedConsumer. I can see doing it on a
per-client basis, but it would be great to abstract it out into a general
approach.

I'll think about this one more.

Thanks



On Tue, Jul 2, 2019 at 11:27 PM Matthias J. Sax 
wrote:

> I think you can only achieve this, if
>
> 1) you don't use two clients, but only one client that reads both
> partitions
>
> or
>
> 2) let both clients exchange data about their time progress
>
>
> -Matthias
>
>
> On 7/2/19 6:01 PM, Adam Bellemare wrote:
> > Hi All
> >
> > The use-case is pretty simple. Let's say we have a history of events with
> > the following:
> > key=userId, value = (timestamp, productId)
> >
> > and we want to remap it to (just as we would with an internal topic):
> > key=productId, value=(original_timestamp, userId)
> >
> > Now, say I have 30 days of backlog, and 2 partitions for the input
> topic. I
> > spin up two instances and let them process the data from the start of
> time,
> > but one instance is only half as powerful (less CPU, Mem, etc), such that
> > instance 0 processes X events / sec while instance 1 processes X/2 events
> > /sec.
> >
> > My question is: Are there *any* clients, kafka streams, spark, flink, etc
> > or otherwise, that would allow these two consumers to remain in sync
> *according
> > to their timestamps*? I don't want to see events with original_timestamp
> of
> > today (from instance 0) interleaved with events from 15 days ago (from
> the
> > underpowered instance 1). Yes, I do realize this would bring my
> throughput
> > down, but I am looking for any existing tech that would effectively say
> *"cap
> > the time difference of events coming out of this repartition processor at
> > 60 seconds max"*
> >
> > Currently, I am not aware of ANY open source solutions that do this for
> > Kafka, but if someone has heard otherwise I would love to know.
> > Alternately, perhaps this could lead to a KIP.
> >
> > Thanks
> > Adam
> >
>
>


Synchronized consumption + processing based on timestamps?

2019-07-02 Thread Adam Bellemare
Hi All

The use-case is pretty simple. Let's say we have a history of events with
the following:
key=userId, value = (timestamp, productId)

and we want to remap it to (just as we would with an internal topic):
key=productId, value=(original_timestamp, userId)

Now, say I have 30 days of backlog, and 2 partitions for the input topic. I
spin up two instances and let them process the data from the start of time,
but one instance is only half as powerful (less CPU, Mem, etc), such that
instance 0 processes X events / sec while instance 1 processes X/2 events
/sec.

My question is: Are there *any* clients, kafka streams, spark, flink, etc
or otherwise, that would allow these two consumers to remain in sync *according
to their timestamps*? I don't want to see events with original_timestamp of
today (from instance 0) interleaved with events from 15 days ago (from the
underpowered instance 1). Yes, I do realize this would bring my throughput
down, but I am looking for any existing tech that would effectively say *"cap
the time difference of events coming out of this repartition processor at
60 seconds max"*

Currently, I am not aware of ANY open source solutions that do this for
Kafka, but if someone has heard otherwise I would love to know.
Alternately, perhaps this could lead to a KIP.
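
To make the "cap the time difference" idea concrete, here is a minimal sketch
of the kind of gate I have in mind. Everything in it is hypothetical
(TimestampSkewGate, reportHead and mayProcess are made-up names, not an
existing Kafka client API), and it assumes the consumers have some way to
share the timestamp of their next pending event:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; nothing here is an existing Kafka client API.
final class TimestampSkewGate {
    private final long maxSkewMs;
    // Timestamp of the next unprocessed event for each partition that has reported one.
    private final Map<Integer, Long> headTimestamps = new HashMap<>();

    TimestampSkewGate(final long maxSkewMs) {
        this.maxSkewMs = maxSkewMs;
    }

    // Called by whichever consumer owns the partition when it fetches its next event.
    void reportHead(final int partition, final long eventTimestampMs) {
        headTimestamps.put(partition, eventTimestampMs);
    }

    // A partition may proceed only if its next event is at most maxSkewMs
    // ahead of the slowest reported partition. Unreported partitions are ignored.
    boolean mayProcess(final int partition) {
        final Long head = headTimestamps.get(partition);
        if (head == null) {
            return false;
        }
        long slowest = Long.MAX_VALUE;
        for (final long ts : headTimestamps.values()) {
            slowest = Math.min(slowest, ts);
        }
        return head - slowest <= maxSkewMs;
    }
}
```

The slowest partition can always proceed, so the gate cannot deadlock on its
own; the fast instance simply pauses until the slow one catches up to within
the window. How the head timestamps get exchanged between instances is the
hard part and is left open here.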

Thanks
Adam


Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-28 Thread Adam Bellemare
Just some thoughts around the versioning. I'm trying to work out a more
elegant way to handle it than what I've come up with so far below.


*1) I'm planning to use an enum for the versions, but I am not sure how to
tie the versions to any particular release. For instance, something like
this is doable, but may become more complex as versions go on. See 2 & 3
below.*

public enum Version {
  V0((byte) 0x00),  //2.4.0
  V1((byte) 0x01);  //2.5.0

  private final byte code;

  Version(final byte code) { this.code = code; }
}

*2) Keeping track of compatibility is a bit tricky. For instance, how do we
know which messages are compatible and which are breaking? Which upgrade
paths do we handle and which ones do we not?  How long do we handle support
for old message versions? 2 minor releases? 1 major release?*

For example:
Version 2.4.0:   V0
Version 2.5.0:   V0, V1  (V1 processor must also handle V0).
Version 2.6.0:   V0?, V1, V2  (Can we now drop support for V0? What happens
if someone upgrades from 2.4.0 to 2.6.0 directly and it's not supported?)

*3) Does the RHS 1:1 processor, which I currently call
`ForeignKeySingleLookupProcessorSupplier`, basically end up looking like
this?*
if (value.version == Version.V0)
  //do V0 processing
else if (value.version == Version.V1)
  //do V1 processing
else if (value.version == Version.V2)
  //do V2 processing
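
A slightly fuller sketch of the same dispatch, assuming the version travels as
a single byte on the wire and that unknown versions should fail fast rather
than be skipped. The class and method names (VersionDispatch, fromByte,
process) are illustrative only and are not taken from the PR:

```java
// Hypothetical sketch; the names below are illustrative, not the KIP-213 implementation.
final class VersionDispatch {
    enum Version {
        V0((byte) 0x00),
        V1((byte) 0x01);

        final byte code;

        Version(final byte code) {
            this.code = code;
        }

        // Map the byte read off the wire back to a known version, or fail fast.
        static Version fromByte(final byte code) {
            for (final Version v : values()) {
                if (v.code == code) {
                    return v;
                }
            }
            throw new IllegalArgumentException("Unsupported message version: " + code);
        }
    }

    // Stand-in for the per-version processing branches.
    static String process(final byte versionByte) {
        switch (Version.fromByte(versionByte)) {
            case V0:
                return "processed as V0";
            case V1:
                return "processed as V1";
            default:
                throw new IllegalStateException("unreachable");
        }
    }
}
```

Failing in fromByte keeps the "which versions do we still support" decision in
one place, so dropping V0 later would mean removing one enum constant and one
case.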


The tricky part becomes keeping all the Instructions straight for each
Version. For instance, one option (*OPTION-A*) is:
//Version = 2.4.0
enum Instruction {  A, B, C; }

//Version = 2.5.0
enum Instruction {
  A, B, C, //Added in 2.4.0, Value.Version = V0
  D;       //Added in 2.5.0, Value.Version = V1.
           //Also uses Value.Version = V0 instructions.
}

//Version = 2.6.0
enum Instruction {
  A, B, C,          //Added in 2.4.0, Value.Version = V0. *Don't use for V2*
  D,                //Added in 2.5.0, Value.Version = V1.
                    //Also uses Value.Version = V0 instructions. *Don't use for V2*
  E, F, G, H, I, J; //Added in 2.6.0, Value.Version = V2.
}

Alternatively, something like this (*OPTION-B*), where the Version and the
Instruction are tied together in the Instruction itself.

enum Instruction {
  V0_A, V0_B, V0_C,              //Added in 2.4.0
  V1_A, V1_B, V1_C, V1_D,        //Added in 2.5.0
  V2_E, V2_F, V2_G, V2_H, V2_I;  //Added in 2.6.0
}
At this point our logic in the `ForeignKeySingleLookupProcessorSupplier`
looks something like this:
if (value.version == Version.V0) {
  if (value.instruction == Instruction.V0_A) ...
  else if (value.instruction == Instruction.V0_B) ...
  else if (value.instruction == Instruction.V0_C) ...
  else ...
} else if (value.version == Version.V1) {
  if (value.instruction == Instruction.V1_A) ...
  else if (value.instruction == Instruction.V1_B) ...
  else if (value.instruction == Instruction.V1_C) ...
  else if (value.instruction == Instruction.V1_D) ...
  else ...
} else if ...

I prefer option B because Instruction meaning can change between versions,
especially in scenarios where we may be reworking, say, 2 instructions into
4 instructions that aren't neat subsets of the original 2.
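
One possible shape for OPTION-B, assuming each instruction is identified on
the wire by the pair (version byte, instruction byte). This is only a sketch;
the numeric codes and the decode helper are made up for illustration:

```java
// Hypothetical names throughout; a sketch of OPTION-B, not the actual KIP-213 code.
final class VersionedInstructions {
    enum Instruction {
        V0_A(0, 0), V0_B(0, 1), V0_C(0, 2),
        V1_A(1, 0), V1_B(1, 1), V1_C(1, 2), V1_D(1, 3);

        final int version; // message version this instruction belongs to
        final int code;    // value of the instruction byte within that version

        Instruction(final int version, final int code) {
            this.version = version;
            this.code = code;
        }

        // Resolve the (version, instruction) pair read off the wire.
        static Instruction decode(final int version, final int code) {
            for (final Instruction i : values()) {
                if (i.version == version && i.code == code) {
                    return i;
                }
            }
            throw new IllegalArgumentException(
                "No instruction " + code + " in version " + version);
        }
    }
}
```

Because the lookup is keyed on both fields, the same instruction byte is free
to mean something different in V1 than it did in V0, which is the property I
am after.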


*4) If we hard-stop on incompatible events (say we don't support that
version), how does the user go about handling the upgrade? *
We can't ignore the events as it would ruin the delivery guarantees. At
this point it seems to me that they would have to do a full streams reset
for that applicationId. Am I incorrect in this?


Adam


On Fri, Jun 28, 2019 at 9:19 AM Adam Bellemare 
wrote:

> Hi Matthias
>
> Yes, thanks for the questions - I know it's hard to keep up with all of
> the various KIPs and everything.
>
> The instructions are not stored anywhere, but are simply a way of letting
> the RHS know how to handle the subscription and reply accordingly.
>
> The only case where we send an unnecessary tombstone is (that I can
> tell...) when we do the following:
> RHS:
> (1, bar)
>
> LHS
> (K,1)  -> Results in (K, 1, bar) being output
> (K,1) -> (K,2) ->  Results in (K, null) being output for INNER (no
> matching element on RHS)
> (K,2) -> (K,3) ->  Results in (K, null) being output for INNER (because we
> don't maintain state to know we already output the tombstone on the
> previous transition).
> (K,2) -> (K,9000) ->  Results in (K, null)... etc.
>
> Byte versioning is going in today, then I hope to get back to addressing a
> number of John's previous questions in the PR.
>
> Adam
>
>
> On Thu, Jun 27, 2019 at 5:47 PM Matthias J. Sax 
> wrote:
>
>> Thanks for bringing this issue to our attention. Great find @Joe!
>>
>> Adding the instruction field to the `subscription` sounds like a good
>> solution. What I don't understand atm: for which case would we need to
>> send unnecessary tombstone? I thought that the `instruction` field helps
>> to avoid any unnecessary tombstone

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-28 Thread Adam Bellemare
Hi Matthias

Yes, thanks for the questions - I know it's hard to keep up with all of the
various KIPs and everything.

The instructions are not stored anywhere, but are simply a way of letting
the RHS know how to handle the subscription and reply accordingly.

The only case where we send an unnecessary tombstone is (that I can
tell...) when we do the following:
RHS:
(1, bar)

LHS
(K,1)  -> Results in (K, 1, bar) being output
(K,1) -> (K,2) ->  Results in (K, null) being output for INNER (no matching
element on RHS)
(K,2) -> (K,3) ->  Results in (K, null) being output for INNER (because we
don't maintain state to know we already output the tombstone on the
previous transition).
(K,2) -> (K,9000) ->  Results in (K, null)... etc.
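
The trace above can be reproduced with a toy model. This is only a sketch
under simplifying assumptions (a single in-memory RHS map, INNER join only,
made-up class and field names); it is not the code in the PR. With squelch
enabled it also shows what remembering "was the last output a tombstone?" per
key would buy us:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the INNER foreign-key join output; illustrative only.
final class TombstoneTrace {
    private final Map<Integer, String> rhs = new HashMap<>();
    // Keys whose most recent output was a tombstone (the extra state in question).
    private final Set<String> lastOutputWasTombstone = new HashSet<>();
    private final boolean squelch;
    final List<String> output = new ArrayList<>();

    TombstoneTrace(final boolean squelch) {
        this.squelch = squelch;
        rhs.put(1, "bar"); // RHS: (1, bar)
    }

    // Apply an LHS update (key, foreignKey) and record what would be emitted.
    void update(final String key, final int foreignKey) {
        final String rhsValue = rhs.get(foreignKey);
        if (rhsValue != null) {
            lastOutputWasTombstone.remove(key);
            output.add("(" + key + ", " + foreignKey + ", " + rhsValue + ")");
        } else if (!squelch || lastOutputWasTombstone.add(key)) {
            // Without squelching every unmatched update emits a tombstone;
            // with it, only the first one after a real result does.
            output.add("(" + key + ", null)");
        }
    }
}
```

Feeding it (K,1), (K,2), (K,3), (K,9000) yields one result and three
tombstones without squelching, and one result and one tombstone with it. The
price is an extra store on the output side, and in this sketch entries for
keys that are deleted and never return are never cleaned up.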

Byte versioning is going in today, then I hope to get back to addressing a
number of John's previous questions in the PR.

Adam


On Thu, Jun 27, 2019 at 5:47 PM Matthias J. Sax 
wrote:

> Thanks for bringing this issue to our attention. Great find @Joe!
>
> Adding the instruction field to the `subscription` sounds like a good
> solution. What I don't understand atm: for which case would we need to
> send unnecessary tombstone? I thought that the `instruction` field helps
> to avoid any unnecessary tombstone? Seems I am missing a case?
>
> Also for my own understanding: the `instruction` is only part of the
> message? It is not necessary to store it in the RHS auxiliary store, right?
>
> About right/full-outer joins. Agreed. Getting left-joins would be awesome!
>
> About upgrading: Good call John! Adding a version byte for subscription
> and response is good forward thinking. I personally prefer version
> numbers, too, as they carry more information.
>
> Thanks for all the hard work to everybody involved!
>
>
> -Matthias
>
> On 6/27/19 1:44 PM, John Roesler wrote:
> > Hi Adam,
> >
> > Hah! Yeah, I felt a headache coming on myself when I realized this
> > would be a concern.
> >
> > For what it's worth, I'd also lean toward versioning. It seems more
> > explicit and more likely to keep us all sane in the long run. Since we
> > don't _think_ our wire protocol will be subject to a lot of revisions,
> > we can just use one byte. The worst case is that we run out of numbers
> > and reserve the last one to mean, "consult another field for the
> > actual version number". It seems like a single byte on each message
> > isn't too much to pay.
> >
> > Since you point it out, we might as well put a version number on the
> > SubscriptionResponseWrapper as well. It may not be needed, but if we
> > ever need it, even just once, we'll be glad we have it.
> >
> > Regarding the instructions field, we can also serialize the enum very
> > compactly as a single byte (which is the same size a boolean takes
> > anyway), so it seems like an Enum in Java-land and a byte on the wire
> > is a good choice.
> >
> > Agreed on the right and full outer joins, it doesn't seem necessary
> > right now, although I am happy to see the left join "join" the party,
> > since as you said, we were so close to it anyway. Can you also add it
> > to the KIP?
> >
> > Thanks as always for your awesome efforts on this,
> > -John
> >
> > On Thu, Jun 27, 2019 at 3:04 PM Adam Bellemare 
> wrote:
> >>
> >> You're stretching my brain, John!
> >>
> >> I prefer STRATEGY 1 because it solves the problem in a simple way, and
> >> allows us to deprecate support for older message types as we go (ie, we
> >> only support the previous 3 versions, so V5, V4, V3, but not V2 or V1).
> >>
> >> STRATEGY 2 is akin to Avro schemas between two microservices - there are
> >> indeed cases where a breaking change must be made, and forward
> >> compatibility will provide us with no out other than requiring a full
> stop
> >> and full upgrade for all nodes, shifting us back towards STRATEGY 1.
> >>
> >> My preference is STRATEGY 1 with instructions as an ENUM, and we can
> >> certainly include a version. Would it make sense to include a version
> >> number in  SubscriptionResponseWrapper as well? Currently we don't have
> any
> >> instructions in there, as I removed the boolean, but it is certainly
> >> plausible that it could happen in the future. I don't *think* we'll need
> >> it, but I also didn't think we'd need it for SubscriptionWrapper and
> here
> >> we are.
> >>
> >> Thanks for the thoughts, and the info on the right-key. That was
> >> enlightening, though I can't think of a use-case for it *at this point
> in
> >> time*. :)
> >>
>

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-27 Thread Adam Bellemare
the full message would still
> behave correctly. Using the example above, we'd instead evolve the
> schema to V2': "FK,PK,HASH,BOOLEAN,INSTRUCTIONS", and continue to set
> the boolean field to true for the "new" foreign key. Then, 2.4 workers
> encountering a "new FK" message would just see the prefix of the
> payload that makes sense to them, and they would still continue
> processing the messages as they always have. Only after the 2.5 code
> is fully rolled out to the cluster would we be sure to see the desired
> behavior. Note: in the reverse case, a 2.5 worker knows how to fully
> parse the new message format, even if it plans to ignore the BOOLEAN
> field.
>
> There are some tradeoffs between these strategies: STRATEGY1 ensures
> that all messages are only handled by workers that can properly handle
> them, although it results in processing stalls while there are still
> old nodes in the cluster. STRATEGY2 ensures that all messages can be
> processed by all nodes, so there are no stalls, but we can never
> remove fields from the message, so if there are a lot of revisions in
> the future, the payloads will become bloated. Also, it's not clear
> that you can actually pull off STRATEGY2 in all cases. If there's some
> new kind of message you want to send that has no way to be correctly
> processed at all under the 2.4 code paths, the prefix thing simply
> doesn't work. Etc.
>
> Also, note that you can modify the above strategies by instead
> designing the message fields for extensibility. E.g., if you make the
> instructions field an enum, then you can make sure that the default
> case is handled sensibly (probably similarly to STRATEGY1, just choke
> on unknown instructions) and that you never remove an instruction type
> from the enum in future versions.
>
> Does this make sense?
> -John
>
>
>
>
> PS:
> We can define null keys for streaming tables, but it's tricky.
>
> Specifically, you'd want to define some concept of null keys that
> allows all null keys to be unique, but _also_ to have a fixed
> identity, so that a particular null-key can be updated later. One
> example could be to union the existing keyspace with a new
> null-keyspace, where normal keys are like "key" and null-keys are like
> "null(identity)". Then given a query like
> "KTable.rightJoin(KTable)", and
> inputs like:
> LHS:
> "a": 1
> "b": 2
>
> RHS:
> 1: true
> 3: false
>
> a full outer join would produce:
> "a": (1, true)
> "b": (2, null)
> null(3): (null, false)
>
> which can be correctly updated later if we get an update on the LHS:
> PUT("c": 3)
>
> We'd emit for the results:
> DELETE(null(3))
> EMIT("c": (3, false))
>
> Resulting in the correct result table of:
> "a": (1, true)
> "b": (2, null)
> "c": (3, false)
>
> As mentioned, this is tricky, and I would avoid it until we have
> evidence that it's actually useful to cover this part of the design
> space. Certainly, it would be a separate KIP if it came to that.
>
> On Wed, Jun 26, 2019 at 8:57 PM Adam Bellemare 
> wrote:
> >
> > Hi John
> >
> > Good thinking with regards to upgrade path between versions regarding
> > over-the-wire instructions in SubscriptionWrapper. At this point in time
> I
> > can't think of any new wire message instructions, but I would appreciate
> as
> > many eyes on it as possible. I have just included the LEFT join in the
> last
> > commit (about 10 min ago) along with INNER join. I do not think that
> RIGHT
> > join and OUTER are possible given that there is no LHS key available, so
> > LHSTable.outerJoinOnForeignKey(RHSTable) wouldn't even make sense. This
> is
> > in contrast to the current LHSTable.outerJoin(RHSTable), as they are both
> > keyed on the same key. I have buffed up the Integration tests and have
> > tried to make them more readable to ensure that we're covering all the
> > scenarios. I think that if we can get more eyes on the workflow showing
> the
> > various LHS and RHS events and outputs then that may help us validate
> that
> > we have all the scenarios covered.
> >
> > With regards to the 2.3->2.4 scenario you described, I'm not entirely
> sure
> > I follow. If they want to add a FK-join, they will need to rework their
> > code in the KStreams app and make a new release, since the underlying
> > topology would be different and new internal topics would need to be
> > created. In other words, I don't think a rolling upgrade where the user
> > introduces a FK join would be possible since th

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-26 Thread Adam Bellemare
Hi John

Good thinking with regards to upgrade path between versions regarding
over-the-wire instructions in SubscriptionWrapper. At this point in time I
can't think of any new wire message instructions, but I would appreciate as
many eyes on it as possible. I have just included the LEFT join in the last
commit (about 10 min ago) along with INNER join. I do not think that RIGHT
join and OUTER are possible given that there is no LHS key available, so
LHSTable.outerJoinOnForeignKey(RHSTable) wouldn't even make sense. This is
in contrast to the current LHSTable.outerJoin(RHSTable), as they are both
keyed on the same key. I have buffed up the Integration tests and have
tried to make them more readable to ensure that we're covering all the
scenarios. I think that if we can get more eyes on the workflow showing the
various LHS and RHS events and outputs then that may help us validate that
we have all the scenarios covered.

With regards to the 2.3->2.4 scenario you described, I'm not entirely sure
I follow. If they want to add a FK-join, they will need to rework their
code in the KStreams app and make a new release, since the underlying
topology would be different and new internal topics would need to be
created. In other words, I don't think a rolling upgrade where the user
introduces a FK join would be possible since their topology would
necessitate a full KStreams reset. Is this what you meant?



On Wed, Jun 26, 2019 at 4:10 PM John Roesler  wrote:

> Thanks, Adam!
>
> One unrelated thought that has just now occurred to me is that (unlike
> the equi-joins we currently have), this join logic is potentially
> spread over multiple Streams instances, which in general means that
> the instances may be running different versions of Kafka Streams.
>
> This means that if we discover a bug that requires us to again change
> the wire message (as you did in this proposal update), we need to
> consider what should happen if the PK instance is newer than the FK
> instance, or vice-versa, during a rolling upgrade. We should think
> ahead to this condition and make sure the logic is forward compatible.
>
> Related: what about the initial case, when we release this feature
> (let's say in 2.4)? What will happen if I decide to adopt 2.4 and add
> a FK join together in one upgrade. Thus, the 2.4 member of the cluster
> is producing the SubscriptionWrapper messages, and some 2.3 members
> get the subscription topic assigned to them, but they have no idea
> what to do with it? I'm not sure this is a problem; hopefully they
> just do nothing. If it is a problem, it would be fine to say you have
> to upgrade completely to 2.4 before deploying a FK join.
>
> Just want to make sure we anticipate these issues in case it affects
> the design at all.
>
> Thanks,
> -John
>
> On Wed, Jun 26, 2019 at 2:38 PM Adam Bellemare 
> wrote:
> >
> > Sigh... Forgot the link:
> >
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=78&selectedPageVersions=74
> >
> > I'll update it when I validate that there are no issues with removing the
> > SubscriptionResponseWrapper boolean.
> >
> > On Wed, Jun 26, 2019 at 3:37 PM Adam Bellemare  >
> > wrote:
> >
> > > >Maybe just call it as (k, leftval, null) or (k, null, rightval)?
> > > Done.
> > >
> > > > if you update the KIP, you might want to send a new "diff link" to
> this
> > > thread
> > > Here it is:
> > >
> > > > Looking closely at the proposal, can you explain more about the
> > > propagateIfNull field in SubscriptionResponseWrapper? It sort of looks
> like
> > > it's always going to be equal to (RHS-result != null).
> > > I believe you are correct, and I missed the forest for the trees. They
> are
> > > effectively the same thing, and I can simply remove the flag. I will
> code
> > > it up and try it out locally just to be sure.
> > >
> > > Thanks again for your help, it is greatly appreciated!
> > >
> > > On Wed, Jun 26, 2019 at 2:54 PM John Roesler 
> wrote:
> > >
> > >> I think the "scenario trace" is very nice, but has one point that I
> > >> found confusing:
> > >>
> > >> You indicate a retraction in the join output as (k,null) and a join
> > >> result as (k, leftval, rightval), but confusingly, you also write a
> > >> join result as (k, JoinResult) when one side is null. Maybe just call
> > >> it as (k, leftval, null) or (k, null, rightval)? That way the readers
> > >> can more easily determine if the results meet their expectations for
> > >> each join type.
> > >>
> > >> (p

Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-26 Thread Adam Bellemare
Sigh... Forgot the link:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836&selectedPageVersions=78&selectedPageVersions=74

I'll update it when I validate that there are no issues with removing the
SubscriptionResponseWrapper boolean.

On Wed, Jun 26, 2019 at 3:37 PM Adam Bellemare 
wrote:

> >Maybe just call it as (k, leftval, null) or (k, null, rightval)?
> Done.
>
> > if you update the KIP, you might want to send a new "diff link" to this
> thread
> Here it is:
>
> > Looking closely at the proposal, can you explain more about the
> propagateIfNull field in SubscriptionResponseWrapper? It sort of looks like
> it's always going to be equal to (RHS-result != null).
> I believe you are correct, and I missed the forest for the trees. They are
> effectively the same thing, and I can simply remove the flag. I will code
> it up and try it out locally just to be sure.
>
> Thanks again for your help, it is greatly appreciated!
>
> On Wed, Jun 26, 2019 at 2:54 PM John Roesler  wrote:
>
>> I think the "scenario trace" is very nice, but has one point that I
>> found confusing:
>>
>> You indicate a retraction in the join output as (k,null) and a join
>> result as (k, leftval, rightval), but confusingly, you also write a
>> join result as (k, JoinResult) when one side is null. Maybe just call
>> it as (k, leftval, null) or (k, null, rightval)? That way the readers
>> can more easily determine if the results meet their expectations for
>> each join type.
>>
>> (procedural note: if you update the KIP, you might want to send a new
>> "diff link" to this thread, since the one I posted at the beginning
>> would not automatically show your latest changes)
>>
>> I was initially concerned that the proposed algorithm would wind up
>> propagating something that looks like a left join (k, leftval, null)
>> under the case that Joe pointed out, but after reviewing your
>> scenario, I see that it will emit a tombstone (k, null) instead. This
>> is appropriate, and unavoidable, since we have to retract the join
>> result from the logical view (the join result is a logical Table).
>>
>> Looking closely at the proposal, can you explain more about the
>> propagateIfNull field in SubscriptionResponseWrapper?
>> It sort of looks like it's always going to be equal to (RHS-result !=
>> null).
>>
>> In other words, can we drop that field and just send back RHS-result
>> or null, and then handle it on the left-hand side like:
>> if (rhsOriginalValueHash doesn't match) {
>>     emit nothing, just drop the update
>> } else if (joinType == inner && rhsValue == null) {
>>     emit tombstone
>> } else {
>>     emit joiner(lhsValue, rhsValue)
>> }
>>
>> To your concern about emitting extra tombstones, personally, I think
>> it's fine. Clearly, we should try to avoid unnecessary tombstones, but
>> all things considered, it's not harmful to emit some unnecessary
>> tombstones: their payload is small, and they are trivial to handle
>> downstream. If users want to, they can materialize the join result to
>> suppress any extra tombstones, so there's a way out.
>>
>> Thanks for the awesome idea. It's better than what I was thinking.
>> -john
>>
>> On Wed, Jun 26, 2019 at 11:37 AM Adam Bellemare
>>  wrote:
>> >
>> > Thanks John.
>> >
>> > I'm looking forward to any feedback on this. In the meantime I will work on
>> > the unit tests to ensure that we have well-defined and readable coverage.
>> >
>> > At the moment I cannot see a way around emitting (k,null) whenever we emit
>> > an event that lacks a matching foreign key on the RHS, except in the
>> > (k,null) -> (k,fk) case.
>> > If this LHS oldValue=null, we know we would have emitted a deletion and so
>> > (k,null) would be emitted out of the join. In this case we don't need to
>> > send another null.
>> >
>> > Adam
>> >
>> > On Wed, Jun 26, 2019 at 11:53 AM John Roesler 
>> wrote:
>> >
>> > > Hi Adam,
>> > >
>> > > Thanks for the proposed revision to your KIP
>> > > (
>> > >
>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836=77=74
>> > > )
>> > >
>> > > in response to the concern pointed out during code review
>> > > (https://github.com/apache/kafka/pull/5527#issuecomment-505137962)
>> > >
>> > > We should have a brief discussion thread (here) in the mailing list to
>> > > make sure everyone who wants to gets a chance to consider the
>> > > modification to the design.
>> > >
>> > > Thanks,
>> > > -John
>> > >
>>
>


Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-26 Thread Adam Bellemare
>Maybe just call it as (k, leftval, null) or (k, null, rightval)?
Done.

> if you update the KIP, you might want to send a new "diff link" to this
> thread
Here it is:

> Looking closely at the proposal, can you explain more about the
> propagateIfNull field in SubscriptionResponseWrapper? It sort of looks like
> it's always going to be equal to (RHS-result != null).
I believe you are correct, and I missed the forest for the trees. They are
effectively the same thing, and I can simply remove the flag. I will code
it up and try it out locally just to be sure.
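
As a rough illustration of the left-hand-side handling John describes in the
quoted pseudocode below, here is a minimal Java sketch. It is not the KIP-213
implementation: the class, enum and parameter names are hypothetical, and it
assumes that "rhsOriginalValueHash doesn't match" means the hash carried back in
the response differs from the hash of the current LHS value.

```java
import java.util.Arrays;
import java.util.function.BiFunction;

// Hypothetical sketch only; none of these names come from the Kafka code base.
final class FkJoinResponseSketch {
    enum JoinType { INNER, LEFT }
    enum Action { DROP, TOMBSTONE, EMIT }

    /** What the left-hand side does with one response from the right-hand side. */
    static final class Outcome<J> {
        final Action action;
        final J value; // only meaningful when action == EMIT

        Outcome(Action action, J value) {
            this.action = action;
            this.value = value;
        }
    }

    static <L, R, J> Outcome<J> resolve(JoinType joinType,
                                        byte[] hashOfCurrentLhsValue,
                                        byte[] hashInResponse,
                                        L lhsValue,
                                        R rhsValue,
                                        BiFunction<L, R, J> joiner) {
        if (!Arrays.equals(hashOfCurrentLhsValue, hashInResponse)) {
            // Assumption: a mismatch means the LHS value changed since the
            // request was sent, so the response is stale and is dropped.
            return new Outcome<>(Action.DROP, null);
        } else if (joinType == JoinType.INNER && rhsValue == null) {
            // No matching RHS record for an inner join: retract with (k, null).
            return new Outcome<>(Action.TOMBSTONE, null);
        } else {
            // Inner join with a match, or left join with or without a match.
            return new Outcome<>(Action.EMIT, joiner.apply(lhsValue, rhsValue));
        }
    }

    public static void main(String[] args) {
        byte[] hash = {1, 2, 3};
        BiFunction<String, String, String> joiner = (l, r) -> "(" + l + ", " + r + ")";
        System.out.println(resolve(JoinType.INNER, hash, hash, "leftval", null, joiner).action);
        System.out.println(resolve(JoinType.LEFT, hash, hash, "leftval", null, joiner).value);
    }
}
```

Note that no separate propagateIfNull flag appears: in this sketch a null
rhsValue carries the same information, which is the point being discussed.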

Thanks again for your help, it is greatly appreciated!

On Wed, Jun 26, 2019 at 2:54 PM John Roesler  wrote:

> I think the "scenario trace" is very nice, but has one point that I
> found confusing:
>
> You indicate a retraction in the join output as (k,null) and a join
> result as (k, leftval, rightval), but confusingly, you also write a
> join result as (k, JoinResult) when one side is null. Maybe just call
> it as (k, leftval, null) or (k, null, rightval)? That way the readers
> can more easily determine if the results meet their expectations for
> each join type.
>
> (procedural note: if you update the KIP, you might want to send a new
> "diff link" to this thread, since the one I posted at the beginning
> would not automatically show your latest changes)
>
> I was initially concerned that the proposed algorithm would wind up
> propagating something that looks like a left join (k, leftval, null)
> under the case that Joe pointed out, but after reviewing your
> scenario, I see that it will emit a tombstone (k, null) instead. This
> is appropriate, and unavoidable, since we have to retract the join
> result from the logical view (the join result is a logical Table).
>
> Looking closely at the proposal, can you explain more about the
> propagateIfNull field in SubscriptionResponseWrapper?
> It sort of looks like it's always going to be equal to (RHS-result !=
> null).
>
> In other words, can we drop that field and just send back RHS-result
> or null, and then handle it on the left-hand side like:
> if (rhsOriginalValueHash doesn't match) {
>     emit nothing, just drop the update
> } else if (joinType == inner && rhsValue == null) {
>     emit tombstone
> } else {
>     emit joiner(lhsValue, rhsValue)
> }
>
> To your concern about emitting extra tombstones, personally, I think
> it's fine. Clearly, we should try to avoid unnecessary tombstones, but
> all things considered, it's not harmful to emit some unnecessary
> tombstones: their payload is small, and they are trivial to handle
> downstream. If users want to, they can materialize the join result to
> suppress any extra tombstones, so there's a way out.
>
> Thanks for the awesome idea. It's better than what I was thinking.
> -john
>
> On Wed, Jun 26, 2019 at 11:37 AM Adam Bellemare
>  wrote:
> >
> > Thanks John.
> >
> > I'm looking forward to any feedback on this. In the meantime I will work on
> > the unit tests to ensure that we have well-defined and readable coverage.
> >
> > At the moment I cannot see a way around emitting (k,null) whenever we emit
> > an event that lacks a matching foreign key on the RHS, except in the
> > (k,null) -> (k,fk) case.
> > If this LHS oldValue=null, we know we would have emitted a deletion and so
> > (k,null) would be emitted out of the join. In this case we don't need to
> > send another null.
> >
> > Adam
> >
> > On Wed, Jun 26, 2019 at 11:53 AM John Roesler  wrote:
> >
> > > Hi Adam,
> > >
> > > Thanks for the proposed revision to your KIP
> > > (
> > >
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836=77=74
> > > )
> > >
> > > in response to the concern pointed out during code review
> > > (https://github.com/apache/kafka/pull/5527#issuecomment-505137962)
> > >
> > > We should have a brief discussion thread (here) in the mailing list to
> > > make sure everyone who wants to gets a chance to consider the
> > > modification to the design.
> > >
> > > Thanks,
> > > -John
> > >
>


Re: [DISCUSS] KIP-213: Second follow-up on Foreign Key Joins

2019-06-26 Thread Adam Bellemare
Thanks John.

I'm looking forward to any feedback on this. In the meantime I will work on
the unit tests to ensure that we have well-defined and readable coverage.

At the moment I cannot see a way around emitting (k,null) whenever we emit
an event that lacks a matching foreign key on the RHS, except in the
(k,null) -> (k,fk) case.
If this LHS oldValue=null, we know we would have emitted a deletion and so
(k,null) would be emitted out of the join. In this case we don't need to
send another null.

Adam

On Wed, Jun 26, 2019 at 11:53 AM John Roesler  wrote:

> Hi Adam,
>
> Thanks for the proposed revision to your KIP
> (
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=74684836=77=74
> )
>
> in response to the concern pointed out during code review
> (https://github.com/apache/kafka/pull/5527#issuecomment-505137962)
>
> We should have a brief discussion thread (here) in the mailing list to
> make sure everyone who wants to gets a chance to consider the
> modification to the design.
>
> Thanks,
> -John
>


Re: [RESULT] [VOTE] 2.3.0 RC3

2019-06-25 Thread Adam Bellemare
Thanks for the hard work Colin, and everyone else who helped get this out!

On Mon, Jun 24, 2019 at 1:38 PM Colin McCabe  wrote:

> Hi all,
>
> This vote passes with 6 +1 votes (3 of which are binding) and no 0 or -1
> votes.  Thanks to everyone who voted.
>
> +1 votes
> PMC Members:
> * Ismael Juma
> * Guozhang Wang
> * Gwen Shapira
>
> Community:
> * Kamal Chandraprakash
> * Jakub Scholz
> * Mickael Maison
>
> 0 votes
> * No votes
>
> -1 votes
> * No votes
>
> Vote thread:
> https://www.mail-archive.com/dev@kafka.apache.org/msg98814.html
>
> I'll continue with the release process and the release announcement will
> follow in the next few days.
>
> thanks,
> Colin
>
>
> On Mon, Jun 24, 2019, at 01:17, Mickael Maison wrote:
> > Thanks Colin for making a new RC for KAFKA-8564.
> > +1 (non binding)
> > I checked signatures and ran quickstart on the 2.12 binary
> >
> > On Mon, Jun 24, 2019 at 6:03 AM Gwen Shapira  wrote:
> > >
> > > +1 (binding)
> > > Verified signatures, verified good build on jenkins, built from
> > > sources anyway and ran quickstart on the 2.11 binary.
> > >
> > > Looks good!
> > >
> > > On Sun, Jun 23, 2019 at 3:06 PM Jakub Scholz  wrote:
> > > >
> > > > +1 (non-binding). I used the binaries and run some of my tests
> against them.
> > > >
> > > > On Thu, Jun 20, 2019 at 12:03 AM Colin McCabe 
> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > We discovered some problems with the second release candidate
> (RC2) of
> > > > > 2.3.0.  Specifically, KAFKA-8564.  I've created a new RC which
> includes the
> > > > > fix for this issue.
> > > > >
> > > > > Check out the release notes for the 2.3.0 release here:
> > > > >
> https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/RELEASE_NOTES.html
> > > > >
> > > > > The vote will go until Saturday, June 22nd, or until we create
> another RC.
> > > > >
> > > > > * Kafka's KEYS file containing PGP keys we use to sign the release
> can be
> > > > > found here:
> > > > > https://kafka.apache.org/KEYS
> > > > >
> > > > > * The release artifacts to be voted upon (source and binary) are
> here:
> > > > > https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/
> > > > >
> > > > > * Maven artifacts to be voted upon:
> > > > >
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > > > >
> > > > > * Javadoc:
> > > > > https://home.apache.org/~cmccabe/kafka-2.3.0-rc3/javadoc/
> > > > >
> > > > > * The tag to be voted upon (off the 2.3 branch) is the 2.3.0 tag:
> > > > > https://github.com/apache/kafka/releases/tag/2.3.0-rc3
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > C.
> > > > >
> > >
> > >
> > >
> > > --
> > > Gwen Shapira
> > > Product Manager | Confluent
> > > 650.450.2760 | @gwenshap
> > > Follow us: Twitter | blog
> >
>


Re: [DISCUSS] KIP-360: Improve handling of unknown producer

2019-03-25 Thread Adam Bellemare
Ach - Sorry. I meant Jason. I had just read a John Roesler email.

On Mon, Mar 25, 2019 at 5:21 PM Adam Bellemare 
wrote:

> Hi John
>
> What is the status of this KIP?
>
> My teammates and I are running into the "UNKNOWN_PRODUCER_ID" error on
> 2.1.1 for a multitude of our internal topics, and I suspect that a proper
> fix is needed.
>
> Adam
>
> On Mon, Jan 7, 2019 at 7:42 PM Guozhang Wang  wrote:
>
>> Thanks Jason. The proposed solution sounds good to me.
>>
>>
>> Guozhang
>>
>> On Mon, Jan 7, 2019 at 3:52 PM Jason Gustafson 
>> wrote:
>>
>> > Hey Guozhang,
>> >
>> > Thanks for sharing the article. The INVALID_PRODUCER_ID_MAPPING error
>> > occurs following expiration of the producerId. It's possible that another
>> > producerId has been installed in its place following expiration (if another
>> > producer instance has become active), or the mapping is empty. We can
>> > safely retry the InitProducerId with the logic in this KIP in order to
>> > detect which case it is. So I'd suggest something like this:
>> >
>> > 1. After receiving INVALID_PRODUCER_ID_MAPPING, the producer can send
>> > InitProducerId using the current producerId and epoch.
>> > 2. If no mapping exists, the coordinator can generate a new producerId and
>> > return it. If a transaction is in progress on the client, it will have to
>> > be aborted, but the producer can continue afterwards.
>> > 3. Otherwise if a different producerId has been assigned, then we can
>> > return INVALID_PRODUCER_ID_MAPPING. To simplify error handling, we can
>> > probably raise this as ProducerFencedException since that is effectively
>> > what has happened. Ideally this is the only fatal case that users have to
>> > handle.
>> >
>> > I'll give it a little more thought and update the KIP.
>> >
>> > Thanks,
>> > Jason
>> >
>> > On Thu, Jan 3, 2019 at 1:38 PM Guozhang Wang 
>> wrote:
>> >
>> > > You're right about the dangling txn since it will actually block
>> > > read-committed consumers from proceeding at all. I'd agree that since this
>> > > is a very rare case, we can consider fixing it not via broker-side logic
>> > > but via tooling in a future work.
>> > >
>> > > I've also discovered some related error handling logic inside producer that
>> > > may be addressed together with this KIP (since it is mostly for internal
>> > > implementations the wiki itself does not need to be modified):
>> > >
>> > > https://stackoverflow.com/questions/53976117/why-did-the-kafka-stream-fail-to-produce-data-after-a-long-time/54029181#54029181
>> > >
>> > > Guozhang
>> > >
>> > >
>> > >
>> > > On Thu, Nov 29, 2018 at 2:25 PM Jason Gustafson 
>> > > wrote:
>> > >
>> > > > Hey Guozhang,
>> > > >
>> > > > To clarify, the broker does not actually use the ApiVersion API for
>> > > > inter-broker communications. The use of an API and its corresponding
>> > > > version is controlled by `inter.broker.protocol.version`.
>> > > >
>> > > > Nevertheless, it sounds like we're on the same page about removing
>> > > > DescribeTransactionState. The impact of a dangling transaction is a little
>> > > > worse than what you describe though. Consumers with the read_committed
>> > > > isolation level will be stuck. Still, I think we agree that this case
>> > > > should be rare and we can reconsider for future work. Rather than
>> > > > preventing dangling transactions, perhaps we should consider options which
>> > > > allow us to detect them and recover. Anyway, this needs more thought. I
>> > > > will update the KIP.
>> > > >
>> > > > Best,
>> > > > Jason
>> > > >
>> > > > On Tue, Nov 27, 2018 at 6:51 PM Guozhang Wang 
>> > > wrote:
>> > > >
>> > > > > 0. My original question is about the implementation details
>> > primarily,
>> > > > > since current the handling logic of the APIVersionResponse is
>> simply
>> > > "use
>> > > > > the 

Re: [DISCUSS] KIP-360: Improve handling of unknown producer

2019-03-25 Thread Adam Bellemare
Hi John

What is the status of this KIP?

My teammates and I are running into the "UNKNOWN_PRODUCER_ID" error on
2.1.1 for a multitude of our internal topics, and I suspect that a proper
fix is needed.

Adam

On Mon, Jan 7, 2019 at 7:42 PM Guozhang Wang  wrote:

> Thanks Jason. The proposed solution sounds good to me.
>
>
> Guozhang
>
> On Mon, Jan 7, 2019 at 3:52 PM Jason Gustafson  wrote:
>
> > Hey Guozhang,
> >
> > Thanks for sharing the article. The INVALID_PRODUCER_ID_MAPPING error
> > occurs following expiration of the producerId. It's possible that another
> > producerId has been installed in its place following expiration (if another
> > producer instance has become active), or the mapping is empty. We can
> > safely retry the InitProducerId with the logic in this KIP in order to
> > detect which case it is. So I'd suggest something like this:
> >
> > 1. After receiving INVALID_PRODUCER_ID_MAPPING, the producer can send
> > InitProducerId using the current producerId and epoch.
> > 2. If no mapping exists, the coordinator can generate a new producerId and
> > return it. If a transaction is in progress on the client, it will have to
> > be aborted, but the producer can continue afterwards.
> > 3. Otherwise if a different producerId has been assigned, then we can
> > return INVALID_PRODUCER_ID_MAPPING. To simplify error handling, we can
> > probably raise this as ProducerFencedException since that is effectively
> > what has happened. Ideally this is the only fatal case that users have to
> > handle.
> >
> > I'll give it a little more thought and update the KIP.
> >
> > Thanks,
> > Jason
> >
> > On Thu, Jan 3, 2019 at 1:38 PM Guozhang Wang  wrote:
> >
> > > You're right about the dangling txn since it will actually block
> > > read-committed consumers from proceeding at all. I'd agree that since this
> > > is a very rare case, we can consider fixing it not via broker-side logic
> > > but via tooling in a future work.
> > >
> > > I've also discovered some related error handling logic inside producer that
> > > may be addressed together with this KIP (since it is mostly for internal
> > > implementations the wiki itself does not need to be modified):
> > >
> > > https://stackoverflow.com/questions/53976117/why-did-the-kafka-stream-fail-to-produce-data-after-a-long-time/54029181#54029181
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Thu, Nov 29, 2018 at 2:25 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > To clarify, the broker does not actually use the ApiVersion API for
> > > > inter-broker communications. The use of an API and its corresponding
> > > > version is controlled by `inter.broker.protocol.version`.
> > > >
> > > > Nevertheless, it sounds like we're on the same page about removing
> > > > DescribeTransactionState. The impact of a dangling transaction is a little
> > > > worse than what you describe though. Consumers with the read_committed
> > > > isolation level will be stuck. Still, I think we agree that this case
> > > > should be rare and we can reconsider for future work. Rather than
> > > > preventing dangling transactions, perhaps we should consider options which
> > > > allow us to detect them and recover. Anyway, this needs more thought. I
> > > > will update the KIP.
> > > >
> > > > Best,
> > > > Jason
> > > >
> > > > On Tue, Nov 27, 2018 at 6:51 PM Guozhang Wang 
> > > wrote:
> > > >
> > > > > 0. My original question is about the implementation details primarily,
> > > > > since currently the handling logic of the APIVersionResponse is simply "use
> > > > > the highest supported version of the corresponding request", but if the
> > > > > returned response from APIVersionRequest says "I don't even know about the
> > > > > DescribeTransactionStateRequest at all", then we need additional logic for
> > > > > falling back. Currently this logic is embedded in NetworkClient
> > > > > which is shared by all clients, so I'd like to avoid making this logic more
> > > > > complicated.
> > > > >
> > > > > As for the general issue that a broker does not recognize a producer with
> > > > > sequence number 0, here's my thinking: as you mentioned in the wiki, this
> > > > > is only a concern for the transactional producer, since an idempotent producer
> > > > > can just bump the epoch and go. For a transactional producer, even if the
> > > > > produce request from a fenced producer gets accepted, its transaction will
> > > > > never be committed and hence its messages are not exposed to read-committed
> > > > > consumers either. The drawbacks, though, are 1) read-uncommitted consumers
> > > > > will still read those messages, and 2) unnecessary storage for those fenced
> > > > > produce messages, but in practice this should not accumulate to a large amount
> > > > > since producer should soon try to commit 

Re: Question on performance data for Kafka vs NATS

2019-03-22 Thread Adam Bellemare
One more thing to note:

You are looking at regular, base NATS. On its own, it is not a direct 1:1
comparison to Kafka because it lacks things like data retention, clustering
and replication. Instead, you would want to compare Kafka to NATS-Streaming
(https://github.com/nats-io/nats-streaming-server). You can find a number
of more recent articles and comparisons with a simple web search.

With that being said, this is likely not the best venue for an in-depth
discussion on tradeoffs between the two (especially since I see you're
spanning two very large mailing lists).

Adam




On Fri, Mar 22, 2019 at 1:34 AM Hans Jespersen  wrote:

> That's a 4.5-year-old benchmark, and it was run with a single broker node
> and only 1 producer and 1 consumer, all running on a single MacBook Pro.
> Definitely not the target production environment for Kafka.
>
> -hans
>
> > On Mar 21, 2019, at 11:43 AM, M. Manna  wrote:
> >
> > Hi All,
> >
> > https://nats.io/about/
> >
> > This shows a general comparison of sender/receiver throughputs for NATS and
> > other messaging systems, including our favourite Kafka.
> >
> > It appears that Kafka, despite taking the 2nd place, has a very low
> > throughput. My question is, where does Kafka win over NATS? Is it the
> > unique partitioning and delivery semantics? Or is it something else?
> >
> > From what I can see, NATS has traditional pub/sub and queuing. But it
> > doesn't look like there is any proper retention system built for this.
> >
> > Has anyone come across this already?
> >
> > Thanks,
>


Re: KIP-213- [DISCUSS] - Three follow-up discussion points - topic partitioning, serializers, hashers

2019-03-19 Thread Adam Bellemare
Thanks John & Matthias. I have created a report with Confluent (
https://github.com/confluentinc/schema-registry/issues/1061).

I will continue on with current work and we can resume the discussion, as
Matthias correctly indicates, in the PR. Matthias, thank you for the link
to Kafka-. This is something that my team has also come across, and I
may be interested in pursuing a KIP on that once this one is completed.

Thank you both again for your insight.

On Tue, Mar 19, 2019 at 2:19 PM John Roesler  wrote:

> Chiming in...
>
> 1) Agreed. There is a technical reason 1:1 joins have to be co-partitioned,
> which does not apply to the many:1 join you've designed.
>
> 2) Looking at the Serializer interface, it unfortunately doesn't indicate
> whether the topic (or the value) is nullable. There are several places in
> Streams where we need to serialize a value for purposes other than sending
> it to a topic (KTableSuppressProcessor comes to mind), and using `null` for
> the topic is the convention we have. I think we should just use `null` for
> this case as well. Since we're doing this already, maybe we should document
> in the Serializer interface which parameters are nullable.
>
> It sounds like you're using the Confluent serde, and need it to support
> this usage. I'd recommend you just send a PR to that project independently.
>
> On Mon, Mar 18, 2019 at 7:13 PM Matthias J. Sax 
> wrote:
>
> > Just my 2 cents. Not sure if others see it differently:
> >
> > 1) it seems that we can lift the restriction on having the same number
> > of input topic partitions, and thus we should exploit this IMHO; don't
> > see why we should enforce an artificial restriction
> >
> >
> > 2) for the value serde it's a little bit more tricky; in general, Apache
> > Kafka should not be concerned with third party tools. It seems that
> > https://issues.apache.org/jira/browse/KAFKA- might provide a
> > solution though -- but it's unclear if KIP-213 and  would be shipped
> > with the same release...
> >
> > > To me, this is a shortcoming of the Confluent Avro Serde
> > >> that will likely need to be fixed on that side.
> >
> > I agree (good to know...)
> >
> >
> > 3) I am not an expert on hashing, but 128-bit murmur3 sounds reasonable
> > to me
> >
> >
> >
> > Btw: I think we can have this discussion on the PR -- no need to concern
> > the mailing list (it's a lot of people that are subscribed).
> >
> >
> >
> > -Matthias
> >
> > On 3/17/19 5:20 PM, Adam Bellemare wrote:
> > > Hey folks
> > >
> > > I have been implementing the KIP as outlined in
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable,
> > > and I have run into a few points to consider that we did not include in the
> > > original.
> > >
> > > *1) Do all input topics need to have the same partitions or not?* Currently
> > > I have it designed such that it must, to be consistent with other joins.
> > > However, consider the following:
> > > TableA - 5 partitions
> > > TableB - 2 partitions
> > > Pre-subscribe Repartition Topic = 2 partitions, 2 RHS processors
> > > Post-Subscribe Repartition Topic = 5 partitions, 5 LHS processors
> > >
> > > Would this not be possible? Is there a value in flexibility to this? I have
> > > not looked deeper into the restrictions of this approach, so if there is
> > > something I should know I would appreciate a heads up.
> > >
> > > *2) Is it appropriate to use the KTable valueSerde during the computation
> > > of the hash?* To compute the hash I need to obtain an array of bytes, which
> > > is immediately possible by using the valueSerde. However, the Confluent
> > > Kafka Schema Registry serializer fails when it is being used in this way:
> > > In the hash generating code, I set topic to null because the data is not
> > > dependent on any topic value. I simply want the serialized bytes to input
> > > into the hash function.
> > > *byte[] preHashValue = serializer.serialize(topic = null, data)*
> > >
> > > Any KTable that is Consumed.with(Confluent-Key-Serde,
> > > Confluent-Value-Serde) will automatically try to register the schema to
> > > topic+"-key" and topic+"-value". If I pass in null, it tries to register to
> > > "-key" and "-value" each time the serializer is called, regardless
KIP-213- [DISCUSS] - Three follow-up discussion points - topic partitioning, serializers, hashers

2019-03-17 Thread Adam Bellemare
Hey folks

I have been implementing the KIP as outlined in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable,
and I have run into a few points to consider that we did not include in the
original.

*1) Do all input topics need to have the same partitions or not?* Currently
I have it designed such that it must, to be consistent with other joins.
However, consider the following:
TableA - 5 partitions
TableB - 2 partitions
Pre-subscribe Repartition Topic = 2 partitions, 2 RHS processors
Post-Subscribe Repartition Topic = 5 partitions, 5 LHS processors

Would this not be possible? Is there a value in flexibility to this? I have
not looked deeper into the restrictions of this approach, so if there is
something I should know I would appreciate a heads up.

*2) Is it appropriate to use the KTable valueSerde during the computation
of the hash?* To compute the hash I need to obtain an array of bytes, which
is immediately possible by using the valueSerde. However, the Confluent
Kafka Schema Registry serializer fails when it is being used in this way:
In the hash generating code, I set topic to null because the data is not
dependent on any topic value. I simply want the serialized bytes to input
into the hash function.
*byte[] preHashValue = serializer.serialize(topic = null, data)*

Any KTable that is Consumed.with(Confluent-Key-Serde,
Confluent-Value-Serde) will automatically try to register the schema to
topic+"-key" and topic+"-value". If I pass in null, it tries to register to
"-key" and "-value" each time the serializer is called, regardless of the
class. In other words, it registers the schemas to a null topic and fails
any subsequent serializations that aren't of the exact same schema. Note
that this would be the case across ALL applications using the confluent
schema registry. To me, this is a shortcoming of the Confluent Avro Serde
that will likely need to be fixed on that side. However, it does bring up
the question - is this an appropriate way to use a serializer? Alternately,
if I should NOT use the KTable value-serde to generate the byte array, does
anyone have a better idea?
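
To make the usage in question concrete, here is a minimal, self-contained Java
sketch of "serialize with a null topic, then hash the bytes". It rests on
several assumptions: ValueSerializer is a hypothetical stand-in that only
mirrors the serialize(topic, data) shape (it is not the Kafka interface), and
MD5 from the JDK is used purely as a placeholder 128-bit digest so that the
example runs without extra libraries; it is not the murmur3 hash discussed in
point 3.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ValueHashSketch {
    /** Hypothetical stand-in with the same shape as serialize(topic, data). */
    interface ValueSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Serializes the value with a null topic and returns a 128-bit digest of the bytes. */
    static <T> byte[] hashValue(ValueSerializer<T> serializer, T value) {
        if (value == null) {
            return null; // nothing to hash for a deleted value
        }
        // The topic is deliberately null: the bytes are only an input to the hash.
        byte[] preHashValue = serializer.serialize(null, value);
        try {
            // Placeholder digest; a real implementation would pick its own hash.
            return MessageDigest.getInstance("MD5").digest(preHashValue);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required to exist on every JDK", e);
        }
    }

    public static void main(String[] args) {
        // A serializer that ignores the topic works fine with null.
        ValueSerializer<String> utf8 = (topic, data) -> data.getBytes(StandardCharsets.UTF_8);
        System.out.println(hashValue(utf8, "some value").length + " bytes");
    }
}
```

A serializer that ignores its topic argument behaves well here; one that derives
a schema subject from the topic, as described above, is where a null topic
becomes a problem.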

*3) How big of a hash value do we need? Does the Foreign Key even matter
for resolving?*
I am currently looking at fast, non-cryptographically-secure hash options. We
use murmur2 already in Kafka, but it is only 32 bits. I have been looking
at murmur3hash as implemented in the Apache Hive project (
https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/Murmur3.java)
- it supports 128-bit hashes and is allegedly more performant than MD5.
With a 128-bit hash, the birthday problem indicates that we would need on the
order of 2^64 = 1.8446744e+19 entries to have a 50% chance of a collision. I
believe that this probability is sufficiently small, especially for our narrow
time window, that we should not expect a collision for a singly-keyed event. I
think that there is no benefit in including the foreign key, but again, please
let me know if this is wrong.
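
As a quick sanity check of the birthday estimate, the standard approximation
p ~ 1 - exp(-n^2 / (2 * 2^bits)) can be evaluated directly. The sketch below is
only an illustration (the class and method names are made up for this example).
Under that approximation, n = 2^64 entries give a collision probability of
roughly 39% for a 128-bit hash, and 50% is reached at about 1.18 * 2^64
entries, so 2^64 is the right order of magnitude. For comparison, a 32-bit hash
reaches 50% at roughly 77,000 entries.

```java
final class BirthdayBoundSketch {
    /** Approximate probability of at least one collision among n uniformly random hashes. */
    static double collisionProbability(double n, int hashBits) {
        double space = Math.pow(2.0, hashBits);
        // p ~ 1 - exp(-n^2 / (2 * space)); expm1 keeps precision when p is tiny.
        return -Math.expm1(-(n * n) / (2.0 * space));
    }

    /** Approximate number of entries at which the collision probability reaches p. */
    static double entriesForProbability(double p, int hashBits) {
        // Inverse of the formula above: n ~ sqrt(2 * space * ln(1 / (1 - p))).
        return Math.sqrt(2.0 * Math.pow(2.0, hashBits) * -Math.log1p(-p));
    }

    public static void main(String[] args) {
        System.out.println("p at 2^64 entries, 128 bits: "
                + collisionProbability(Math.pow(2.0, 64), 128));
        System.out.println("entries for 50%, 128 bits:   "
                + entriesForProbability(0.5, 128));
        System.out.println("entries for 50%, 32 bits:    "
                + entriesForProbability(0.5, 32));
    }
}
```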


Thanks All


Re: [DISCUSS] KIP-437: Custom replacement for MaskField SMT

2019-03-16 Thread Adam Bellemare
Hi Valeria

I am thinking that a full map function via configuration is very unlikely
to be feasible. At that point, it would be best for the user to create
their own custom transformation.

I think that since your function is indeed just an extension of masking
that it is reasonable as presented. I don't have any other concerns with
the proposal, but it would be good to hear from others.
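
For readers skimming the thread, the difference between the two behaviours can
be illustrated with a small, self-contained Java sketch. This is not the Connect
MaskField code: the class and method names are hypothetical, records are modelled
as plain maps, only a few value types are covered, and applying the literal
replacement to String fields only is an assumption made for this example.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class MaskSketch {
    /** "Null equivalent" for a few common value types (illustrative subset only). */
    static Object nullEquivalent(Object value) {
        if (value instanceof String) return "";
        if (value instanceof Integer) return 0;
        if (value instanceof Long) return 0L;
        if (value instanceof Boolean) return Boolean.FALSE;
        return null; // anything else is simply nulled out in this sketch
    }

    /**
     * Returns a copy of the record with the given fields masked. A non-null
     * replacement is applied to String fields; every other masked field falls
     * back to the null equivalent of its type.
     */
    static Map<String, Object> mask(Map<String, Object> record, Set<String> fields, String replacement) {
        Map<String, Object> masked = new LinkedHashMap<>(record);
        for (String field : fields) {
            if (!masked.containsKey(field)) {
                continue; // nothing to mask
            }
            Object original = masked.get(field);
            boolean useLiteral = replacement != null && original instanceof String;
            masked.put(field, useLiteral ? replacement : nullEquivalent(original));
        }
        return masked;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("ssn", "123-45-6789");
        record.put("age", 42);
        Set<String> fields = Collections.singleton("ssn");
        System.out.println(mask(record, fields, null));          // existing behaviour
        System.out.println(mask(record, fields, "***-**-****")); // proposed literal replacement
    }
}
```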

Thanks


On Fri, Mar 15, 2019 at 11:38 AM Valeria Vasylieva <
valeria.vasyli...@gmail.com> wrote:

> Hi Adam,
>
> Thank you for your interest. Here is the list of currently supported
> transformations in Connect:
> https://kafka.apache.org/documentation/#connect_transforms.
> As far as I can see, there is no "map" transformation in this list, and none
> of the other SMTs support the functionality described in the KIP.
> I cannot find a way to achieve the same result using existing
> transformations.
> The request described in the issue was just to add this custom masking
> functionality to the MaskField SMT, but if there is a need we can evolve
> this issue and create a separate "map" transformation.
> It may be more useful but will require more effort, so it is better to do
> it as a separate issue.
>
> Kind Regards,
> Valeria
>
> On Fri, Mar 15, 2019 at 17:35, Adam Bellemare wrote:
>
> > Hi Valeria
> >
> > Thanks for the KIP. I admit my knowledge of Kafka Connect transforms is a
> > bit rusty. However, is there currently any other way to achieve the
> > functionality outlined in your KIP using existing transforms?
> >
> >
> > Thanks
> >
> >
> >
> >
> >
> >
> >
> >
> > On Thu, Mar 14, 2019 at 12:05 PM Valeria Vasylieva <
> > valeria.vasyli...@gmail.com> wrote:
> >
> > > Dear community members,
> > >
> > > I would be very grateful if you could leave any feedback on this KIP. It will
> > > help me to understand whether the change is useful and to decide on further
> > > actions.
> > >
> > > Thank you in advance,
> > > Valeria
> > >
> > > On Mon, Mar 11, 2019 at 13:20, Valeria Vasylieva <
> > > valeria.vasyli...@gmail.com> wrote:
> > >
> > > > Hi All,
> > > >
> > > > I would like to start a discussion about adding new functionality to
> > > > MaskField SMT. The existing implementation allows masking out any field
> > > > value with the null equivalent of the field type.
> > > >
> > > > I suggest adding the possibility to provide a literal replacement for the
> > > > field. This way you can mask out any PII info (IP, SSN etc.) with any
> > > > custom replacement.
> > > >
> > > > It is a short KIP which does not require major changes, but could help to
> > > > make this transformation more useful for the client.
> > > >
> > > > The KIP is here:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-437%3A+Custom+replacement+for+MaskField+SMT
> > > >
> > > > I would be glad to receive any feedback on this KIP.
> > > >
> > > > Kind Regards,
> > > > Valeria
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-437: Custom replacement for MaskField SMT

2019-03-15 Thread Adam Bellemare
Hi Valeria

Thanks for the KIP. I admit my knowledge of Kafka Connect transforms is a
bit rusty. However, is there any other way to currently achieve the same
functionality outlined in your KIP using existing transforms?


Thanks








On Thu, Mar 14, 2019 at 12:05 PM Valeria Vasylieva <
valeria.vasyli...@gmail.com> wrote:

> Dear community members,
>
> I would be very grateful if you left any feedback on this KIP. It will
> help me understand whether the change is useful and decide on further
> actions.
>
> Thank you in advance,
> Valeria
>
> On Mon, Mar 11, 2019 at 13:20, Valeria Vasylieva <
> valeria.vasyli...@gmail.com> wrote:
>
> > Hi All,
> >
> > I would like to start a discussion about adding new functionality to
> > the MaskField SMT. The existing implementation allows masking out any
> > field value with the null equivalent of the field type.
> >
> > I suggest adding the possibility to provide a literal replacement for the
> > field. This way you can mask out any PII (IP, SSN, etc.) with any
> > custom replacement.
> >
> > It is a short KIP which does not require major changes, but could help
> > make this transformation more useful for the client.
> >
> > The KIP is here:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-437%3A+Custom+replacement+for+MaskField+SMT
> >
> > I would be glad to receive any feedback on this KIP.
> >
> > Kind Regards,
> > Valeria
> >
>
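
To make the proposal quoted above concrete, here is a minimal sketch of the two behaviours: the existing masking with a null equivalent, and the proposed masking with a literal replacement. This is a plain Python model, not Kafka Connect code; `mask_fields`, `NULL_EQUIVALENTS` and the sample record are hypothetical names chosen for illustration.

```python
# Illustrative model only; this is not the Kafka Connect API.
# Assumed "null equivalents" per field type, as described in the thread.
NULL_EQUIVALENTS = {str: "", int: 0, float: 0.0, bool: False}


def mask_fields(record, fields, replacement=None):
    """Return a copy of `record` with the named fields masked.

    Without `replacement`, each field gets the null equivalent of its type
    (the existing behaviour). With `replacement`, each field gets that
    literal value instead (the behaviour the KIP proposes).
    """
    masked = dict(record)
    for name in fields:
        if name not in masked:
            continue
        if replacement is not None:
            masked[name] = replacement
        else:
            masked[name] = NULL_EQUIVALENTS.get(type(masked[name]))
    return masked


record = {"user": "alice", "ip": "10.1.2.3", "age": 42}
existing = mask_fields(record, ["ip", "age"])
proposed = mask_fields(record, ["ip"], replacement="xxx.xxx.xxx.xxx")
```

The sketch applies one replacement to every listed field; whether the real SMT should allow a replacement per field is a design question for the KIP itself.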


Re: [VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-03-12 Thread Adam Bellemare
Done and done.

I will begin work on bringing the previous PR up to date with the KIP.

Thanks all

Adam

On Tue, Mar 12, 2019 at 1:17 AM Guozhang Wang  wrote:

> Hello Adam,
>
> Thank YOU for all the patience going through this process!
>
> Could you update the KIP pages (including the parent page) with the newest
> status as well? Thanks.
>
>
> Guozhang
>
> On Mon, Mar 11, 2019 at 6:12 PM Adam Bellemare 
> wrote:
>
> > Thanks to everyone, including John, Guozhang, Matthias and Jan for all the
> > help!
> >
> > I do believe that makes 3 binding and 3 non-binding, vote passes.
> >
> > Thanks folks
> >
> > > On Mar 11, 2019, at 5:02 PM, Matthias J. Sax 
> > wrote:
> > >
> > > Thanks a lot for putting this together Adam and Jan!
> > >
> > > +1 (binding)
> > >
> > >
> > >
> > > -Matthias
> > >
> > >> On 2/15/19 11:27 AM, Adam Bellemare wrote:
> > >> Hi Bill
> > >>
> > >> Now that you are a committer, does your vote add a +1 to binding? Can
> > you recast it if you believe this is a sound decision? I am eager to
> > finally finish up this KIP that has been open so long.
> > >>
> > >> Thanks
> > >>
> > >> Adam
> > >>
> > >>> On Feb 15, 2019, at 12:50 PM, Matthias J. Sax  >
> > wrote:
> > >>>
> > >>> I think, he needs to re-cast his vote.
> > >>>
> > >>> -Matthias
> > >>>
> > >>>> On 2/15/19 5:49 AM, Adam Bellemare wrote:
> > >>>> Hi all
> > >>>>
> > >>>> Since Bill is now a committer, the vote is changed to 3 binding and
> 3
> > non-binding (unless I am somehow mistaken - please let me know!). In this
> > case, I believe the vote passes.
> > >>>>
> > >>>> Thanks
> > >>>>
> > >>>> Adam
> > >>>>
> > >>>>> On Jan 24, 2019, at 7:28 PM, Adam Bellemare <
> > adam.bellem...@gmail.com> wrote:
> > >>>>>
> > >>>>> Bumping this vote because I don't want it to languish. It is very
> > unlikely to go into 2.2 at this point, but I would like to avoid
> > resurrecting a dead thread in 30 days time.
> > >>>>>
> > >>>>>> On Tue, Jan 15, 2019 at 5:07 PM Adam Bellemare <
> > adam.bellem...@gmail.com> wrote:
> > >>>>>> All good Matthias. If it doesn’t get in for 2.2 I’ll just do it
> for
> > the next release.
> > >>>>>>
> > >>>>>> Thanks
> > >>>>>>
> > >>>>>>> On Jan 15, 2019, at 12:25 PM, Matthias J. Sax <
> > matth...@confluent.io> wrote:
> > >>>>>>>
> > >>>>>>> I'll try to review the KIP before the deadline, but as I am
> acting
> > as
> > >>>>>>> release manager and also working on KIP-258, I cannot promise.
> > Even if
> > >>>>>>> we make the voting deadline, it might also be tight to review the
> > PR, as
> > >>>>>>> it seems to be big and complicated.
> > >>>>>>>
> > >>>>>>> I'll try my very best to get it into 2.2...
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> -Matthias
> > >>>>>>>
> > >>>>>>>> On 1/15/19 3:27 AM, Adam Bellemare wrote:
> > >>>>>>>> If I can get one more binding vote in here, I may be able to get
> > this out
> > >>>>>>>> for the 2.2 release in February.
> > >>>>>>>>
> > >>>>>>>> Currently at:
> > >>>>>>>> 2 binding, 4 non-binding.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>> On Sun, Jan 13, 2019 at 2:41 PM Patrik Kleindl <
> > pklei...@gmail.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>> +1 (non-binding)
> > >>>>>>>>> I have followed the discussion too and think this feature will
> > be very
> > >>>>>>>>> helpful.
> > >>>>>>>>> Th

Re: [VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-03-11 Thread Adam Bellemare
Thanks to everyone, including John, Guozhang, Matthias and Jan for all the help!

I do believe that makes 3 binding and 3 non-binding, so the vote passes.

Thanks folks 
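
For anyone following the tally: as I understand the Kafka bylaws, a KIP passes by lazy majority, which needs at least three binding +1 votes and more binding +1 than binding -1 votes. Non-binding votes are counted for information only. A small sketch of that rule, where `vote_passes` is a hypothetical helper and the threshold is my reading of the bylaws rather than something stated in this thread:

```python
def vote_passes(binding_plus, binding_minus=0, min_binding=3):
    """Lazy-majority check (assumed rule): enough binding +1 votes,
    and more binding +1 votes than binding -1 votes."""
    return binding_plus >= min_binding and binding_plus > binding_minus


# Tally reported on Jan 15: 2 binding, 4 non-binding.
before = vote_passes(binding_plus=2)
# Tally after the final +1 (binding): 3 binding, 3 non-binding.
after = vote_passes(binding_plus=3)
```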

> On Mar 11, 2019, at 5:02 PM, Matthias J. Sax  wrote:
> 
> Thanks a lot for putting this together Adam and Jan!
> 
> +1 (binding)
> 
> 
> 
> -Matthias
> 
>> On 2/15/19 11:27 AM, Adam Bellemare wrote:
>> Hi Bill 
>> 
>> Now that you are a committer, does your vote add a +1 to binding? Can you 
>> recast it if you believe this is a sound decision? I am eager to finally 
>> finish up this KIP that has been open so long.
>> 
>> Thanks
>> 
>> Adam 
>> 
>>> On Feb 15, 2019, at 12:50 PM, Matthias J. Sax  wrote:
>>> 
>>> I think, he needs to re-cast his vote.
>>> 
>>> -Matthias
>>> 
>>>> On 2/15/19 5:49 AM, Adam Bellemare wrote:
>>>> Hi all
>>>> 
>>>> Since Bill is now a committer, the vote is changed to 3 binding and 3 
>>>> non-binding (unless I am somehow mistaken - please let me know!). In this 
>>>> case, I believe the vote passes.
>>>> 
>>>> Thanks
>>>> 
>>>> Adam 
>>>> 
>>>>> On Jan 24, 2019, at 7:28 PM, Adam Bellemare  
>>>>> wrote:
>>>>> 
>>>>> Bumping this vote because I don't want it to languish. It is very 
>>>>> unlikely to go into 2.2 at this point, but I would like to avoid 
>>>>> resurrecting a dead thread in 30 days' time.
>>>>> 
>>>>>> On Tue, Jan 15, 2019 at 5:07 PM Adam Bellemare 
>>>>>>  wrote:
>>>>>> All good Matthias. If it doesn’t get in for 2.2 I’ll just do it for the 
>>>>>> next release. 
>>>>>> 
>>>>>> Thanks
>>>>>> 
>>>>>>> On Jan 15, 2019, at 12:25 PM, Matthias J. Sax  
>>>>>>> wrote:
>>>>>>> 
>>>>>>> I'll try to review the KIP before the deadline, but as I am acting as
>>>>>>> release manager and also working on KIP-258, I cannot promise. Even if
>>>>>>> we make the voting deadline, it might also be tight to review the PR, as
>>>>>>> it seems to be big and complicated.
>>>>>>> 
>>>>>>> I'll try my very best to get it into 2.2...
>>>>>>> 
>>>>>>> 
>>>>>>> -Matthias
>>>>>>> 
>>>>>>>> On 1/15/19 3:27 AM, Adam Bellemare wrote:
>>>>>>>> If I can get one more binding vote in here, I may be able to get this 
>>>>>>>> out
>>>>>>>> for the 2.2 release in February.
>>>>>>>> 
>>>>>>>> Currently at:
>>>>>>>> 2 binding, 4 non-binding.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> On Sun, Jan 13, 2019 at 2:41 PM Patrik Kleindl  
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>> +1 (non-binding)
>>>>>>>>> I have followed the discussion too and think this feature will be very
>>>>>>>>> helpful.
>>>>>>>>> Thanks Adam for staying on this.
>>>>>>>>> Best regards
>>>>>>>>> Patrik
>>>>>>>>> 
>>>>>>>>>> On 13.01.2019 at 19:55, Paul Whalen wrote:
>>>>>>>>>> 
>>>>>>>>>> +1 non binding.  I haven't contributed at all to discussion but have
>>>>>>>>>> followed since Adam reinvigorated it a few months ago and am very 
>>>>>>>>>> excited
>>>>>>>>>> about it.  It would be a huge help on the project I'm working on.
>>>>>>>>>> 
>>>>>>>>>> On Fri, Jan 11, 2019 at 9:05 AM Adam Bellemare 
>>>>>>>>>> >>>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Thanks all -
>>>>>>>>>>> 
>>>>>>>>>>> So far that's +2 Binding, +2 non-binding
>>>>>>>>>>> 
>>>>>>>>>>> If we get a few more votes I can

Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2019-03-11 Thread Adam Bellemare
Hi John

Thanks for the explanation. I wasn't sure how KTable repartition topics
were handled with regard to cleanup, so I just wanted to double-check
whether it could cause an issue.

@Matthias
My inclination is to keep the DSL topologies consistent with one another. I
am a bit concerned about scope creep into the header domain, and I am not
sure how much performance would be improved vs. additional complexity. I
think if we go down this approach we should consider a new type of internal
topic so that it's not confused with existing repartition and changelog
topic types. I am more inclined to go with keeping it consistent and
separated into a normal repartition topic and a normal changelog topic
otherwise.

Thanks
Adam






On Mon, Mar 11, 2019 at 1:24 PM Matthias J. Sax 
wrote:

> I guess Adam suggests using compaction for the repartition topic and
> not purging data. Doing this would allow us to avoid a store changelog
> topic for the "subscription store" on the RHS. This would be a nice
> optimization.
>
> But the concern about breaking compaction is correct. However, I see it
> as an optimization only and thus, if we keep the topic as plain
> repartition topic and use a separate store changelog topic the issue
> resolves itself.
>
> Maybe we could use headers though to get this optimization. Do you
> think it's worth doing this optimization or should we just stick with the
> simple design and two topics (repartition plus changelog)?
>
>
>
> @Adam: thanks for updating the Wiki page. LGTM.
>
>
> -Matthias
>
>
> On 3/11/19 9:24 AM, John Roesler wrote:
> > Hey Adam,
> >
> > That's a good observation, but it wouldn't be a problem for repartition
> > topics because Streams aggressively deletes messages from the repartition
> > topics once it knows they are handled. Thus, we don't need to try and
> > cater to the log compactor.
> >
> > Thanks,
> > -John
> >
> > On Mon, Mar 11, 2019 at 9:10 AM Adam Bellemare  >
> > wrote:
> >
> >> For the sake of expediency, I updated the KIP with what I believe we have
> >> discussed.
> >>
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Tombstones
> >>
> >>
> >>
> >> On Mon, Mar 11, 2019 at 8:20 AM Adam Bellemare <
> adam.bellem...@gmail.com>
> >> wrote:
> >>
> >>> My only concern was around compaction of records in the repartition topic.
> >>> This would simply mean that these records would stick around as their value
> >>> isn't truly null. Though I know about the usage of compaction on changelog
> >>> topics, I am a bit fuzzy on where we use compaction in other internal
> >>> topics. So long as this doesn't cause concern I can certainly finish off
> >>> the KIP today.
> >>>
> >>> Thanks
> >>>
> >>> Adam
> >>>
> >>> On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax <
> matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> I agree that the LHS side must encode this information and tell the RHS
> >>>> if a tombstone requires a reply or not.
> >>>>
> >>>>>> Would this pose some sort of verbosity problem in the internal topics,
> >>>>>> especially if we have to rebuild state off of them?
> >>>>
> >>>> I don't see an issue atm. Can you elaborate how this relates to rebuild
> >>>> state?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 3/10/19 12:25 PM, Adam Bellemare wrote:
> >>>>> Hi Matthias
> >>>>>
> >>>>> I have been mulling over the unsubscribe / delete optimization, and I
> >>>> have
> >>>>> one main concern. I believe that the RHS can only determine whether
> to
> >>>>> propagate the tombstone or not based on the value passed over from
> the
> >>>> LHS.
> >>>>> This value would need to be non-null, and so wouldn't the internal
> >>>>> repartition topics end up containing many non-null "tombstone"
> values?
> >>>>>
> >>>>> ie:
> >>>>> Normal tombstone (propagate): (key=123, value=null)
> >>>>> Don't-propagate-tombstone:  (key=123, value=("don't propagate
> >>>> me,
> >>>>>

Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2019-03-11 Thread Adam Bellemare
For the sake of expediency, I updated the KIP with what I believe we have
discussed.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Tombstones



On Mon, Mar 11, 2019 at 8:20 AM Adam Bellemare 
wrote:

> My only concern was around compaction of records in the repartition topic.
> This would simply mean that these records would stick around as their value
> isn't truly null. Though I know about the usage of compaction on changelog
> topics, I am a bit fuzzy on where we use compaction in other internal
> topics. So long as this doesn't cause concern I can certainly finish off
> the KIP today.
>
> Thanks
>
> Adam
>
> On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax 
> wrote:
>
>> I agree that the LHS side must encode this information and tell the RHS
>> if a tombstone requires a reply or not.
>>
>> >> Would this pose some sort of verbosity problem in the internal topics,
>> >> especially if we have to rebuild state off of them?
>>
>> I don't see an issue atm. Can you elaborate how this relates to rebuild
>> state?
>>
>>
>> -Matthias
>>
>> On 3/10/19 12:25 PM, Adam Bellemare wrote:
>> > Hi Matthias
>> >
>> > I have been mulling over the unsubscribe / delete optimization, and I
>> have
>> > one main concern. I believe that the RHS can only determine whether to
>> > propagate the tombstone or not based on the value passed over from the
>> LHS.
>> > This value would need to be non-null, and so wouldn't the internal
>> > repartition topics end up containing many non-null "tombstone" values?
>> >
>> > ie:
>> > Normal tombstone (propagate): (key=123, value=null)
>> > Don't-propagate-tombstone:  (key=123, value=("don't propagate
>> me,
>> > but please delete state"))
>> >
>> > Would this pose some sort of verbosity problem in the internal topics,
>> > especially if we have to rebuild state off of them?
>> >
>> > Thanks
>> >
>> > Adam
>> >
>> >
>> >
>> >
>> >
>> > On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax 
>> > wrote:
>> >
>> >> SGTM.
>> >>
>> >> I also had the impression that those duplicates are rather an error than
>> >> a case of eventual consistency. Using hashing to avoid sending the
>> >> payload is a good idea IMHO.
>> >>
>> >> @Adam: can you update the KIP accordingly?
>> >>
>> >>  - add the optimization to not send a reply from RHS to LHS on
>> >> unsubscribe (if not a tombstone)
>> >>  - explain why using offsets to avoid duplicates does not work
>> >>  - add hashing to avoid duplicates
>> >>
>> >> Beside this, I don't have any further comments. Excited to finally get
>> >> this in!
>> >>
>> >> Let us know when you have updated the KIP so we can move forward with
>> >> the VOTE. Thanks a lot for your patience! This was a very long shot!
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 3/8/19 8:47 AM, John Roesler wrote:
>> >>> Hi all,
>> >>>
>> >>> This proposal sounds good to me, especially since we observe that people
>> >>> are already confused when they see duplicate results coming out of 1:1
>> >>> joins (which is a bug). I take this as "evidence" that we're better off
>> >>> eliminating those duplicates from the start. Guozhang's proposal seems
>> >>> like a lightweight solution to the problem, so FWIW, I'm in favor.
>> >>>
>> >>> Thanks,
>> >>> -John
>> >>>
>> >>> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <
>> adam.bellem...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> Hi Guozhang
>> >>>>
>> >>>> That would certainly work for eliminating those duplicate values. As
>> it
>> >>>> stands right now, this would be consistent with swallowing changes
>> due
>> >> to
>> >>>> out-of-order processing with multiple threads, and seems like a very
>> >>>> reasonable way forward. Thank you for the suggestion!
>> >>>>
>> >>>> I have been trying to think if there are any other scenarios where 

Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2019-03-11 Thread Adam Bellemare
My only concern was around compaction of records in the repartition topic.
This would simply mean that these records would stick around as their value
isn't truly null. Though I know about the usage of compaction on changelog
topics, I am a bit fuzzy on where we use compaction in other internal
topics. So long as this doesn't cause concern I can certainly finish off
the KIP today.

Thanks

Adam
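
To illustrate the concern above, here is a deliberately simplified model of log compaction: keep only the latest record per key, and drop keys whose latest value is null. It ignores segments, delete retention and timing, and `compact` plus the marker string are hypothetical names. It only shows why a non-null "delete" marker would survive compaction while a true tombstone would not.

```python
def compact(log):
    """Simplified compaction: keep the latest value per key, then drop
    keys whose latest value is None (a true tombstone)."""
    latest = {}
    for key, value in log:
        latest[key] = value
    return [(k, v) for k, v in latest.items() if v is not None]


log = [
    (1, "a"),
    (2, "b"),
    (1, None),                   # true tombstone: key 1 disappears
    (2, "DELETE_NO_PROPAGATE"),  # non-null marker: key 2 sticks around
]
compacted = compact(log)
```

Whether this matters in practice depends on whether the topic is compacted at all, which is exactly what I am unsure about for the repartition topics.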

On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax 
wrote:

> I agree that the LHS side must encode this information and tell the RHS
> if a tombstone requires a reply or not.
>
> >> Would this pose some sort of verbosity problem in the internal topics,
> >> especially if we have to rebuild state off of them?
>
> I don't see an issue atm. Can you elaborate how this relates to rebuild
> state?
>
>
> -Matthias
>
> On 3/10/19 12:25 PM, Adam Bellemare wrote:
> > Hi Matthias
> >
> > I have been mulling over the unsubscribe / delete optimization, and I
> have
> > one main concern. I believe that the RHS can only determine whether to
> > propagate the tombstone or not based on the value passed over from the
> LHS.
> > This value would need to be non-null, and so wouldn't the internal
> > repartition topics end up containing many non-null "tombstone" values?
> >
> > ie:
> > Normal tombstone (propagate): (key=123, value=null)
> > Don't-propagate-tombstone:  (key=123, value=("don't propagate me,
> > but please delete state"))
> >
> > Would this pose some sort of verbosity problem in the internal topics,
> > especially if we have to rebuild state off of them?
> >
> > Thanks
> >
> > Adam
> >
> >
> >
> >
> >
> > On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax 
> > wrote:
> >
> >> SGTM.
> >>
> >> I also had the impression that those duplicates are rather an error than
> >> a case of eventual consistency. Using hashing to avoid sending the
> >> payload is a good idea IMHO.
> >>
> >> @Adam: can you update the KIP accordingly?
> >>
> >>  - add the optimization to not send a reply from RHS to LHS on
> >> unsubscribe (if not a tombstone)
> >>  - explain why using offsets to avoid duplicates does not work
> >>  - add hashing to avoid duplicates
> >>
> >> Beside this, I don't have any further comments. Excited to finally get
> >> this in!
> >>
> >> Let us know when you have updated the KIP so we can move forward with
> >> the VOTE. Thanks a lot for your patience! This was a very long shot!
> >>
> >>
> >> -Matthias
> >>
> >> On 3/8/19 8:47 AM, John Roesler wrote:
> >>> Hi all,
> >>>
> >>> This proposal sounds good to me, especially since we observe that people
> >>> are already confused when they see duplicate results coming out of 1:1
> >>> joins (which is a bug). I take this as "evidence" that we're better off
> >>> eliminating those duplicates from the start. Guozhang's proposal seems
> >>> like a lightweight solution to the problem, so FWIW, I'm in favor.
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <
> adam.bellem...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Guozhang
> >>>>
> >>>> That would certainly work for eliminating those duplicate values. As
> it
> >>>> stands right now, this would be consistent with swallowing changes due
> >> to
> >>>> out-of-order processing with multiple threads, and seems like a very
> >>>> reasonable way forward. Thank you for the suggestion!
> >>>>
> >>>> I have been trying to think if there are any other scenarios where we
> >> can
> >>>> end up with duplicates, though I have not been able to identify any
> >> others
> >>>> at the moment. I will think on it a bit more, but if anyone else has
> any
> >>>> ideas, please chime in.
> >>>>
> >>>> Thanks,
> >>>> Adam
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang 
> >> wrote:
> >>>>
> >>>>> One more thought regarding *c-P2: Duplicates)*: first I want to
> >> separate
> >>>>> this issue with the more general issue that today (not only
> >> forei

Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2019-03-10 Thread Adam Bellemare
Hi Matthias

I have been mulling over the unsubscribe / delete optimization, and I have
one main concern. I believe that the RHS can only determine whether to
propagate the tombstone or not based on the value passed over from the LHS.
This value would need to be non-null, and so wouldn't the internal
repartition topics end up containing many non-null "tombstone" values?

ie:
Normal tombstone (propagate): (key=123, value=null)
Don't-propagate-tombstone:  (key=123, value=("don't propagate me,
but please delete state"))

Would this pose some sort of verbosity problem in the internal topics,
especially if we have to rebuild state off of them?

Thanks

Adam
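
A rough sketch of what I mean, so the two cases are unambiguous. This models the RHS handling as a plain Python function over a dict; `handle_on_rhs` and the `DELETE_NO_PROPAGATE` marker are hypothetical and are not taken from the KIP or the PR.

```python
# Hypothetical marker; the KIP does not define this exact value.
DELETE_NO_PROPAGATE = "delete-state-but-do-not-propagate"


def handle_on_rhs(subscriptions, key, value):
    """Apply one message from the LHS to the RHS subscription state.

    Returns the reply to send back to the LHS, or None for no reply.
    """
    subscriptions.pop(key, None)    # both kinds of delete clear RHS state
    if value is None:
        return (key, None)          # normal tombstone: reply to the LHS
    if value == DELETE_NO_PROPAGATE:
        return None                 # state deleted, nothing sent back
    subscriptions[key] = value      # regular subscription update
    return (key, value)


subs = {123: "fk-A"}
reply_normal = handle_on_rhs(subs, 123, None)
subs[123] = "fk-A"
reply_silent = handle_on_rhs(subs, 123, DELETE_NO_PROPAGATE)
```

The point is that the second message has to carry a non-null value for the RHS to tell the cases apart, which is where my question about the internal topics comes from.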





On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax 
wrote:

> SGTM.
>
> I also had the impression that those duplicates are rather an error than
> a case of eventual consistency. Using hashing to avoid sending the
> payload is a good idea IMHO.
>
> @Adam: can you update the KIP accordingly?
>
>  - add the optimization to not send a reply from RHS to LHS on
> unsubscribe (if not a tombstone)
>  - explain why using offsets to avoid duplicates does not work
>  - add hashing to avoid duplicates
>
> Beside this, I don't have any further comments. Excited to finally get
> this in!
>
> Let us know when you have updated the KIP so we can move forward with
> the VOTE. Thanks a lot for your patience! This was a very long shot!
>
>
> -Matthias
>
> On 3/8/19 8:47 AM, John Roesler wrote:
> > Hi all,
> >
> > This proposal sounds good to me, especially since we observe that people
> > are already confused when they see duplicate results coming out of 1:1
> > joins (which is a bug). I take this as "evidence" that we're better off
> > eliminating those duplicates from the start. Guozhang's proposal seems
> > like a lightweight solution to the problem, so FWIW, I'm in favor.
> >
> > Thanks,
> > -John
> >
> > On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare 
> > wrote:
> >
> >> Hi Guozhang
> >>
> >> That would certainly work for eliminating those duplicate values. As it
> >> stands right now, this would be consistent with swallowing changes due
> to
> >> out-of-order processing with multiple threads, and seems like a very
> >> reasonable way forward. Thank you for the suggestion!
> >>
> >> I have been trying to think if there are any other scenarios where we
> can
> >> end up with duplicates, though I have not been able to identify any
> others
> >> at the moment. I will think on it a bit more, but if anyone else has any
> >> ideas, please chime in.
> >>
> >> Thanks,
> >> Adam
> >>
> >>
> >>
> >> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang 
> wrote:
> >>
> >>> One more thought regarding *c-P2: Duplicates)*: first I want to
> separate
> >>> this issue with the more general issue that today (not only
> foreign-key,
> >>> but also co-partition primary-key) table-table joins is still not
> >> strictly
> >>> respecting the timestamp ordering since the two changelog streams may
> be
> >>> fetched and hence processed out-of-order and we do not allow a record
> to
> >> be
> >>> joined with the other table at any given time snapshot yet. So ideally
> >> when
> >>> there are two changelog records (k1, (f-k1, v1)), (k1, (f-k1, v2))
> coming
> >>> at the left hand table and one record (f-k1, v3) at the right hand
> table,
> >>> depending on the processing ordering we may get:
> >>>
> >>> (k1, (f-k1, v2-v3))
> >>>
> >>> or
> >>>
> >>> (k1, (f-k1, v1-v3))
> >>> (k1, (f-k1, v2-v3))
> >>>
> >>> And this is not to be addressed by this KIP.
> >>>
> >>> What I would advocate is to fix the issue that is introduced in this
> KIP
> >>> alone, that is we may have
> >>>
> >>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
> >>> (k1, (f-k1, v2-v3))
> >>>
> >>> I admit that it does not have correctness issue from the semantics
> along,
> >>> comparing it with "discarding the first result", but it may be
> confusing
> >>> from user's observation who do not expect to see the seemingly
> >> duplicates.
> >>> On the other hand, I think there's a light solution to avoid it, which
> is
> >>> that we can still optimize away to not send the full payload of "v1"
> from
>

Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2019-03-08 Thread Adam Bellemare
Hi Guozhang

That would certainly work for eliminating those duplicate values. As it
stands right now, this would be consistent with swallowing changes due to
out-of-order processing with multiple threads, and seems like a very
reasonable way forward. Thank you for the suggestion!

I have been trying to think if there are any other scenarios where we can
end up with duplicates, though I have not been able to identify any others
at the moment. I will think on it a bit more, but if anyone else has any
ideas, please chime in.

Thanks,
Adam
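
For reference, here is how I read the hash check in Guozhang's message quoted below, as a small Python sketch. It assumes the LHS sends a hash of its value along with the foreign key, the RHS echoes that hash back with its own value, and the LHS drops any reply whose hash no longer matches what it currently stores. The names `value_hash` and `lhs_accepts` are hypothetical, and MD5 is used only because it is the example in the proposal.

```python
import hashlib


def value_hash(value: bytes) -> str:
    """Hash of the LHS value that travels to the RHS instead of the payload."""
    return hashlib.md5(value).hexdigest()


def lhs_accepts(current_value: bytes, echoed_hash: str) -> bool:
    """Keep a reply only if it was computed for the value the LHS holds now."""
    return value_hash(current_value) == echoed_hash


# The LHS saw v1 and then v2 for k1; both replies arrive after v2 was stored.
lhs_store = {"k1": b"v2"}
replies = [
    ("k1", value_hash(b"v1"), "v3"),  # stale reply, computed for v1
    ("k1", value_hash(b"v2"), "v3"),  # current reply, computed for v2
]
emitted = [
    (key, lhs_store[key].decode() + "-" + rhs_value)
    for key, echoed, rhs_value in replies
    if lhs_accepts(lhs_store[key], echoed)
]
```

With the check in place only the v2-v3 result goes downstream, and it goes downstream once.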



On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang  wrote:

> One more thought regarding *c-P2: Duplicates)*: first I want to separate
> this issue from the more general issue that today (not only foreign-key,
> but also co-partitioned primary-key) table-table joins still do not strictly
> respect timestamp ordering, since the two changelog streams may be
> fetched and hence processed out of order, and we do not yet allow a record
> to be joined with the other table at any given time snapshot. So when
> there are two changelog records (k1, (f-k1, v1)), (k1, (f-k1, v2)) arriving
> at the left-hand table and one record (f-k1, v3) at the right-hand table,
> depending on the processing order we may get:
>
> (k1, (f-k1, v2-v3))
>
> or
>
> (k1, (f-k1, v1-v3))
> (k1, (f-k1, v2-v3))
>
> And this is not to be addressed by this KIP.
>
> What I would advocate is to fix the issue that is introduced in this KIP
> alone, that is we may have
>
> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
> (k1, (f-k1, v2-v3))
>
> I admit that this is not a correctness issue from the semantics alone,
> compared with "discarding the first result", but it may be confusing
> for users who do not expect to see the seeming duplicates.
> On the other hand, I think there's a lightweight way to avoid it: we can
> still optimize by not sending the full payload of "v1" from the left-hand
> side to the right-hand side, but instead of trimming off the bytes
> entirely, we can send, e.g., an MD5 hash of the bytes (I'm using MD5
> here just as an example; we can definitely replace it with other
> functions). We can then discard the join result if the hash value sent
> back from the right-hand side no longer matches the left-hand side, i.e.
> we will only send:
>
> (k1, (f-k1, v2-v3))
>
> downstream once.
>
> WDYT?
>
>
> Guozhang
>
>
> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare 
> wrote:
>
> > Ah yes, I recall it all now. That answers that question as to why I had
> > caching disabled. I can certainly re-enable it since I believe the main
> > concern was simply about reconciling those two iterators. A lack of
> > knowledge there on my part.
> >
> >
> > Thank you John for weighing in - we certainly both do appreciate it. I
> > think that John hits it on the head though with his comment of "If it
> > turns out we're wrong about this, then it should be possible to fix the
> > semantics in place, without messing with the API."
> >
> > If anyone else would like to weigh in, your thoughts would be greatly
> > appreciated.
> >
> > Thanks
> >
> > On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax 
> > wrote:
> >
> > > >> I dont know how to range scan over a caching store, probably one had
> > > >> to open 2 iterators and merge them.
> > >
> > > That happens automatically. If you query a cached KTable, it ranges
> over
> > > the cache and the underlying RocksDB and performs the merging under the
> > > hood.
> > >
> > > >> Other than that, I still think even the regualr join is broken with
> > > >> caching enabled right?
> > >
> > > Why? To me, if you use the word "broker", it implies conceptually
> > > incorrect; I don't see this.
> > >
> > > > I once files a ticket, because with caching
> > > >>> enabled it would return values that havent been published
> downstream
> > > yet.
> > >
> > > For the bug report, if found
> > > https://issues.apache.org/jira/browse/KAFKA-6599. We still need to fix
> > > this, but it is a regular bug as any other, and we should not change a
> > > design because of a bug.
> > >
> > > That range() returns values that have not been published downstream if
> > > caching is enabled is how caching works and is intended behavior. Not
> > > sure why say it's incorrect?
> > >
> > >
> > > -Matthias
> > >
> > >
&

Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2019-03-06 Thread Adam Bellemare
Ah yes, I recall it all now. That answers that question as to why I had
caching disabled. I can certainly re-enable it since I believe the main
concern was simply about reconciling those two iterators. A lack of
knowledge there on my part.
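
To check my own understanding of the merging Matthias describes in the quoted message below, here is a simplified model of a range scan over a cached store: entries in the cache shadow entries in the underlying store, and the result is returned in key order. The real implementation merges two sorted iterators lazily; this sketch builds the merged view eagerly with dicts, and `range_scan` and the sample keys are hypothetical.

```python
def range_scan(cache, store, lo, hi):
    """Merged view of keys in [lo, hi]; cached values shadow stored ones."""
    merged = {k: v for k, v in store.items() if lo <= k <= hi}
    for k, v in cache.items():
        if lo <= k <= hi:
            merged[k] = v          # newer cached value wins
    return [(k, merged[k]) for k in sorted(merged)]


store = {"a:1": "x", "a:2": "y", "b:1": "z"}
cache = {"a:2": "y2", "a:3": "new"}   # not yet flushed to the store
result = range_scan(cache, store, "a:", "a:~")
```

So a scan over the "a:" prefix sees the unflushed entries as well, which is why caching should not need to be disabled for the prefix scan.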


Thank you John for weighing in - we certainly both do appreciate it. I
think that John hits it on the head though with his comment of "If it turns
out we're wrong about this, then it should be possible to fix the semantics
in place, without messing with the API."

If anyone else would like to weigh in, your thoughts would be greatly
appreciated.

Thanks

On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax 
wrote:

> >> I don't know how to range scan over a caching store, probably one had
> >> to open 2 iterators and merge them.
>
> That happens automatically. If you query a cached KTable, it ranges over
> the cache and the underlying RocksDB and performs the merging under the
> hood.
>
> >> Other than that, I still think even the regular join is broken with
> >> caching enabled right?
>
> Why? To me, if you use the word "broken", it implies conceptually
> incorrect; I don't see this.
>
> >> I once filed a ticket, because with caching
> >> enabled it would return values that haven't been published downstream
> >> yet.
>
> For the bug report, I found
> https://issues.apache.org/jira/browse/KAFKA-6599. We still need to fix
> this, but it is a regular bug like any other, and we should not change a
> design because of a bug.
>
> That range() returns values that have not been published downstream if
> caching is enabled is how caching works and is intended behavior. Not
> sure why you say it's incorrect?
>
>
> -Matthias
>
>
> On 3/5/19 1:49 AM, Jan Filipiak wrote:
> >
> >
> > On 04.03.2019 19:14, Matthias J. Sax wrote:
> >> Thanks Adam,
> >>
> >> *> Q) Range scans work with caching enabled, too. Thus, there is no
> >> functional/correctness requirement to disable caching. I cannot
> >> remember why Jan's proposal added this? It might be an
> >> implementation detail though (maybe just remove it from the KIP?
> >> -- might be misleading).
> >
> > I don't know how to range scan over a caching store, probably one had
> > to open 2 iterators and merge them.
> >
> > Other than that, I still think even the regular join is broken with
> > caching enabled right? I once filed a ticket, because with caching
> > enabled it would return values that haven't been published downstream yet.
> >
>
>


Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2019-03-04 Thread Adam Bellemare
Hi Matthias

Thank you for the feedback! I appreciate your well thought-out questions. I
have tried to answer and comment on everything that I know below.



*> Q) For the materialized combined-key store, why do we need to disable
> caching? And why do we need to flush the store?*
This is an artifact from Jan's implementation that I have carried along. My
understanding (though possibly erroneous!) is that RocksDB prefix scan
doesn't work with the cache, and ignores any data stored within it. I have
tried to validate this but I have not succeeded, so I believe that this
will need more investigation and testing. I will dig deeper on this and get
back to you.
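
For reference while this is investigated, here is a minimal sketch of what a
cache-aware prefix scan has to do: overlay the cached (not yet flushed)
entries on top of the entries in the underlying store, so that the cache wins
for any key present in both. Everything here is an assumption made for
illustration: the function name, the use of plain dicts in place of the cache
and RocksDB, and the convention that a cached `None` is a pending delete. It
is not the Kafka Streams implementation.

```python
def merged_prefix_scan(cache, store, prefix):
    """Return sorted (key, value) pairs for keys starting with prefix.

    cache and store are plain dicts standing in for the write cache and the
    underlying store (hypothetical stand-ins, not Kafka classes).
    """
    # Start from what the underlying store holds for this prefix.
    merged = {k: v for k, v in store.items() if k.startswith(prefix)}
    # Overlay cached entries: newer values and pending deletes win.
    for k, v in cache.items():
        if k.startswith(prefix):
            merged[k] = v
    # Drop pending deletes (None) and return in key order.
    return [(k, v) for k, v in sorted(merged.items()) if v is not None]


store = {"Y|A": "v1", "Y|B": "v1", "Y|C": "v1", "Z|A": "v1"}
cache = {"Y|A": "v2", "Y|B": None}  # one update and one delete, not yet flushed
scan = merged_prefix_scan(cache, store, "Y|")
```

A scan that read only `store` would return the old value for `Y|A` and still
include `Y|B`, which is the behaviour described above as ignoring the cache.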



*> a) Thus, I am wondering why we would need to send the `null` message back
> (from RHS to LHS) in the first place?*

We don't need to, if we follow your subsequent tombstone suggestion.





*> (b) About using "offsets" to resolve ordering issue: I don't think this
> would work. The join input table would be created as
> stream.flatMapValues().groupByKey().aggregate()*
Hmmm... I am a bit fuzzy on this part. Wouldn't the LHS processor be able
to get the highest offset and propagate that onwards to the RHS processor?
In my original design I had a wrapper that kept track of the input offset,
though I suspect it did not work for the above aggregation scenario.

*c-P1)*
Regarding the consistency examples, everything you wrote is correct as far
as I can tell in how the proposed system would behave. Rapid updates to the
LHS will result in some of the results being discarded (in the case of
deletes or change of FK) or doubly-produced (discussed below, after the
following example).

It does not seem to me to be possible to avoid quashing records that are
late arriving from the RHS. This could commonly be exhibited by two RHS
processors that are receiving very different loads. In the example below,
consider RHS-1 to be heavily loaded while RHS-2 is idle.

Example:
1- RHS-1 is updated to Y|bar
2- RHS-2 is updated to Z|foo
3- LHS is updated to A|Y
   -> sends Y|A+ subscription message to RHS-1
4- LHS is updated to A|Z
   -> sends Y|A- unsubscribe message to RHS-1
   -> sends Z|A+ subscription to RHS-2
5- RHS-2 processes Z|A+ message immediately
   -> sends A|Z,foo back
6- LHS processes A|Z,foo and produces result record A|join(Z,foo)
7- RHS-1 processes Y|A+ message
   -> sends A|Y,bar back
8- RHS-1 processes Y|A- unsubscribe message
   -> sends A|null message back
9- LHS processes A|Y,bar, compares it to A|Z, and discards it due to
staleness.
10- LHS processes A|null, compares it to A|Z, and discards it due to
staleness.

In this case, intermediate messages were discarded due to staleness. From
the outside, this can be seen as "incorrect" because these intermediate
results were not shown. However, there is no possible way for RHS-2 to know
to delay production of its event until RHS-1 has completed its
propagations. If we wish to produce all intermediate events, in order, we
must maintain state on the LHS about which events have been sent out, await
their return, and only publish them in order. Aside from the obvious
complexity and memory requirements, the intermediate events would
immediately be stomped by the final output.
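
The staleness check in the example above can be sketched roughly as follows.
This is only an illustration under stated assumptions: the function name and
the tuple layout are hypothetical, and it assumes every response from the RHS
carries the foreign key it was produced for, so the LHS can compare it with
the foreign key it currently holds.

```python
def resolve(current_fk, responses):
    """Keep only RHS responses whose foreign key matches the LHS's current one.

    current_fk: dict of LHS key -> foreign key currently stored on the LHS
    responses:  list of (lhs_key, fk_joined_on, rhs_value) in arrival order
    """
    emitted = []
    for lhs_key, fk, rhs_value in responses:
        if current_fk.get(lhs_key) != fk:
            continue  # stale: the LHS row has since moved to another FK
        emitted.append((lhs_key, fk, rhs_value))
    return emitted


lhs_state = {"A": "Z"}      # LHS state after both updates to A
arrivals = [
    ("A", "Z", "foo"),      # from the idle RHS-2, arrives first
    ("A", "Y", "bar"),      # from the loaded RHS-1, arrives late
    ("A", "Y", None),       # reply to the unsubscribe, also late
]
results = resolve(lhs_state, arrivals)
```

Only the `Z` response survives; both late responses for `Y` are dropped,
which matches the discards in the example.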


*c-P2: Duplicates)*
With regards to duplicates (as per the double-sending of `A|Y,2,bar`), one
option is to ship the entire payload of the LHS over to the RHS, and either
join there or ship the entire payload back along with the RHS record. We
would still need to compare the FK on the LHS to ensure that it is still
valid. To take your example and expand it:

1- RHS is updated to Y|bar
2- LHS is updated to A|Y,1
   -> sends Y|(A, (Y,1))+ subscription message to RHS
3- LHS is updated to A|Y,2
   -> sends Y|(A, (Y,1))- unsubscribe message to RHS
   -> sends Y|(A, (Y,2))+ subscription to RHS
4- RHS processes first Y|(A, (Y,1))+ message
   -> sends A|(A, (Y,1)),bar back
5- LHS processes A|(A, (Y,1)),bar and produces result record A|Y,1,bar
6- RHS processes Y|(A, (Y,1))- unsubscribe message (update store only)
7- RHS processes second Y|(A, (Y,2))+ subscribe message
   -> sends A|(A, (Y,2)),bar back
8- LHS processes A|(A, (Y,2)),bar and produces result record A|Y,2,bar

Thus, the first result record is now `A|Y,1,bar`, while the second is
`A|Y,2,bar`.

This will add substantially to the data payload size. The question here
then becomes, "In which scenario is this a necessity?"

A possible scenario may include:
ktable.toStream.filter(filterFunc).foreach( workFunc )
//filterFunc true if value == (Y,1), else false
If the intermediate event (`A|Y,1`) is never produced + filtered, then
workFunc will not be executed. If I am mistaken on this point, please let
me know.
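
To make the scenario concrete, here is a small sketch with plain lists in
place of a KTable changelog. The names `run`, `filter_func` and `work_func`
are stand-ins for the pseudocode above and are not Kafka APIs; the values are
simplified to the (FK, version) pair.

```python
def run(events, filter_func, work_func):
    # Mimics toStream.filter(filterFunc).foreach(workFunc) over a list of events.
    for key, value in events:
        if filter_func(value):
            work_func(key, value)


seen = []
filter_func = lambda value: value == ("Y", 1)   # true only for the intermediate value
work_func = lambda key, value: seen.append((key, value))

# Both results are produced downstream: workFunc runs once.
run([("A", ("Y", 1)), ("A", ("Y", 2))], filter_func, work_func)
with_intermediate = list(seen)

# The intermediate result is never produced: workFunc never runs.
seen.clear()
run([("A", ("Y", 2))], filter_func, work_func)
without_intermediate = list(seen)
```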



*Monkey Wrench)*
If you change the foreign key (say, A|Z,1) while the updates from steps 2 & 3
above are still being processed in step 4, the results will all be rejected
anyway upon returning to the LHS. So even if we send the payload, the
results will be rejected as stale.

*Conclusion:*
My two cents is 

Re: [VOTE] 2.2.0 RC0

2019-02-26 Thread Adam Bellemare
Downloaded, compiled and passed all tests successfully.

Ran quickstart (https://kafka.apache.org/quickstart) up to step 6 without
issue.

(+1 non-binding).

Adam



On Mon, Feb 25, 2019 at 9:19 PM Matthias J. Sax 
wrote:

> @Stephane
>
> Thanks! You are right (I copied the list from an older draft without
> double checking).
>
> On the release Wiki page, it's correctly listed as postponed:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=100827512
>
>
> @Viktor
>
> Thanks. This will not block the release, but I'll make sure to include
> it in the webpage update.
>
>
>
> -Matthias
>
> On 2/25/19 5:16 AM, Viktor Somogyi-Vass wrote:
> > Hi Matthias,
> >
> > I've noticed a minor line break issue in the upgrade docs. I've created a
> > small PR for that: https://github.com/apache/kafka/pull/6320
> >
> > Best,
> > Viktor
> >
> > On Sun, Feb 24, 2019 at 10:16 PM Stephane Maarek <
> kafka.tutori...@gmail.com>
> > wrote:
> >
> >> Hi Matthias
> >>
> >> Thanks for this
> >> Running through the list of KIPs. I think this is not included in 2.2:
> >>
> >> - Allow clients to suppress auto-topic-creation
> >>
> >> Regards
> >> Stephane
> >>
> >> On Sun, Feb 24, 2019 at 1:03 AM Matthias J. Sax 
> >> wrote:
> >>
> >>> Hello Kafka users, developers and client-developers,
> >>>
> >>> This is the first candidate for the release of Apache Kafka 2.2.0.
> >>>
> >>> This is a minor release with the following highlights:
> >>>
> >>>  - Added SSL support for custom principal name
> >>>  - Allow SASL connections to periodically re-authenticate
> >>>  - Improved consumer group management
> >>>    - default group.id is `null` instead of empty string
> >>>  - Add --under-min-isr option to describe topics command
> >>>  - Allow clients to suppress auto-topic-creation
> >>>  - API improvement
> >>>    - Producer: introduce close(Duration)
> >>>    - AdminClient: introduce close(Duration)
> >>>    - Kafka Streams: new flatTransform() operator in Streams DSL
> >>>    - KafkaStreams (and other classes) now implement AutoCloseable to
> >>>      support try-with-resources
> >>>    - New Serdes and default method implementations
> >>>  - Kafka Streams exposed internal client.id via ThreadMetadata
> >>>  - Metric improvements:  All `-min`, `-avg` and `-max` metrics will now
> >>>    output `NaN` as default value
> >>>
> >>>
> >>> Release notes for the 2.2.0 release:
> >>> http://home.apache.org/~mjsax/kafka-2.2.0-rc0/RELEASE_NOTES.html
> >>>
> >>> *** Please download, test and vote by Friday, March 1, 9am PST.
> >>>
> >>> Kafka's KEYS file containing PGP keys we use to sign the release:
> >>> http://kafka.apache.org/KEYS
> >>>
> >>> * Release artifacts to be voted upon (source and binary):
> >>> http://home.apache.org/~mjsax/kafka-2.2.0-rc0/
> >>>
> >>> * Maven artifacts to be voted upon:
> >>> https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >>>
> >>> * Javadoc:
> >>> http://home.apache.org/~mjsax/kafka-2.2.0-rc0/javadoc/
> >>>
> >>> * Tag to be voted upon (off 2.2 branch) is the 2.2.0 tag:
> >>> https://github.com/apache/kafka/releases/tag/2.2.0-rc0
> >>>
> >>> * Documentation:
> >>> https://kafka.apache.org/22/documentation.html
> >>>
> >>> * Protocol:
> >>> https://kafka.apache.org/22/protocol.html
> >>>
> >>> * Successful Jenkins builds for the 2.2 branch:
> >>> Unit/integration tests:
> https://builds.apache.org/job/kafka-2.2-jdk8/31/
> >>>
> >>> * System tests:
> >>> https://jenkins.confluent.io/job/system-test-kafka/job/2.2/
> >>>
> >>>
> >>>
> >>>
> >>> Thanks,
> >>>
> >>> -Matthias
> >>>
> >>>
> >>
> >
>
>


Re: Statestore restoration & scaling questions - possible KIP as well.

2019-02-25 Thread Adam Bellemare
Hi Guozhang -

Thanks for the replies, and directing me to the existing JIRAs. I think
that a two-phase rebalance will be quite useful.

1) For clarity's sake, I should have just asked: When a new thread / node
is created and tasks are rebalanced, are the state stores on the new
threads/nodes fully restored during rebalancing, thereby blocking *any and
all *threads from proceeding with processing until restoration is complete?
I do not believe that this is the case, and in the case of rebalanced tasks
only the threads assigned the new tasks will be paused until state store
restoration is complete.
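
As a rough model of the behaviour described in the reply quoted below (20
tasks for the example topology, and a thread that does not process anything
until all of its stateful tasks are restored), here is a minimal sketch. The
round-robin assignment, the task naming and the function names are
assumptions for illustration only and do not reflect the actual Kafka Streams
assignor.

```python
def build_tasks(partitions):
    # One stateless map task and one stateful join task per partition.
    maps = [{"id": f"map-{p}", "stateful": False} for p in range(partitions)]
    joins = [{"id": f"join-{p}", "stateful": True} for p in range(partitions)]
    return maps + joins


def assign(tasks, threads):
    # Hypothetical round-robin assignment of tasks to threads.
    assignment = {t: [] for t in threads}
    for i, task in enumerate(tasks):
        assignment[threads[i % len(threads)]].append(task)
    return assignment


def can_process(thread_tasks, restored):
    # A thread processes nothing until every stateful task it owns is restored.
    return all(t["id"] in restored for t in thread_tasks if t["stateful"])


tasks = build_tasks(10)
assignment = assign(tasks, ["old", "new"])

# The original thread already holds its stores; the new one starts empty.
old_restored = {t["id"] for t in assignment["old"] if t["stateful"]}
new_restored = set()

old_ready = can_process(assignment["old"], old_restored)
new_ready_before = can_process(assignment["new"], new_restored)
new_restored.update(t["id"] for t in assignment["new"] if t["stateful"])
new_ready_after = can_process(assignment["new"], new_restored)
```

In this model only the thread that received stores it has not yet restored is
held back, including its stateless map tasks; the original thread keeps
processing.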


Thanks for your help - I appreciate you taking the time to reply.

Adam



On Wed, Feb 20, 2019 at 8:38 PM Guozhang Wang  wrote:

> Hello Adam,
>
> Sorry for being late replying on this thread, I've put my comments inlined
> below.
>
> On Sun, Feb 3, 2019 at 7:34 AM Adam Bellemare 
> wrote:
>
> > Hey Folks
> >
> > I have a few questions around the operations of stateful processing while
> > scaling nodes up/down, and a possible KIP in question #4. Most of them
> have
> > to do with task processing during rebuilding of state stores after
> scaling
> > nodes up.
> >
> > Scenario:
> > Single node/thread, processing 2 topics (10 partitions each):
> > User event topic (events) - ie: key:userId, value: ProductId
> > Product topic (entity) - ie: key: ProductId, value: productData
> >
> > My topology looks like this:
> >
> > KTable productTable = ... //materialize from product topic
> >
> > KStream output = userStream
> > .map(x => (x.value, x.key) ) //Swap the key and value around
> > .join(productTable, ... ) //Joiner is not relevant here
> > .to(...)  //Send it to some output topic
> >
> >
> > Here are my questions:
> > 1) If I scale the processing node count up, partitions will be rebalanced
> > to the new node. Does processing continue as normal on the original node,
> > while the new node's processing is paused as the internal state stores
> are
> > rebuilt/reloaded? From my reading of the code (and own experience) I
> > believe this to be the case, but I am just curious in case I missed
> > something.
> >
> >
> With 2 topics and 10 partitions each, assuming the default PartitionGrouper
> is used, there should be a total of 20 tasks (10 tasks for map which will
> send to an internal repartition topic, and 10 tasks for doing the join)
> created since these two topics are co-partitioned for joins.
>
> For example, task-0 would be processing the join from
> user-topic-partition-0 and product-topic-partition-0, and so on.
>
> With a single thread, all of these 20 tasks will be allocated to this
> thread, which would process them in an iterative manner. Note that since
> each task has its own state store (e.g. product-state-store-0 for task-0,
> etc), it means this thread will host all the 10 sets of state stores as
> well (note for the 10 mapping tasks there's no state stores at all).
>
> When you add new threads either within the same node, or on a different
> node, after rebalance each thread should be processing 10 tasks, and hence
> owning the corresponding set of state stores due to rebalance. The new thread
> will first restore the state stores it gets assigned before it starts
> processing.
>
>
> > 2) What happens to the userStream map task? Will the new node be able to
> > process this task while the state store is rebuilding/reloading? My
> reading
> > of the code suggests that this map process will be paused on the new node
> > while the state store is rebuilt. The effect of this is that it will lead
> > to a delay in events reaching the original node's partitions, which will
> be
> > seen as late-arriving events. Am I right in this assessment?
> >
> >
> Currently the thread will NOT start processing any tasks until ALL stateful
> tasks complete restoring (stateless tasks, like the map tasks in your
> example, never need restoration at all). There's an open JIRA for making it
> customizable but I cannot find it currently.
>
>
> > 3) How does scaling up work with standby state-store replicas? From my
> > reading of the code, it appears that scaling a node up will result in a
> > rebalance, with the state assigned to the new node being rebuilt first
> > (leading to a pause in processing). Following this, the standby replicas
> are
> > populated. Am I correct in this reading?
> >
> Standby tasks are running in parallel with active stream tasks, and they
> simply read from the changelog topic in real time and populate the standby
> store replica; when scaling out, the instances with standby tasks will 

Re: [VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-02-15 Thread Adam Bellemare
Hi Bill 

Now that you are a committer, does your vote add a +1 to binding? Can you 
recast it if you believe this is a sound decision? I am eager to finally finish 
up this KIP that has been open so long.

Thanks

Adam 

> On Feb 15, 2019, at 12:50 PM, Matthias J. Sax  wrote:
> 
> I think, he needs to re-cast his vote.
> 
> -Matthias
> 
>> On 2/15/19 5:49 AM, Adam Bellemare wrote:
>> Hi all
>> 
>> Since Bill is now a committer, the vote is changed to 3 binding and 3 
>> non-binding (unless I am somehow mistaken - please let me know!). In this 
>> case, I believe the vote passes.
>> 
>> Thanks
>> 
>> Adam 
>> 
>>> On Jan 24, 2019, at 7:28 PM, Adam Bellemare  
>>> wrote:
>>> 
>>> Bumping this vote because I don't want it to languish. It is very unlikely 
>>> to go into 2.2 at this point, but I would like to avoid resurrecting a dead 
>>> thread in 30 days time.
>>> 
>>>> On Tue, Jan 15, 2019 at 5:07 PM Adam Bellemare  
>>>> wrote:
>>>> All good Matthias. If it doesn’t get in for 2.2 I’ll just do it for the 
>>>> next release. 
>>>> 
>>>> Thanks
>>>> 
>>>>> On Jan 15, 2019, at 12:25 PM, Matthias J. Sax  
>>>>> wrote:
>>>>> 
>>>>> I'll try to review the KIP before the deadline, but as I am acting as
>>>>> release manager and also working on KIP-258, I cannot promise. Even if
>>>>> we make the voting deadline, it might also be tight to review the PR, as
>>>>> it seems to be big and complicated.
>>>>> 
>>>>> I'll try my very best to get it into 2.2...
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>>> On 1/15/19 3:27 AM, Adam Bellemare wrote:
>>>>>> If I can get one more binding vote in here, I may be able to get this out
>>>>>> for the 2.2 release in February.
>>>>>> 
>>>>>> Currently at:
>>>>>> 2 binding, 4 non-binding.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> On Sun, Jan 13, 2019 at 2:41 PM Patrik Kleindl  
>>>>>>> wrote:
>>>>>>> 
>>>>>>> +1 (non-binding)
>>>>>>> I have followed the discussion too and think this feature will be very
>>>>>>> helpful.
>>>>>>> Thanks Adam for staying on this.
>>>>>>> Best regards
>>>>>>> Patrik
>>>>>>> 
>>>>>>>> Am 13.01.2019 um 19:55 schrieb Paul Whalen :
>>>>>>>> 
>>>>>>>> +1 non binding.  I haven't contributed at all to discussion but have
>>>>>>>> followed since Adam reinvigorated it a few months ago and am very 
>>>>>>>> excited
>>>>>>>> about it.  It would be a huge help on the project I'm working on.
>>>>>>>> 
>>>>>>>> On Fri, Jan 11, 2019 at 9:05 AM Adam Bellemare 
>>>>>>>> >>>>>>> 
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Thanks all -
>>>>>>>>> 
>>>>>>>>> So far that's +2 Binding, +2 non-binding
>>>>>>>>> 
>>>>>>>>> If we get a few more votes I can likely get this out as part of the
>>>>>>> Kafka
>>>>>>>>> 2.2 release, as the KIP Freeze is Jan 24, 2019. The current PR I have
>>>>>>> could
>>>>>>>>> be modified to match the KIP in short order.
>>>>>>>>> 
>>>>>>>>> Adam
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Fri, Jan 11, 2019 at 7:11 AM Damian Guy 
>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>> +1 binding
>>>>>>>>>> 
>>>>>>>>>>> On Thu, 10 Jan 2019 at 16:57, Bill Bejeck  wrote:
>>>>>>>>>>> 
>>>>>>>>>>> +1 from me.  Great job on the KIP.
>>>>>>>>>>> 
>>>>>>>>>>> -Bill
>>>>>>>>>>> 
>>>>>>>>>>> On Thu, Jan 10, 2019 at 11:35 AM John Roesler 
>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> It's a +1 (nonbinding) from me as well.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for sticking with this, Adam!
>>>>>>>>>>>> -John
>>>>>>>>>>>> 
>>>>>>>>>>>> On Wed, Jan 9, 2019 at 6:22 PM Guozhang Wang 
>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hello Adam,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I'm +1 on the current proposal, thanks!
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Mon, Jan 7, 2019 at 6:13 AM Adam Bellemare <
>>>>>>>>>>> adam.bellem...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi All
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I would like to call a new vote on KIP-213. The design has
>>>>>>>>> changed
>>>>>>>>>>>>>> substantially. Perhaps more importantly, the KIP and associated
>>>>>>>>>>>>>> documentation has been greatly simplified. I know this KIP has
>>>>>>>>> been
>>>>>>>>>>> on
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> mailing list for a long time, but the help from John Roesler and
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>> Wang have helped put it into a much better state. I would
>>>>>>>>>> appreciate
>>>>>>>>>>>> any
>>>>>>>>>>>>>> feedback or votes.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thank you
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Adam Bellemare
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>> 
> 


Re: [VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-02-15 Thread Adam Bellemare
Hi all

Since Bill is now a committer, the vote is changed to 3 binding and 3 
non-binding (unless I am somehow mistaken - please let me know!). In this case, 
I believe the vote passes.

Thanks

Adam 

> On Jan 24, 2019, at 7:28 PM, Adam Bellemare  wrote:
> 
> Bumping this vote because I don't want it to languish. It is very unlikely to 
> go into 2.2 at this point, but I would like to avoid resurrecting a dead 
> thread in 30 days time.
> 
>> On Tue, Jan 15, 2019 at 5:07 PM Adam Bellemare  
>> wrote:
>> All good Matthias. If it doesn’t get in for 2.2 I’ll just do it for the next 
>> release. 
>> 
>> Thanks
>> 
>> > On Jan 15, 2019, at 12:25 PM, Matthias J. Sax  
>> > wrote:
>> > 
>> > I'll try to review the KIP before the deadline, but as I am acting as
>> > release manager and also working on KIP-258, I cannot promise. Even if
>> > we make the voting deadline, it might also be tight to review the PR, as
>> > it seems to be big and complicated.
>> > 
>> > I'll try my very best to get it into 2.2...
>> > 
>> > 
>> > -Matthias
>> > 
>> >> On 1/15/19 3:27 AM, Adam Bellemare wrote:
>> >> If I can get one more binding vote in here, I may be able to get this out
>> >> for the 2.2 release in February.
>> >> 
>> >> Currently at:
>> >> 2 binding, 4 non-binding.
>> >> 
>> >> 
>> >> 
>> >>> On Sun, Jan 13, 2019 at 2:41 PM Patrik Kleindl  
>> >>> wrote:
>> >>> 
>> >>> +1 (non-binding)
>> >>> I have followed the discussion too and think this feature will be very
>> >>> helpful.
>> >>> Thanks Adam for staying on this.
>> >>> Best regards
>> >>> Patrik
>> >>> 
>> >>>> Am 13.01.2019 um 19:55 schrieb Paul Whalen :
>> >>>> 
>> >>>> +1 non binding.  I haven't contributed at all to discussion but have
>> >>>> followed since Adam reinvigorated it a few months ago and am very 
>> >>>> excited
>> >>>> about it.  It would be a huge help on the project I'm working on.
>> >>>> 
>> >>>> On Fri, Jan 11, 2019 at 9:05 AM Adam Bellemare > >>>> 
>> >>>> wrote:
>> >>>> 
>> >>>>> Thanks all -
>> >>>>> 
>> >>>>> So far that's +2 Binding, +2 non-binding
>> >>>>> 
>> >>>>> If we get a few more votes I can likely get this out as part of the
>> >>> Kafka
>> >>>>> 2.2 release, as the KIP Freeze is Jan 24, 2019. The current PR I have
>> >>> could
>> >>>>> be modified to match the KIP in short order.
>> >>>>> 
>> >>>>> Adam
>> >>>>> 
>> >>>>> 
>> >>>>>> On Fri, Jan 11, 2019 at 7:11 AM Damian Guy 
>> >>> wrote:
>> >>>>>> 
>> >>>>>> +1 binding
>> >>>>>> 
>> >>>>>>> On Thu, 10 Jan 2019 at 16:57, Bill Bejeck  wrote:
>> >>>>>>> 
>> >>>>>>> +1 from me.  Great job on the KIP.
>> >>>>>>> 
>> >>>>>>> -Bill
>> >>>>>>> 
>> >>>>>>> On Thu, Jan 10, 2019 at 11:35 AM John Roesler 
>> >>>>> wrote:
>> >>>>>>> 
>> >>>>>>>> It's a +1 (nonbinding) from me as well.
>> >>>>>>>> 
>> >>>>>>>> Thanks for sticking with this, Adam!
>> >>>>>>>> -John
>> >>>>>>>> 
>> >>>>>>>> On Wed, Jan 9, 2019 at 6:22 PM Guozhang Wang 
>> >>>>>> wrote:
>> >>>>>>>> 
>> >>>>>>>>> Hello Adam,
>> >>>>>>>>> 
>> >>>>>>>>> I'm +1 on the current proposal, thanks!
>> >>>>>>>>> 
>> >>>>>>>>> 
>> >>>>>>>>> Guozhang
>> >>>>>>>>> 
>> >>>>>>>>> On Mon, Jan 7, 2019 at 6:13 AM Adam Bellemare <
>> >>>>>>> adam.bellem...@gmail.com>
>> >>>>>>>>> wrote:
>> >>>>>>>>> 
>> >>>>>>>>>> Hi All
>> >>>>>>>>>> 
>> >>>>>>>>>> I would like to call a new vote on KIP-213. The design has
>> >>>>> changed
>> >>>>>>>>>> substantially. Perhaps more importantly, the KIP and associated
>> >>>>>>>>>> documentation has been greatly simplified. I know this KIP has
>> >>>>> been
>> >>>>>>> on
>> >>>>>>>>> the
>> >>>>>>>>>> mailing list for a long time, but the help from John Roesler and
>> >>>>>>>> Guozhang
>> >>>>>>>>>> Wang have helped put it into a much better state. I would
>> >>>>>> appreciate
>> >>>>>>>> any
>> >>>>>>>>>> feedback or votes.
>> >>>>>>>>>> 
>> >>>>>>>>>> 
>> >>>>>>>>>> 
>> >>>>>>>>> 
>> >>>>>>>> 
>> >>>>>>> 
>> >>>>>> 
>> >>>>> 
>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>> >>>>>>>>>> 
>> >>>>>>>>>> 
>> >>>>>>>>>> 
>> >>>>>>>>>> Thank you
>> >>>>>>>>>> 
>> >>>>>>>>>> Adam Bellemare
>> >>>>>>>>>> 
>> >>>>>>>>> 
>> >>>>>>>>> 
>> >>>>>>>>> --
>> >>>>>>>>> -- Guozhang
>> >>>>>>>>> 
>> >>>>>>>> 
>> >>>>>>> 
>> >>>>>> 
>> >>>>> 
>> >>> 
>> >> 
>> > 


Re: [ANNOUNCE] New Committer: Bill Bejeck

2019-02-15 Thread Adam Bellemare
Great work Bill! Well deserved! 

> On Feb 14, 2019, at 3:55 AM, Edoardo Comar  wrote:
> 
> Well done Bill!
> --
> 
> Edoardo Comar
> 
> IBM Event Streams
> IBM UK Ltd, Hursley Park, SO21 2JN
> 
> 
> 
> 
> From:   Rajini Sivaram 
> To: dev 
> Date:   14/02/2019 09:25
> Subject:Re: [ANNOUNCE] New Committer: Bill Bejeck
> 
> 
> 
> Congratulations, Bill!
> 
> On Thu, Feb 14, 2019 at 9:04 AM Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
> 
>> Congratulations Bill!
>> 
>> On Thu, 14 Feb 2019, 09:29 Mickael Maison, 
>> wrote:
>> 
>>> Congratulations Bill!
>>> 
>>> On Thu, Feb 14, 2019 at 7:52 AM Gurudatt Kulkarni 
> 
>>> wrote:
>>> 
 Congratulations Bill!
 
 On Thursday, February 14, 2019, Konstantine Karantasis <
 konstant...@confluent.io> wrote:
> Congrats Bill!
> 
> -Konstantine
> 
> On Wed, Feb 13, 2019 at 8:42 PM Srinivas Reddy <
 srinivas96all...@gmail.com
> 
> wrote:
> 
>> Congratulations Bill 
>> 
>> Well deserved!!
>> 
>> -
>> Srinivas
>> 
>> - Typed on tiny keys. pls ignore typos.{mobile app}
>> 
>>> On Thu, 14 Feb, 2019, 11:21 Ismael Juma >> 
>>> Congratulations Bill!
>>> 
>>> On Wed, Feb 13, 2019, 5:03 PM Guozhang Wang >>> wrote:
>>> 
 Hello all,

 The PMC of Apache Kafka is happy to announce that we've added Bill Bejeck
 as our newest project committer.

 Bill has been active in the Kafka community since 2015. He has made
 significant contributions to the Kafka Streams project with more than 100
 PRs and 4 authored KIPs, including the streams topology optimization
 framework. Bill's also very keen on tightening Kafka's unit test / system
 tests coverage, which is a great value to our project codebase.

 In addition, Bill has been very active in evangelizing Kafka for stream
 processing in the community. He has given several Kafka meetup talks in the
 past year, including a presentation at Kafka Summit SF. He's also authored
 a book about Kafka Streams (
 https://www.manning.com/books/kafka-streams-in-action), as well as various
 of posts in public venues like DZone as well as his personal blog (
 http://codingjunkie.net/).

 We really appreciate the contributions and are looking forward to see more
 from him. Congratulations, Bill !


 Guozhang, on behalf of the Apache Kafka PMC
 
>>> 
>> 
> 
 
>>> 
>> 
> 
> 
> 
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number 
> 741598. 
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
> 


Re: Statestore restoration & scaling questions - possible KIP as well.

2019-02-06 Thread Adam Bellemare
Bump - hoping someone has some insight. Alternately, redirection to a more
suitable forum.

Thanks

On Sun, Feb 3, 2019 at 10:25 AM Adam Bellemare 
wrote:

> Hey Folks
>
> I have a few questions around the operations of stateful processing while
> scaling nodes up/down, and a possible KIP in question #4. Most of them have
> to do with task processing during rebuilding of state stores after scaling
> nodes up.
>
> Scenario:
> Single node/thread, processing 2 topics (10 partitions each):
> User event topic (events) - ie: key:userId, value: ProductId
> Product topic (entity) - ie: key: ProductId, value: productData
>
> My topology looks like this:
>
> KTable productTable = ... //materialize from product topic
>
> KStream output = userStream
> .map(x => (x.value, x.key) ) //Swap the key and value around
> .join(productTable, ... ) //Joiner is not relevant here
> .to(...)  //Send it to some output topic
>
>
> Here are my questions:
> 1) If I scale the processing node count up, partitions will be rebalanced
> to the new node. Does processing continue as normal on the original node,
> while the new node's processing is paused as the internal state stores are
> rebuilt/reloaded? From my reading of the code (and own experience) I
> believe this to be the case, but I am just curious in case I missed
> something.
>
> 2) What happens to the userStream map task? Will the new node be able to
> process this task while the state store is rebuilding/reloading? My reading
> of the code suggests that this map process will be paused on the new node
> while the state store is rebuilt. The effect of this is that it will lead
> to a delay in events reaching the original node's partitions, which will be
> seen as late-arriving events. Am I right in this assessment?
>
> 3) How does scaling up work with standby state-store replicas? From my
> reading of the code, it appears that scaling a node up will result in a
> rebalance, with the state assigned to the new node being rebuilt first
> (leading to a pause in processing). Following this, the standby replicas are
> populated. Am I correct in this reading?
>
> 4) If my reading in #3 is correct, would it be possible to pre-populate
> the standby stores on scale-up before initiating active-task transfer? This
> would allow seamless scale-up and scale-down without requiring any pauses
> for rebuilding state. I am interested in kicking this off as a KIP if so,
> but would appreciate any JIRAs or related KIPs to read up on prior to
> digging into this.
>
>
> Thanks
>
> Adam Bellemare
>


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-02-05 Thread Adam Bellemare
Hi Harsha

Agreed with Eno:  "However I'd argue that this KIP is about strengthening
the role of Kafka as a long-term storage service"
  - Put the focus on the single source of truth being from Kafka, with
clients not needing to source from multiple data sources.

Also, clarify if the segment is ever copied back over locally. From my
reading it suggests that it will not copy the segment back, which is
totally fine by me as I believe it would become incredibly complex to
manage otherwise.

Last, have you looked at how Apache Pulsar handles their tiered storage (
https://pulsar.apache.org/docs/latest/cookbooks/tiered-storage/)? You may
be able to get some ideas from their method - it's very similar to what you
are proposing.

I think I would like to see more information on:
  - Expiring data from the tiered storage (let's say I have 90 day expiry,
but it ends up in the tiered storage after 30 days... how do I expire out
of the HDFS/S3/Other store?)
  - Performance estimates - will the clients have timeout problems if the
data is being retrieved from slower tiered storage? How slow can the
external storage reasonably be?


Thanks for contributing - it looks very promising and I am eager to see
this as a feature.






On Tue, Feb 5, 2019 at 6:00 AM Eno Thereska  wrote:

> Thanks Harsha for the KIP. A couple of comments:
>
> - the motivation needs more work. I think some of the questions about ETL
> and other tools would benefit from a clearer motivation. In particular
> right now the motivation is mostly about reducing recovery time. However
> I'd argue that this KIP is about strengthening the role of Kafka as a
> long-term storage service (e.g. blog by Jay
> https://www.confluent.io/blog/okay-store-data-apache-kafka/). Any long
> term
> storage system needs to handle (infinitely) scalable storage. Local storage
> is just not that infinitely scalable. Hence the motivation for tiering,
> where there is a fast local tier and a remote tier that scales well in
> terms of long-term storage costs.
> - I agree that this is not an ETL use case. There are indeed tools out
> there that help with moving data to other storage systems, however
> first-class support for tiering would make it easier for clients to access
> their Kafka data with no other moving parts. So I like that part of the
> proposal.
> - I didn't understand when a log would be copied to the remote tier and
> whether it would ever be brought back to the local tier. Tiering in storage
> implies data movement and policies for when data moves from tier to tier. A
> couple of things here 1) I didn't understand when a log would move to the
> remote tier. I was thinking it would move once the local tier is getting
> full and some eviction policy (e.g. evict oldest log) would decide. However
> you seem to imply that the log would be copied to the remote tier when it
> is first created? 2) would a log from the remote tier ever make it back to
> the local tier?
>
> That's it for now,
>
> Thanks
> Eno
>
>
> On Mon, Feb 4, 2019 at 11:53 PM Harsha Chintalapani 
> wrote:
>
> > "I think you are saying that this enables additional (potentially
> cheaper)
> > storage options without *requiring* an existing ETL pipeline. “
> > Yes.
> >
> > " But it's not really a replacement for the sort of pipelines people
> build
> > with Connect, Gobblin etc.”
> >
> > It is not. But assuming that everyone runs these
> > pipelines for storing raw Kafka data into HDFS or S3 is also a wrong
> > assumption.
> > The aim of this KIP is to provide tiered storage as whole package not
> > asking users to ship the data on their own using existing ETL, which
> means
> > running a consumer and maintaining those pipelines.
> >
> > " My point was that, if you are already offloading records in an ETL
> > pipeline, why do you need a new pipeline built into the broker to ship
> the
> > same data to the same place?”
> >
> > As you said, it's an ETL pipeline, which means users of these pipelines
> > are reading the data from the broker, transforming its state, and storing
> > it somewhere.
> > The point of this KIP is to store log segments as they are, without
> > changing their structure, so that we can use the existing offset
> > mechanisms to look them up when the consumer needs to read old data. When
> > you load it via your existing pipelines you are reading the topic as a
> > whole, which doesn't guarantee that you'll produce this data back into
> > HDFS or S3 in the same order, and who is going to generate the index
> > files again?
> >
> >
> > "So you'd end up with one of 1)cold segments are only useful to Kafka; 2)
> > you have the same data written to HDFS/etc twice, once for Kafka and once
> > for everything else, in two separate formats”
> >
> > You are talking about two different use cases. If someone is storing raw
> > data out of Kafka for long-term access, storing the data as it is in HDFS
> > through Kafka will solve this issue.
> > They do not need to run another pipe-line to ship these 

Statestore restoration & scaling questions - possible KIP as well.

2019-02-03 Thread Adam Bellemare
Hey Folks

I have a few questions around the operations of stateful processing while
scaling nodes up/down, and a possible KIP in question #4. Most of them have
to do with task processing during rebuilding of state stores after scaling
nodes up.

Scenario:
Single node/thread, processing 2 topics (10 partitions each):
User event topic (events) - ie: key:userId, value: ProductId
Product topic (entity) - ie: key: ProductId, value: productData

My topology looks like this:

KTable productTable = ... //materialize from product topic

KStream output = userStream
.map(x => (x.value, x.key) ) //Swap the key and value around
.join(productTable, ... ) //Joiner is not relevant here
.to(...)  //Send it to some output topic
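
To make the data flow concrete, here is a rough, dependency-free sketch of
what the swap-then-join does to keys and partitions. It is only an
illustration and not Kafka Streams code: partition_for() is a hypothetical
stand-in partitioner (CRC32, not Kafka's default hash), and the sample users
and products are made up.

```python
import zlib

NUM_PARTITIONS = 10  # both topics have 10 partitions in the scenario above


def partition_for(key: str) -> int:
    # Stand-in partitioner (CRC32 modulo partition count). Kafka's default
    # partitioner uses a different hash; only the "same key -> same
    # partition" property matters for this illustration.
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


def swap_key_value(event):
    # Mirrors .map(x => (x.value, x.key)):
    # (userId, productId) becomes (productId, userId).
    user_id, product_id = event
    return (product_id, user_id)


def join_with_products(rekeyed_events, product_table):
    # Inner join against the materialized product table, keyed by productId.
    joined = []
    for product_id, user_id in rekeyed_events:
        if product_id in product_table:
            joined.append((product_id, (user_id, product_table[product_id])))
    return joined


# Made-up sample data for the two topics.
product_table = {"p1": "kettle", "p2": "toaster"}
user_events = [("alice", "p1"), ("bob", "p2"), ("carol", "p1")]

rekeyed = [swap_key_value(e) for e in user_events]
output = join_with_products(rekeyed, product_table)

for (user_id, _), (new_key, _) in zip(user_events, rekeyed):
    print(f"{user_id}: read from partition {partition_for(user_id)}, "
          f"joined on partition {partition_for(new_key)}")
```

The point of the sketch is that the partition an event is read from (by
userId) is generally not the partition it is joined on (by productId), so a
paused map step on one node can delay events destined for another node's
join.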


Here are my questions:
1) If I scale the processing node count up, partitions will be rebalanced
to the new node. Does processing continue as normal on the original node,
while the new node's processing is paused as the internal state stores are
rebuilt/reloaded? From my reading of the code (and my own experience) I
believe this to be the case, but I am just curious in case I missed
something.

2) What happens to the userStream map task? Will the new node be able to
process this task while the state store is rebuilding/reloading? My reading
of the code suggests that this map process will be paused on the new node
while the state store is rebuilt. The effect of this is that it will lead
to a delay in events reaching the original node's partitions, which will be
seen as late-arriving events. Am I right in this assessment?

3) How does scaling up work with standby state-store replicas? From my
reading of the code, it appears that scaling a node up will result in a
rebalance, with the state assigned to the new node being rebuilt first
(leading to a pause in processing). Following this, the standby replicas are
populated. Am I correct in this reading?

4) If my reading in #3 is correct, would it be possible to pre-populate the
standby stores on scale-up before initiating active-task transfer? This
would allow seamless scale-up and scale-down without requiring any pauses
for rebuilding state. I am interested in kicking this off as a KIP if so,
but would appreciate any JIRAs or related KIPs to read up on prior to
digging into this.


Thanks

Adam Bellemare


Re: Why is enable.auto.commit=true the default value for consumer?

2019-02-01 Thread Adam Bellemare
Hi Clement & Colin

Thanks for the discussion on this.

I think the biggest confusion is how the enable.auto.commit documentation
says "If true the consumer's offset will be periodically committed in the
background."
@Clement, you mention that there is documentation about what you must do
with auto-commit. I am looking at https://kafka.apache.org/documentation
and I cannot seem to locate it. Could you point me to it?

I would be fine with making a simple PR to improve the documentation on
this - I think it will help straighten things out.
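
As a possible starting point for that documentation, here is a toy
simulation of the behaviour Clement describes below: the commit happens
inside poll() and only covers what the previous poll() returned. This is an
in-memory model, not the Kafka client. ToyConsumer and run() are
hypothetical names, and the model commits on every poll, whereas the real
consumer only does so once the auto-commit interval has elapsed.

```python
class ToyConsumer:
    """In-memory stand-in for a consumer; NOT the Kafka client API."""

    def __init__(self, log, batch_size):
        self.log = log
        self.batch_size = batch_size
        self.committed = 0  # offset a restarted consumer would resume from
        self.position = 0   # offset of the next record poll() will return

    def poll(self):
        # Assumed auto-commit rule: the commit happens inside poll() and
        # covers the position reached by the *previous* poll().
        self.committed = self.position
        batch = self.log[self.position:self.position + self.batch_size]
        self.position += len(batch)
        return batch


def run(consumer, processed, crash_after=None):
    """Process records one by one; optionally crash after N records."""
    handled = 0
    while True:
        batch = consumer.poll()
        if not batch:
            return
        for record in batch:
            if crash_after is not None and handled == crash_after:
                raise RuntimeError("simulated crash")
            processed.append(record)
            handled += 1


log = list(range(10))
processed = []

first = ToyConsumer(log, batch_size=4)
try:
    run(first, processed, crash_after=6)  # dies partway through batch two
except RuntimeError:
    pass

# Restart from the last committed offset.
second = ToyConsumer(log, batch_size=4)
second.committed = second.position = first.committed
run(second, processed)

print("committed at crash:", first.committed)
print("processing order:", processed)
```

In this model the crash leaves the committed offset at the start of the
in-flight batch, so the restarted consumer reprocesses part of that batch
but never skips a record. That only holds because run() finishes every
record of a batch before calling poll() again, which is the single-threaded
assumption Colin points out.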

Thanks again,

Adam


On Tue, Jan 29, 2019 at 7:35 PM Pellerin, Clement 
wrote:

> Indeed, but this is what the documentation says you must do with
> auto-commit.
> I say it is a user error if you don't.
>
> Regardless, I think this is a fairly common misconception, so it would not
> hurt
> to debunk it explicitly in the documentation.
>
> -Original Message-
> From: Colin McCabe [mailto:cmcc...@apache.org]
> Sent: Tuesday, January 29, 2019 7:26 PM
> To: dev@kafka.apache.org
> Subject: Re: Why is enable.auto.commit=true the default value for consumer?
>
> Hi Clement,
>
> You are assuming that the client application is single-threaded-- or at
> least processes all the records before polling for more.  This may or may
> not be the case.  But that is a fair point-- in this case, auto-commit
> would be safe.
>
> best,
> Colin
>
> On Tue, Jan 29, 2019, at 16:23, Pellerin, Clement wrote:
> > I had the same impression at some point but this is not how auto-commit
> works.
> > Auto-commit can only commit when the application comes back to poll and
> > if it decides to commit at that time, it will only commit the previous
> batch.
> > In your example, the app might come back and have to re-execute all
> > the records in the uncommitted batch but it will never skip over
> > unprocessed records.
> >
> > -Original Message-
> > From: Adam Bellemare [mailto:adam.bellem...@gmail.com]
> > Sent: Tuesday, January 29, 2019 3:54 PM
> > To: dev@kafka.apache.org
> > Subject: Why is enable.auto.commit=true the default value for consumer?
> >
> > As the question indicates.
> >
> > Should this not be default false? I think this is a bit nefarious to
> > someone launching their application into production without testing it
> > extensively around failure modes. I can see a scenario where a consumer
> > polls for events, processes them, produces to output topic, and commits
> the
> > offsets. Say it takes 30 seconds for a batch. If it fails halfway
> through,
> > upon restarting it will skip everything that was unprocessed/unpublished
> up
> > to the committed offset.
> >
> > Is there a historic reason why it's set to default true? Is it because to
> > change it to default false it could affect the upgrade path of previous
> > implementations?
> >
> > Adam
> >
>


Why is enable.auto.commit=true the default value for consumer?

2019-01-29 Thread Adam Bellemare
As the question indicates.

Should this not be default false? I think this is a bit nefarious to
someone launching their application into production without testing it
extensively around failure modes. I can see a scenario where a consumer
polls for events, processes them, produces to output topic, and commits the
offsets. Say it takes 30 seconds for a batch. If it fails halfway through,
upon restarting it will skip everything that was unprocessed/unpublished up
to the committed offset.

Is there a historic reason why it's set to default true? Is it because to
change it to default false it could affect the upgrade path of previous
implementations?

Adam


Re: [VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-01-24 Thread Adam Bellemare
Bumping this vote because I don't want it to languish. It is very unlikely
to go into 2.2 at this point, but I would like to avoid resurrecting a dead
thread in 30 days' time.

On Tue, Jan 15, 2019 at 5:07 PM Adam Bellemare 
wrote:

> All good Matthias. If it doesn’t get in for 2.2 I’ll just do it for the
> next release.
>
> Thanks
>
> > On Jan 15, 2019, at 12:25 PM, Matthias J. Sax 
> wrote:
> >
> > I'll try to review the KIP before the deadline, but as I am acting as
> > release manager and also working on KIP-258, I cannot promise. Even if
> > we make the voting deadline, it might also be tight to review the PR, as
> > it seems to be big and complicated.
> >
> > I'll try my very best to get it into 2.2...
> >
> >
> > -Matthias
> >
> >> On 1/15/19 3:27 AM, Adam Bellemare wrote:
> >> If I can get one more binding vote in here, I may be able to get this
> out
> >> for the 2.2 release in February.
> >>
> >> Currently at:
> >> 2 binding, 4 non-binding.
> >>
> >>
> >>
> >>> On Sun, Jan 13, 2019 at 2:41 PM Patrik Kleindl 
> wrote:
> >>>
> >>> +1 (non-binding)
> >>> I have followed the discussion too and think this feature will be very
> >>> helpful.
> >>> Thanks Adam for staying on this.
> >>> Best regards
> >>> Patrik
> >>>
> >>>> Am 13.01.2019 um 19:55 schrieb Paul Whalen :
> >>>>
> >>>> +1 non binding.  I haven't contributed at all to discussion but have
> >>>> followed since Adam reinvigorated it a few months ago and am very
> excited
> >>>> about it.  It would be a huge help on the project I'm working on.
> >>>>
> >>>> On Fri, Jan 11, 2019 at 9:05 AM Adam Bellemare <
> adam.bellem...@gmail.com
> >>>>
> >>>> wrote:
> >>>>
> >>>>> Thanks all -
> >>>>>
> >>>>> So far that's +2 Binding, +2 non-binding
> >>>>>
> >>>>> If we get a few more votes I can likely get this out as part of the
> >>> Kafka
> >>>>> 2.2 release, as the KIP Freeze is Jan 24, 2019. The current PR I have
> >>> could
> >>>>> be modified to match the PR in short order.
> >>>>>
> >>>>> Adam
> >>>>>
> >>>>>
> >>>>>> On Fri, Jan 11, 2019 at 7:11 AM Damian Guy 
> >>> wrote:
> >>>>>>
> >>>>>> +1 binding
> >>>>>>
> >>>>>>> On Thu, 10 Jan 2019 at 16:57, Bill Bejeck 
> wrote:
> >>>>>>>
> >>>>>>> +1 from me.  Great job on the KIP.
> >>>>>>>
> >>>>>>> -Bill
> >>>>>>>
> >>>>>>> On Thu, Jan 10, 2019 at 11:35 AM John Roesler 
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> It's a +1 (nonbinding) from me as well.
> >>>>>>>>
> >>>>>>>> Thanks for sticking with this, Adam!
> >>>>>>>> -John
> >>>>>>>>
> >>>>>>>> On Wed, Jan 9, 2019 at 6:22 PM Guozhang Wang 
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hello Adam,
> >>>>>>>>>
> >>>>>>>>> I'm +1 on the current proposal, thanks!
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>> On Mon, Jan 7, 2019 at 6:13 AM Adam Bellemare <
> >>>>>>> adam.bellem...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi All
> >>>>>>>>>>
> >>>>>>>>>> I would like to call a new vote on KIP-213. The design has
> >>>>> changed
> >>>>>>>>>> substantially. Perhaps more importantly, the KIP and associated
> >>>>>>>>>> documentation has been greatly simplified. I know this KIP has
> >>>>> been
> >>>>>>> on
> >>>>>>>>> the
> >>>>>>>>>> mailing list for a long time, but the help from John Roesler and
> >>>>>>>> Guozhang
> >>>>>>>>>> Wang have helped put it into a much better state. I would
> >>>>>> appreciate
> >>>>>>>> any
> >>>>>>>>>> feedback or votes.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Thank you
> >>>>>>>>>>
> >>>>>>>>>> Adam Bellemare
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> >
>


Re: [VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-01-15 Thread Adam Bellemare
All good Matthias. If it doesn’t get in for 2.2 I’ll just do it for the next 
release. 

Thanks

> On Jan 15, 2019, at 12:25 PM, Matthias J. Sax  wrote:
> 
> I'll try to review the KIP before the deadline, but as I am acting as
> release manager and also working on KIP-258, I cannot promise. Even if
> we make the voting deadline, it might also be tight to review the PR, as
> it seems to be big and complicated.
> 
> I'll try my very best to get it into 2.2...
> 
> 
> -Matthias
> 
>> On 1/15/19 3:27 AM, Adam Bellemare wrote:
>> If I can get one more binding vote in here, I may be able to get this out
>> for the 2.2 release in February.
>> 
>> Currently at:
>> 2 binding, 4 non-binding.
>> 
>> 
>> 
>>> On Sun, Jan 13, 2019 at 2:41 PM Patrik Kleindl  wrote:
>>> 
>>> +1 (non-binding)
>>> I have followed the discussion too and think this feature will be very
>>> helpful.
>>> Thanks Adam for staying on this.
>>> Best regards
>>> Patrik
>>> 
>>>> Am 13.01.2019 um 19:55 schrieb Paul Whalen :
>>>> 
>>>> +1 non binding.  I haven't contributed at all to discussion but have
>>>> followed since Adam reinvigorated it a few months ago and am very excited
>>>> about it.  It would be a huge help on the project I'm working on.
>>>> 
>>>> On Fri, Jan 11, 2019 at 9:05 AM Adam Bellemare >>> 
>>>> wrote:
>>>> 
>>>>> Thanks all -
>>>>> 
>>>>> So far that's +2 Binding, +2 non-binding
>>>>> 
>>>>> If we get a few more votes I can likely get this out as part of the
>>> Kafka
>>>>> 2.2 release, as the KIP Freeze is Jan 24, 2019. The current PR I have
>>> could
>>>>> be modified to match the PR in short order.
>>>>> 
>>>>> Adam
>>>>> 
>>>>> 
>>>>>> On Fri, Jan 11, 2019 at 7:11 AM Damian Guy 
>>> wrote:
>>>>>> 
>>>>>> +1 binding
>>>>>> 
>>>>>>> On Thu, 10 Jan 2019 at 16:57, Bill Bejeck  wrote:
>>>>>>> 
>>>>>>> +1 from me.  Great job on the KIP.
>>>>>>> 
>>>>>>> -Bill
>>>>>>> 
>>>>>>> On Thu, Jan 10, 2019 at 11:35 AM John Roesler 
>>>>> wrote:
>>>>>>> 
>>>>>>>> It's a +1 (nonbinding) from me as well.
>>>>>>>> 
>>>>>>>> Thanks for sticking with this, Adam!
>>>>>>>> -John
>>>>>>>> 
>>>>>>>> On Wed, Jan 9, 2019 at 6:22 PM Guozhang Wang 
>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hello Adam,
>>>>>>>>> 
>>>>>>>>> I'm +1 on the current proposal, thanks!
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Guozhang
>>>>>>>>> 
>>>>>>>>> On Mon, Jan 7, 2019 at 6:13 AM Adam Bellemare <
>>>>>>> adam.bellem...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi All
>>>>>>>>>> 
>>>>>>>>>> I would like to call a new vote on KIP-213. The design has
>>>>> changed
>>>>>>>>>> substantially. Perhaps more importantly, the KIP and associated
>>>>>>>>>> documentation has been greatly simplified. I know this KIP has
>>>>> been
>>>>>>> on
>>>>>>>>> the
>>>>>>>>>> mailing list for a long time, but the help from John Roesler and
>>>>>>>> Guozhang
>>>>>>>>>> Wang have helped put it into a much better state. I would
>>>>>> appreciate
>>>>>>>> any
>>>>>>>>>> feedback or votes.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Thank you
>>>>>>>>>> 
>>>>>>>>>> Adam Bellemare
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>> 
>> 
> 


Re: [VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-01-15 Thread Adam Bellemare
If I can get one more binding vote in here, I may be able to get this out
for the 2.2 release in February.

Currently at:
2 binding, 4 non-binding.



On Sun, Jan 13, 2019 at 2:41 PM Patrik Kleindl  wrote:

> +1 (non-binding)
> I have followed the discussion too and think this feature will be very
> helpful.
> Thanks Adam for staying on this.
> Best regards
> Patrik
>
> > Am 13.01.2019 um 19:55 schrieb Paul Whalen :
> >
> > +1 non binding.  I haven't contributed at all to discussion but have
> > followed since Adam reinvigorated it a few months ago and am very excited
> > about it.  It would be a huge help on the project I'm working on.
> >
> > On Fri, Jan 11, 2019 at 9:05 AM Adam Bellemare  >
> > wrote:
> >
> >> Thanks all -
> >>
> >> So far that's +2 Binding, +2 non-binding
> >>
> >> If we get a few more votes I can likely get this out as part of the
> Kafka
> >> 2.2 release, as the KIP Freeze is Jan 24, 2019. The current PR I have
> could
> >> be modified to match the PR in short order.
> >>
> >> Adam
> >>
> >>
> >>> On Fri, Jan 11, 2019 at 7:11 AM Damian Guy 
> wrote:
> >>>
> >>> +1 binding
> >>>
> >>>> On Thu, 10 Jan 2019 at 16:57, Bill Bejeck  wrote:
> >>>>
> >>>> +1 from me.  Great job on the KIP.
> >>>>
> >>>> -Bill
> >>>>
> >>>> On Thu, Jan 10, 2019 at 11:35 AM John Roesler 
> >> wrote:
> >>>>
> >>>>> It's a +1 (nonbinding) from me as well.
> >>>>>
> >>>>> Thanks for sticking with this, Adam!
> >>>>> -John
> >>>>>
> >>>>> On Wed, Jan 9, 2019 at 6:22 PM Guozhang Wang 
> >>> wrote:
> >>>>>
> >>>>>> Hello Adam,
> >>>>>>
> >>>>>> I'm +1 on the current proposal, thanks!
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Mon, Jan 7, 2019 at 6:13 AM Adam Bellemare <
> >>>> adam.bellem...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi All
> >>>>>>>
> >>>>>>> I would like to call a new vote on KIP-213. The design has
> >> changed
> >>>>>>> substantially. Perhaps more importantly, the KIP and associated
> >>>>>>> documentation has been greatly simplified. I know this KIP has
> >> been
> >>>> on
> >>>>>> the
> >>>>>>> mailing list for a long time, but the help from John Roesler and
> >>>>> Guozhang
> >>>>>>> Wang have helped put it into a much better state. I would
> >>> appreciate
> >>>>> any
> >>>>>>> feedback or votes.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Thank you
> >>>>>>>
> >>>>>>> Adam Bellemare
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>


Re: [VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-01-11 Thread Adam Bellemare
Thanks all -

So far that's +2 Binding, +2 non-binding

If we get a few more votes I can likely get this out as part of the Kafka
2.2 release, as the KIP Freeze is Jan 24, 2019. The current PR I have could
be modified to match the KIP in short order.

Adam


On Fri, Jan 11, 2019 at 7:11 AM Damian Guy  wrote:

> +1 binding
>
> On Thu, 10 Jan 2019 at 16:57, Bill Bejeck  wrote:
>
> > +1 from me.  Great job on the KIP.
> >
> > -Bill
> >
> > On Thu, Jan 10, 2019 at 11:35 AM John Roesler  wrote:
> >
> > > It's a +1 (nonbinding) from me as well.
> > >
> > > Thanks for sticking with this, Adam!
> > > -John
> > >
> > > On Wed, Jan 9, 2019 at 6:22 PM Guozhang Wang 
> wrote:
> > >
> > > > Hello Adam,
> > > >
> > > > I'm +1 on the current proposal, thanks!
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Jan 7, 2019 at 6:13 AM Adam Bellemare <
> > adam.bellem...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi All
> > > > >
> > > > > I would like to call a new vote on KIP-213. The design has changed
> > > > > substantially. Perhaps more importantly, the KIP and associated
> > > > > documentation has been greatly simplified. I know this KIP has been
> > on
> > > > the
> > > > > mailing list for a long time, but the help from John Roesler and
> > > Guozhang
> > > > > Wang have helped put it into a much better state. I would
> appreciate
> > > any
> > > > > feedback or votes.
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> > > > >
> > > > >
> > > > >
> > > > > Thank you
> > > > >
> > > > > Adam Bellemare
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>


Re: [VOTE] KIP-349 Priorities for Source Topics

2019-01-11 Thread Adam Bellemare
Hi Colin

Thanks for the sober second thought - I actually didn't see the
inconclusive parts of the DISCUSS (I must have missed them when going
through) so I am grateful you highlighted these. I will have to remove my
+1 in light of the issues Colin has mentioned, but I will follow the
discussion more carefully.
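
For anyone following along, here is a rough sketch of the client-side
alternative Colin mentions below (prioritising with pause and resume, plus a
simple guard against starvation). It is a toy in-memory model, not the Kafka
consumer: ToyPausableConsumer, consume_with_priority and max_high_streak are
hypothetical names, and the toy pauses whole topics by name rather than
partitions.

```python
from collections import deque


class ToyPausableConsumer:
    """In-memory stand-in with pause/resume; NOT the Kafka consumer API."""

    def __init__(self, queues):
        self.queues = {topic: deque(records) for topic, records in queues.items()}
        self.paused = set()

    def pause(self, topic):
        self.paused.add(topic)

    def resume(self, topic):
        self.paused.discard(topic)

    def poll(self, max_records=1):
        # Return up to max_records from topics that are not paused.
        records = []
        for topic, queue in self.queues.items():
            if topic in self.paused:
                continue
            while queue and len(records) < max_records:
                records.append((topic, queue.popleft()))
        return records


def consume_with_priority(consumer, high, low, max_high_streak):
    """Prefer `high`, but give `low` one poll after max_high_streak polls."""
    order = []
    streak = 0
    consumer.pause(low)
    while True:
        records = consumer.poll()
        if records:
            order.extend(records)
            streak += 1
        if not records or streak >= max_high_streak:
            # The high-priority topic is idle or has had its turn: let the
            # low-priority topic through for one poll.
            consumer.pause(high)
            consumer.resume(low)
            low_records = consumer.poll()
            order.extend(low_records)
            consumer.pause(low)
            consumer.resume(high)
            streak = 0
            if not records and not low_records:
                return order  # toy termination: both topics are drained


consumer = ToyPausableConsumer({
    "web": ["w1", "w2", "w3", "w4", "w5"],
    "sync": ["s1", "s2"],
})
order = consume_with_priority(consumer, high="web", low="sync", max_high_streak=2)
print(order)
```

Note that the guard here is count-based rather than lag-based, which
sidesteps the problem Colin raises about not having lag information for
partitions that are not being fetched.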



On Thu, Jan 10, 2019 at 5:41 PM Colin McCabe  wrote:

> Hi all,
>
> Just as a quick reminder, this is not really a complete proposal.  There
> are a bunch of unresolved issues with this KIP.  One example is how this
> interacts with incremental fetch sessions.  It is not mentioned anywhere in
> the KIP text.  Previously we discussed some approaches, but there was no
> clear consensus.
>
> Another example is the issue of starvation.  The KIP discusses "an idea"
> for handling starvation, but the details are very sparse-- just a sentence
> or two.  At minimum we would need some kind of configuration for the
> proposed "lag deltas".  It's also not clear that the proposed mechanism
> would work, since we don't receive lag metrics for partitions that we don't
> fetch.  But if we do fetch from the partitions, we may receive data, which
> would cause our policy to not be strict priorities.  Keep in mind, even
> attempting to fetch 1 byte may cause us to read an entire message, as
> described in KIP-74.
>
> It seems that we don't understand the potential use-cases.  The only
> use-case referenced by the KIP is this one, by Bala Prassanna:
>
>  > We use Kafka to process the asynchronous events of our Document
>  > Management System such as preview generation, indexing for search etc.
>  > The traffic gets generated via Web and Desktop Sync application. In such
>  > cases, we had to prioritize the traffic from web and consume them first.
>  > But this might lead to the starvation of events from sync if the consumer
>  > speed is slow and the event rate is high from web.  A solution to handle
>  > the starvation with a timeout after which the events are consumed normally
>  > for a specified period of time would be great and help us use our
>  > resources effectively.
>
> Reading this carefully, it seems that the problem is actually starvation,
> not implementing priorities.  Bala already implemented priorities outside
> of Kafka.  If you read the discussion on KAFKA-6690, Bala also makes this
> comment: "We would need this in both Consumer API and Streams API."  The
> current KIP does not discuss adding priorities to Streams-- only to the
> basic consumer API.  So it seems clear that KIP-349 does not address Bala's
> use-case at all.
>
> Stepping back a little bit, it seems like a few people have spoken up
> recently asking for some way to re-order the messages they receive from the
> Kafka consumer.  For example, ChienHsing Wu has discussed a use-case where
> he wants to receive messages in a "round robin" order.  All of this is
> possible by doing some local buffering and using the pause and resume
> APIs.  Perhaps we should consider better documenting these APIs, and adding
> some examples.  Or perhaps we should consider some kind of API to do
> pluggable buffering on the client side.
>
> In any case, this needs more discussion.  We need to be clear and definite
> about what use cases we want to solve, and the tradeoffs we're making to
> solve them.  For now, I have to reiterate my -1 (binding).
>
> Colin
>
>
> On Thu, Jan 10, 2019, at 10:46, Adam Bellemare wrote:
> > Looks good to me then!
> >
> > +1 non-binding
> >
> >
> >
> > > On Jan 10, 2019, at 1:22 PM, Afshartous, Nick 
> wrote:
> > >
> > >
> > > Hi Adam,
> > >
> > >
> > > This change is only intended for the basic consumer API.
> > >
> > >
> > > Cheers,
> > >
> > > --
> > >
> > >Nick
> > >
> > >
> > > 
> > > From: Adam Bellemare 
> > > Sent: Sunday, January 6, 2019 11:45 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [VOTE] KIP-349 Priorities for Source Topics
> > >
> > > Hi Nick
> > >
> > > Is this change only for the basic consumer? How would this affect
> anything with Kafka Streams?
> > >
> > > Thanks
> > >
> > >
> > >> On Jan 5, 2019, at 10:52 PM, n...@afshartous.com wrote:
> > >>
> > >> Bumping again for more votes.
> > >> --
> > >> Nick
> > >>
> > >>
> > >>> On Dec 26, 2018, at 12:36 PM, n...@afshartous.c

Re: [VOTE] KIP-349 Priorities for Source Topics

2019-01-10 Thread Adam Bellemare
Looks good to me then!

+1 non-binding



> On Jan 10, 2019, at 1:22 PM, Afshartous, Nick  wrote:
> 
> 
> Hi Adam,
> 
> 
> This change is only intended for the basic consumer API.
> 
> 
> Cheers,
> 
> --
> 
>Nick
> 
> 
> 
> From: Adam Bellemare 
> Sent: Sunday, January 6, 2019 11:45 AM
> To: dev@kafka.apache.org
> Subject: Re: [VOTE] KIP-349 Priorities for Source Topics
> 
> Hi Nick
> 
> Is this change only for the basic consumer? How would this affect anything 
> with Kafka Streams?
> 
> Thanks
> 
> 
>> On Jan 5, 2019, at 10:52 PM, n...@afshartous.com wrote:
>> 
>> Bumping again for more votes.
>> --
>> Nick
>> 
>> 
>>> On Dec 26, 2018, at 12:36 PM, n...@afshartous.com wrote:
>>> 
>>> Bumping this thread for more votes
>>> 
>>> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D349-3A-2BPriorities-2Bfor-2BSource-2BTopics=DwIFAg=-SicqtCl7ffNuxX6bdsSog=P28z_ShLjFv5AP-w9-b_auYBx8qTrjk2JPYZKbjmJTs=5qg4fCOVMtRYYLu2e8h8KmDyis_uk3aFqT5Eq0x4hN8=Sbrd5XSwEZiMc9iTPJjRQafl4ubXwIOnsnFzhBEa0h0=
>> 
>> 
>> 
>> 


[VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-01-07 Thread Adam Bellemare
Hi All

I would like to call a new vote on KIP-213. The design has changed
substantially. Perhaps more importantly, the KIP and associated
documentation have been greatly simplified. I know this KIP has been on the
mailing list for a long time, but the help from John Roesler and Guozhang
Wang has put it into a much better state. I would appreciate any
feedback or votes.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable



Thank you

Adam Bellemare

