[jira] [Created] (KAFKA-8712) Build Kafka Streams against Scala 2.13

2019-07-24 Thread Loic DIVAD (JIRA)
Loic DIVAD created KAFKA-8712:
-

 Summary: Build Kafka Streams against Scala 2.13
 Key: KAFKA-8712
 URL: https://issues.apache.org/jira/browse/KAFKA-8712
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 2.3.0
Reporter: Loic DIVAD


Since Scala 2.13 has been released and KAFKA-7197 brings its support, the 
library {{kafka-streams-scala}} could also be compiled for Scala 2.13. 
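For example, a cross-build could be invoked like this (assuming the {{scalaVersion}} Gradle property the Kafka build already uses; the exact command is illustrative):

{code}
./gradlew streams:streams-scala:build -PscalaVersion=2.13.0
{code}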



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [DISCUSS] KIP-478 Strongly Typed Processor API

2019-07-24 Thread John Roesler
Hey again Matthias,

I think it might help to frame the evaluation of the Context question if we
have a "spitball" proposal for what change we might want to make to the
context.

Currently, the ProcessorContext is referenced in the following public
interfaces:

org.apache.kafka.streams.errors.DeserializationExceptionHandler#handle
org.apache.kafka.streams.kstream.Transformer#init
org.apache.kafka.streams.kstream.ValueTransformer#init
org.apache.kafka.streams.kstream.ValueTransformerWithKey#init
org.apache.kafka.streams.processor.Processor#init
org.apache.kafka.streams.processor.StateStore#init

We can sub-divide the ProcessorContext into broad categories:
General Information:
* a handle on the config
* information about the execution context (what is the task id, the
application id, etc)
Record Information:
* extra information about the current record
Store Support:
* the ability to register state restore callbacks
Processor Support:
* the ability to schedule punctuations
* the ability to get registered state stores
* the ability to forward records
* the ability to request commits

We could imagine slicing the Processor Context into four new component
interfaces, and making ProcessorContext just implement them. Then, we could
mix-and-match those new component interfaces for use elsewhere.
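To make that concrete, here is a rough sketch (the component interface names
below are spitballed, not part of the KIP; the method signatures are borrowed
from the existing ProcessorContext):

    interface InformationContext {
        String applicationId();
        TaskId taskId();
        Map<String, Object> appConfigs();
    }

    interface RecordInformationContext {
        String topic();
        int partition();
        long offset();
        long timestamp();
        Headers headers();
    }

    interface StoreSupportContext {
        void register(StateStore store, StateRestoreCallback stateRestoreCallback);
    }

    interface ProcessorSupportContext {
        Cancellable schedule(Duration interval, PunctuationType type, Punctuator callback);
        StateStore getStateStore(String name);
        <K, V> void forward(K key, V value);
        void commit();
    }

    interface ProcessorContext extends InformationContext, RecordInformationContext,
                                       StoreSupportContext, ProcessorSupportContext { }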

E.g.,:
org.apache.kafka.streams.errors.DeserializationExceptionHandler#handle
* only gets the informational context

org.apache.kafka.streams.kstream.Transformer#init
org.apache.kafka.streams.kstream.ValueTransformer#init
org.apache.kafka.streams.kstream.ValueTransformerWithKey#init
* information context
* the ability to get registered state stores
Also
* the ability to schedule punctuations
* restricted ability to forward (only obeying the rules of the particular
interface, for example)
Or maybe just:
* no ability to forward
* the ability to schedule special punctuators that can return one or more
keys or values when fired

org.apache.kafka.streams.processor.Processor#init
* all the contexts, except the ability to register state restore callbacks

org.apache.kafka.streams.processor.StateStore#init
* information contexts
* the ability to register state restore callbacks
* maybe punctuations and forwards, could be discussed further


The operative question for us right now is whether there is a clean path to
something like this from the current KIP, or whether we'd be forced to
deprecate an interface we are only just now adding. Note that the only
interfaces we're adding right now are:
* org.apache.kafka.streams.processor.api.Processor
* org.apache.kafka.streams.processor.api.ProcessorSupplier
And the only thing we need to make the above spitball proposal compatible
with these proposed interfaces is to deprecate the ability to register
state restore callbacks from the ProcessorContext.

Otherwise, we would at that time be able to propose new Transformer
interfaces that take (e.g.) TransformerContexts, likewise with
DeserializationExceptionHandler and StateStore.

In other words, I _think_ that we have a clean migration path to address
the Context problem in follow-on work. But clearly my motivation may be
corrupt. What do you think?

Thanks,
-John


On Wed, Jul 24, 2019 at 5:06 PM John Roesler  wrote:

> Hey Matthias,
>
> I agree, it's worth double-checking to make sure that the upgrade path
> would be smooth. There's no point in putting ourselves in an awkward jam.
> I'll look into it and report back.
>
> Regarding the global store logic, I confirmed that the "state update
> processor" shouldn't be forwarding anything, so we can safely bound its
> output type to `Void`. I've updated the KIP.
>
> Thanks,
> -John
>
> On Wed, Jul 24, 2019 at 3:08 PM Matthias J. Sax 
> wrote:
>
>> If we don't fix the `ProcessorContext` now, how would an upgrade path
>> look like?
>>
>> We would deprecate existing `init()` and add a new `init()`, and during
>> runtime need to call both? This sounds rather error prone to me and might
>> be confusing to users? Hence, it might be beneficial to fix it right now.
>>
>> If my concerns are not valid, and we think that the upgrade path will
>> be smooth, we can of course do a follow up KIP. Another possibility would
>> be, to still do an extra KIP but ensure that both KIPs are contained in
>> the same release.
>>
>> WDYT?
>>
>>
>> -Matthias
>>
>> On 7/24/19 11:55 AM, John Roesler wrote:
>> > Hey Matthias,
>> >
>> > Thanks for the review!
>> >
>> > I agree about ProcessorContext, it could certainly be split up to
>> improve
>> > compile-time clues about what is or is not permitted (like, do you just
>> > want to be able to see the extra record context vs. forwarding vs.
>> > registering state stores, as you said). But, similar to the ideas around
>> > transforms, we can hopefully make that a separate design effort outside
>> of
>> > this KIP. Is that ok with you?
>> >
>> > Note that, unlike the current Processor API, KIP-478 proposes to
>> provide a
>> > default 

Re: [DISCUSS] KIP-478 Strongly Typed Processor API

2019-07-24 Thread John Roesler
Hey Matthias,

I agree, it's worth double-checking to make sure that the upgrade path
would be smooth. There's no point in putting ourselves in an awkward jam.
I'll look into it and report back.

Regarding the global store logic, I confirmed that the "state update
processor" shouldn't be forwarding anything, so we can safely bound its
output type to `Void`. I've updated the KIP.
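To make the `Void` bound concrete, a sketch against the KIP-478 draft generics
`Processor<KIn, VIn, KOut, VOut>` (the store name and exact method signatures
here are assumptions, not the final API):

    class GlobalStoreUpdater<K, V> implements Processor<K, V, Void, Void> {
        private KeyValueStore<K, V> store;

        @Override
        public void init(final ProcessorContext context) {
            store = (KeyValueStore<K, V>) context.getStateStore("global-store");
        }

        @Override
        public void process(final K key, final V value) {
            store.put(key, value);
            // No forwarding: with KOut = VOut = Void there is nothing
            // meaningful this processor could emit downstream.
        }
    }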

Thanks,
-John

On Wed, Jul 24, 2019 at 3:08 PM Matthias J. Sax 
wrote:

> If we don't fix the `ProcessorContext` now, how would an upgrade path
> look like?
>
> We would deprecate existing `init()` and add a new `init()`, and during
> runtime need to call both? This sounds rather error prone to me and might
> be confusing to users? Hence, it might be beneficial to fix it right now.
>
> If my concerns are not valid, and we think that the upgrade path will
> be smooth, we can of course do a follow up KIP. Another possibility would
> be, to still do an extra KIP but ensure that both KIPs are contained in
> the same release.
>
> WDYT?
>
>
> -Matthias
>
> On 7/24/19 11:55 AM, John Roesler wrote:
> > Hey Matthias,
> >
> > Thanks for the review!
> >
> > I agree about ProcessorContext, it could certainly be split up to improve
> > compile-time clues about what is or is not permitted (like, do you just
> > want to be able to see the extra record context vs. forwarding vs.
> > registering state stores, as you said). But, similar to the ideas around
> > transforms, we can hopefully make that a separate design effort outside
> of
> > this KIP. Is that ok with you?
> >
> > Note that, unlike the current Processor API, KIP-478 proposes to provide
> a
> > default no-op implementation of init(), which means we can deprecate it
> > later and replace it with one taking a cleaner "context" abstraction, as
> > you proposed.
> >
> > It's just that the typing change as proposed is already a very large
> design
> > and implementation scope. I fear that adding in new flavors of
> > ProcessorContext would make it much harder to actually consider the
> design,
> > and certainly stretch out the implementation phase as well.
> >
> > Regarding the documentation of non-goals, that's very good feedback. I'll
> > update the KIP.
> >
> > Regarding addGlobalStore... I'll look into it.
> >
> > Thanks!
> > -John
> >
> >
> >
> > On Wed, Jul 24, 2019 at 12:27 PM Matthias J. Sax 
> > wrote:
> >
> >> I have concerns about the latest proposal from Guozhang. However, as
> >> John said it's beyond the scope of this KIP and thus, I don't go into
> >> details. I agree, though, that the current "transformer APIs" are not
> >> ideal and could be improved.
> >>
> >>
> >> An orthogonal thought is that we should split the current
> >> `ProcessorContext` into multiple interfaces. Atm, the context can be used
> >> to:
> >>
> >> - access metadata
> >> - schedule punctuation
> >> - get state stores
> >> - register state stores
> >> - forward output data
> >>
> >> (1) registering state stores is only required if one implements a custom
> >> store, but not for a regular `Processor` implementation -- hence, it's a
> >> leaking abstraction
> >>
> >> (2) for `ValueTransformer` and `flatValueTransformer` we don't want to
> >> allow forwarding key-value pairs, and hence need to throw an RTE for
> >> this case atm
> >>
> >> (3) Why do we expose `keySerde()`, `valueSerde()`, and `stateDir()`
> >> explicitly? We have already `appConfigs()` to allow users to access the
> >> configuration.
> >>
> >> Overall, it seems that `ProcessorContext` is rather convoluted. Because
> >> we add a new `Processor` abstraction, it seems like a good opportunity
> >> to improve the interface and to not pass `ProcessorContext` into the new
> >> `Processor#init()` method, but an improved interface.
> >>
> >> Thoughts?
> >>
> >>
> >>
> >> One more nit about the KIP:
> >>
> >> I think, we should clearly state, that this change does not provide type
> >> safety for PAPI users. The following example would compile without any
> >> errors or warning, even if the types don't match:
> >>
> >>> Topology t = new Topology();
> >>> t.addSource("s", ...);
> >>> t.addProcessor("p1", new ProcessorSupplier >> BarValue>()..., "s");
> >>> t.addProcessor("p2", new ProcessorSupplier KOut,
> >> VOut>()..., "p1");
> >>
> >> Just want to make sure users understand the impact/scope of the change,
> >> especially what is _not_ achieved.
> >>
> >>
> >> About `addGlobalStore()` -- should the return types be `Void` similar to
> >> `KStream#process()`?
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 7/24/19 9:11 AM, Guozhang Wang wrote:
> >>> Sounds good to me, thanks John!
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Wed, Jul 24, 2019 at 7:40 AM John Roesler 
> wrote:
> >>>
>  Hey Guozhang,
> 
>  Thanks for the thought! It sounds related to what I was thinking in
>  https://issues.apache.org/jira/browse/KAFKA-8396 , but a little
> >> "extra"...
> 
>  I proposed to eliminate ValueTransformer, but I believe you're right;
> we
> 

Re: [VOTE] KIP-466: Add support for List serialization and deserialization

2019-07-24 Thread Development
Hi,

Thank you everyone! 

KIP-466 Add support for List serialization and deserialization will be 
marked as accepted with 3 binding votes:
Bill Bejeck, Guozhang Wang, and Matthias J. Sax.

Best,
Daniyar Yeralin

> On Jul 24, 2019, at 5:12 PM, Bill Bejeck  wrote:
> 
> Thanks for the KIP looks to be very helpful.
> 
> +1(binding)
> 
> -Bill
> 
> On Wed, Jul 24, 2019 at 5:01 PM Guozhang Wang  wrote:
> 
>> +1 (binding).
>> 
>> Thanks Daniyar!
>> 
>> On Wed, Jul 24, 2019 at 12:04 PM John Roesler  wrote:
>> 
>>> Thanks, Daniyar,
>>> 
>>> I'm +1 (nonbinding)
>>> 
>>> -John
>>> 
>>> On Tue, Jun 11, 2019 at 1:45 PM Development  wrote:
>>> 
 Hello,
 
 I want to start a vote for KIP-466 <
 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466:+Add+support+for+List%3CT%3E+serialization+and+deserialization
 <
 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466:+Add+support+for+List%3CT%3E+serialization+and+deserialization
> 
 "Add support for List serialization and deserialization”.
 The implementation can be found as a PR <
 https://github.com/apache/kafka/pull/6592 <
 https://github.com/apache/kafka/pull/6592>>.
 
 Thank you all for your input and support.
 
 Best,
 Daniyar Yeralin
>>> 
>> 
>> 
>> --
>> -- Guozhang
>> 



Re: [VOTE] KIP-466: Add support for List serialization and deserialization

2019-07-24 Thread Bill Bejeck
Thanks for the KIP looks to be very helpful.

+1(binding)

-Bill

On Wed, Jul 24, 2019 at 5:01 PM Guozhang Wang  wrote:

> +1 (binding).
>
> Thanks Daniyar!
>
> On Wed, Jul 24, 2019 at 12:04 PM John Roesler  wrote:
>
> > Thanks, Daniyar,
> >
> > I'm +1 (nonbinding)
> >
> > -John
> >
> > On Tue, Jun 11, 2019 at 1:45 PM Development  wrote:
> >
> > > Hello,
> > >
> > > I want to start a vote for KIP-466 <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466:+Add+support+for+List%3CT%3E+serialization+and+deserialization
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466:+Add+support+for+List%3CT%3E+serialization+and+deserialization
> > >>
> > > "Add support for List serialization and deserialization”.
> > > The implementation can be found as a PR <
> > > https://github.com/apache/kafka/pull/6592 <
> > > https://github.com/apache/kafka/pull/6592>>.
> > >
> > > Thank you all for your input and support.
> > >
> > > Best,
> > > Daniyar Yeralin
> >
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-466: Add support for List serialization and deserialization

2019-07-24 Thread Guozhang Wang
+1 (binding).

Thanks Daniyar!

On Wed, Jul 24, 2019 at 12:04 PM John Roesler  wrote:

> Thanks, Daniyar,
>
> I'm +1 (nonbinding)
>
> -John
>
> On Tue, Jun 11, 2019 at 1:45 PM Development  wrote:
>
> > Hello,
> >
> > I want to start a vote for KIP-466 <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466:+Add+support+for+List%3CT%3E+serialization+and+deserialization
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466:+Add+support+for+List%3CT%3E+serialization+and+deserialization
> >>
> > "Add support for List serialization and deserialization”.
> > The implementation can be found as a PR <
> > https://github.com/apache/kafka/pull/6592 <
> > https://github.com/apache/kafka/pull/6592>>.
> >
> > Thank you all for your input and support.
> >
> > Best,
> > Daniyar Yeralin
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-24 Thread Levani Kokhreidze
Hi Matthias,

Thanks for the suggestion. I don't have a strong opinion on that one.
Agree that avoiding unnecessary method overloads is a good idea.

Updated the KIP.

Regards,
Levani


> On Jul 24, 2019, at 8:50 PM, Matthias J. Sax  wrote:
> 
> One question:
> 
> Why do we add
> 
>> Repartitioned#with(final String name, final int numberOfPartitions)
> 
> It seems that `#with(String name)`, `#numberOfPartitions(int)` in
> combination with `withName()` and `withNumberOfPartitions()` should be
> sufficient. Users can chain the method calls.
> 
> (I think it's valuable to keep the number of overloads small if possible.)
> 
> Otherwise LGTM.
> 
> 
> -Matthias
> 
> 
> On 7/23/19 2:18 PM, Levani Kokhreidze wrote:
>> Hello,
>> 
>> Thanks all for your feedback.
>> I started voting procedure for this KIP. If there’re any other concerns 
>> about this KIP, please let me know.
>> 
>> Regards,
>> Levani
>> 
>>> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze  
>>> wrote:
>>> 
>>> Hi Matthias,
>>> 
>>> Thanks for the suggestion, makes sense.
>>> I’ve updated KIP 
>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>>  
>>> ).
>>> 
>>> Regards,
>>> Levani
>>> 
>>> 
 On Jul 20, 2019, at 3:53 AM, Matthias J. Sax >>> > wrote:
 
 Thanks for driving the KIP.
 
 I agree that users need to be able to specify a partitioning strategy.
 
 Sophie raises a fair point about topic configs and producer configs. My
 take is that considering `Repartitioned` as an "extension" to `Produced`
 that adds topic configuration is a good way to think about it and helps
 to keep the API "clean".
 
 
 With regard to method names. I would prefer to avoid abbreviations. Can
 we rename:
 
 `withNumOfPartitions` -> `withNumberOfPartitions`
 
 Furthermore, it might be good to add some more `static` methods:
 
 - Repartitioned.with(Serde, Serde)
 - Repartitioned.withNumberOfPartitions(int)
 - Repartitioned.streamPartitioner(StreamPartitioner)
 
 
 -Matthias
 
 On 7/19/19 3:33 PM, Levani Kokhreidze wrote:
> Totally agree. I think in the KStream interface it makes sense to have some 
> duplicate configurations between operators in order to keep the API simple 
> and usable.
> Also, the more surface area the API has, the harder it is to maintain proper 
> backward compatibility.
> While initial idea of keeping topic level configs separate was exciting, 
> having Repartitioned class encapsulate some producer level configs makes 
> API more readable.
> 
> Regards,
> Levani
> 
>> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman > > wrote:
>> 
>> I think that is a good point about trying to keep producer level
>> configurations and (repartition) topic level considerations separate.
>> Number of partitions is definitely purely a topic level configuration. 
>> But
>> on some level, serdes and partitioners are just as much a topic
>> configuration as a producer one. You could have two producers configured
>> with different serdes and/or partitioners, but if they are writing to the
>> same topic the result would be very difficult to parse. So in a sense, 
>> these
>> are configurations of topics in Streams, not just producers.
>> 
>> Another way to think of it: while the Streams API is not always true to
>> this, ideally all the relevant configs for an operator are wrapped into a
>> single object (in this case, Repartitioned). We could instead split out 
>> the
>> fields in common with Produced into a separate parameter to keep topic 
>> and
>> producer level configurations separate, but this increases the API 
>> surface
>> area by a lot. It's much more straightforward to just say "this is
>> everything that this particular operator needs" without worrying about 
>> what
>> exactly you're specifying.
>> 
>> I suppose you could alternatively make Produced a field of Repartitioned,
>> but I don't think we do this kind of composition elsewhere in Streams at
>> the moment
>> 
>> On Fri, Jul 19, 2019 at 1:45 PM Levani Kokhreidze 
>> mailto:levani.co...@gmail.com>>
>> wrote:
>> 
>>> Hi Bill,
>>> 
>>> Thanks a lot for the feedback.
>>> Yes, that makes sense. I’ve updated KIP with `Repartitioned#partitioner`
>>> configuration.
>>> In the beginning, I wanted to introduce a class for topic level
>>> configuration and keep topic level and producer level configurations 
>>> (such
>>> as Produced) separately (see my second email in this thread).
>>> But while looking at the semantics of KStream interface, I couldn’t 

Re: [DISCUSS] KIP-478 Strongly Typed Processor API

2019-07-24 Thread Matthias J. Sax
If we don't fix the `ProcessorContext` now, how would an upgrade path
look like?

We would deprecate existing `init()` and add a new `init()`, and during
runtime need to call both? This sounds rather error prone to me and might
be confusing to users? Hence, it might be beneficial to fix it right now.

If my concerns are not valid, and we think that the upgrade path will
be smooth, we can of course do a follow up KIP. Another possibility would
be, to still do an extra KIP but ensure that both KIPs are contained in
the same release.

WDYT?


-Matthias

On 7/24/19 11:55 AM, John Roesler wrote:
> Hey Matthias,
> 
> Thanks for the review!
> 
> I agree about ProcessorContext, it could certainly be split up to improve
> compile-time clues about what is or is not permitted (like, do you just
> want to be able to see the extra record context vs. forwarding vs.
> registering state stores, as you said). But, similar to the ideas around
> transforms, we can hopefully make that a separate design effort outside of
> this KIP. Is that ok with you?
> 
> Note that, unlike the current Processor API, KIP-478 proposes to provide a
> default no-op implementation of init(), which means we can deprecate it
> later and replace it with one taking a cleaner "context" abstraction, as
> you proposed.
> 
> It's just that the typing change as proposed is already a very large design
> and implementation scope. I fear that adding in new flavors of
> ProcessorContext would make it much harder to actually consider the design,
> and certainly stretch out the implementation phase as well.
> 
> Regarding the documentation of non-goals, that's very good feedback. I'll
> update the KIP.
> 
> Regarding addGlobalStore... I'll look into it.
> 
> Thanks!
> -John
> 
> 
> 
> On Wed, Jul 24, 2019 at 12:27 PM Matthias J. Sax 
> wrote:
> 
>> I have concerns about the latest proposal from Guozhang. However, as
>> John said it's beyond the scope of this KIP and thus, I don't go into
>> details. I agree, though, that the current "transformer APIs" are not
>> ideal and could be improved.
>>
>>
>> An orthogonal thought is that we should split the current
>> `ProcessorContext` into multiple interfaces. Atm, the context can be used
>> to:
>>
>> - access metadata
>> - schedule punctuation
>> - get state stores
>> - register state stores
>> - forward output data
>>
>> (1) registering state stores is only required if one implements a custom
>> store, but not for a regular `Processor` implementation -- hence, it's a
>> leaking abstraction
>>
>> (2) for `ValueTransformer` and `flatValueTransformer` we don't want to
>> allow forwarding key-value pairs, and hence need to throw an RTE for
>> this case atm
>>
>> (3) Why do we expose `keySerde()`, `valueSerde()`, and `stateDir()`
>> explicitly? We have already `appConfigs()` to allow users to access the
>> configuration.
>>
>> Overall, it seems that `ProcessorContext` is rather convoluted. Because
>> we add a new `Processor` abstraction, it seems like a good opportunity
>> to improve the interface and to not pass `ProcessorContext` into the new
>> `Processor#init()` method, but an improved interface.
>>
>> Thoughts?
>>
>>
>>
>> One more nit about the KIP:
>>
>> I think, we should clearly state, that this change does not provide type
>> safety for PAPI users. The following example would compile without any
>> errors or warning, even if the types don't match:
>>
>>> Topology t = new Topology();
>>> t.addSource("s", ...);
>>> t.addProcessor("p1", new ProcessorSupplier> BarValue>()..., "s");
>>> t.addProcessor("p2", new ProcessorSupplier> VOut>()..., "p1");
>>
>> Just want to make sure users understand the impact/scope of the change,
>> especially what is _not_ achieved.
>>
>>
>> About `addGlobalStore()` -- should the return types be `Void` similar to
>> `KStream#process()`?
>>
>>
>>
>> -Matthias
>>
>>
>> On 7/24/19 9:11 AM, Guozhang Wang wrote:
>>> Sounds good to me, thanks John!
>>>
>>>
>>> Guozhang
>>>
>>> On Wed, Jul 24, 2019 at 7:40 AM John Roesler  wrote:
>>>
 Hey Guozhang,

 Thanks for the thought! It sounds related to what I was thinking in
 https://issues.apache.org/jira/browse/KAFKA-8396 , but a little
>> "extra"...

 I proposed to eliminate ValueTransformer, but I believe you're right; we
 could eliminate Transformer also and just use Processor in the
>> transform()
 methods.

 To your first bullet, regarding transform/flatTransform... I'd argue
>> that
 the difference isn't material and if we switch to just using
 context.forward instead of returns, then we just need one and people can
 call forward as much as they want. It certainly warrants further
 discussion, though...

 To the second point, yes, I'm thinking that we can eschew the
 ValueTransformer and instead do something like ignore the forwarded key
>> or
 check the key for serial identity, etc.

 The ultimate advantage of these ideas is that we reduce the number of
 

[jira] [Created] (KAFKA-8711) Kafka 2.3.0 Transient Unit Test Failures SocketServerTest. testControlPlaneRequest

2019-07-24 Thread Chandrasekhar (JIRA)
Chandrasekhar created KAFKA-8711:


 Summary: Kafka 2.3.0 Transient Unit Test Failures 
SocketServerTest. testControlPlaneRequest
 Key: KAFKA-8711
 URL: https://issues.apache.org/jira/browse/KAFKA-8711
 Project: Kafka
  Issue Type: Test
  Components: core, unit tests
Affects Versions: 2.3.0
Reporter: Chandrasekhar


Cloned Kafka 2.3.0 source to our git repo and compiled it using 'gradle build'; 
we see the following error consistently:

Gradle Version 4.7

 

testControlPlaneRequest
java.net.BindException: Address already in use (Bind failed)
    at java.net.PlainSocketImpl.socketBind(Native Method)
    at 
java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
    at java.net.Socket.bind(Socket.java:644)
    at java.net.Socket.<init>(Socket.java:433)
    at java.net.Socket.<init>(Socket.java:286)
    at kafka.network.SocketServerTest.connect(SocketServerTest.scala:140)
    at 
kafka.network.SocketServerTest.$anonfun$testControlPlaneRequest$1(SocketServerTest.scala:200)
    at 
kafka.network.SocketServerTest.$anonfun$testControlPlaneRequest$1$adapted(SocketServerTest.scala:199)
    at 
kafka.network.SocketServerTest.withTestableServer(SocketServerTest.scala:1141)
    at 
kafka.network.SocketServerTest.testControlPlaneRequest(SocketServerTest.scala:199)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
    at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
    at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
    at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
    at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
    at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
    at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy1.processTestClass(Unknown Source)
    at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
    at sun.reflect.GeneratedMethodAccessor30.invoke(Unknown Source)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 

Re: [VOTE] KIP-466: Add support for List serialization and deserialization

2019-07-24 Thread John Roesler
Thanks, Daniyar,

I'm +1 (nonbinding)

-John

On Tue, Jun 11, 2019 at 1:45 PM Development  wrote:

> Hello,
>
> I want to start a vote for KIP-466 <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466:+Add+support+for+List%3CT%3E+serialization+and+deserialization
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466:+Add+support+for+List%3CT%3E+serialization+and+deserialization>>
> "Add support for List serialization and deserialization”.
> The implementation can be found as a PR <
> https://github.com/apache/kafka/pull/6592 <
> https://github.com/apache/kafka/pull/6592>>.
>
> Thank you all for your input and support.
>
> Best,
> Daniyar Yeralin


Re: [DISCUSS] KIP-478 Strongly Typed Processor API

2019-07-24 Thread John Roesler
Hey Matthias,

Thanks for the review!

I agree about ProcessorContext, it could certainly be split up to improve
compile-time clues about what is or is not permitted (like, do you just
want to be able to see the extra record context vs. forwarding vs.
registering state stores, as you said). But, similar to the ideas around
transforms, we can hopefully make that a separate design effort outside of
this KIP. Is that ok with you?

Note that, unlike the current Processor API, KIP-478 proposes to provide a
default no-op implementation of init(), which means we can deprecate it
later and replace it with one taking a cleaner "context" abstraction, as
you proposed.

It's just that the typing change as proposed is already a very large design
and implementation scope. I fear that adding in new flavors of
ProcessorContext would make it much harder to actually consider the design,
and certainly stretch out the implementation phase as well.

Regarding the documentation of non-goals, that's very good feedback. I'll
update the KIP.

Regarding addGlobalStore... I'll look into it.

Thanks!
-John



On Wed, Jul 24, 2019 at 12:27 PM Matthias J. Sax 
wrote:

> I have concerns about the latest proposal from Guozhang. However, as
> John said it's beyond the scope of this KIP and thus, I don't go into
> details. I agree, though, that the current "transformer APIs" are not
> ideal and could be improved.
>
>
> An orthogonal thought is that we should split the current
> `ProcessorContext` into multiple interfaces. Atm, the context can be used
> to:
>
> - access metadata
> - schedule punctuation
> - get state stores
> - register state stores
> - forward output data
>
> (1) registering state stores is only required if one implements a custom
> store, but not for a regular `Processor` implementation -- hence, it's a
> leaking abstraction
>
> (2) for `ValueTransformer` and `flatValueTransformer` we don't want to
> allow forwarding key-value pairs, and hence need to throw an RTE for
> this case atm
>
> (3) Why do we expose `keySerde()`, `valueSerde()`, and `stateDir()`
> explicitly? We have already `appConfigs()` to allow users to access the
> configuration.
>
> Overall, it seems that `ProcessorContext` is rather convoluted. Because
> we add a new `Processor` abstraction, it seems like a good opportunity
> to improve the interface and to not pass `ProcessorContext` into the new
> `Processor#init()` method, but an improved interface.
>
> Thoughts?
>
>
>
> One more nit about the KIP:
>
> I think, we should clearly state, that this change does not provide type
> safety for PAPI users. The following example would compile without any
> errors or warning, even if the types don't match:
>
> > Topology t = new Topology();
> > t.addSource("s", ...);
> > t.addProcessor("p1", new ProcessorSupplier BarValue>()..., "s");
> > t.addProcessor("p2", new ProcessorSupplier VOut>()..., "p1");
>
> Just want to make sure users understand the impact/scope of the change,
> especially what is _not_ achieved.
>
>
> About `addGlobalStore()` -- should the return types be `Void` similar to
> `KStream#process()`?
>
>
>
> -Matthias
>
>
> On 7/24/19 9:11 AM, Guozhang Wang wrote:
> > Sounds good to me, thanks John!
> >
> >
> > Guozhang
> >
> > On Wed, Jul 24, 2019 at 7:40 AM John Roesler  wrote:
> >
> >> Hey Guozhang,
> >>
> >> Thanks for the thought! It sounds related to what I was thinking in
> >> https://issues.apache.org/jira/browse/KAFKA-8396 , but a little
> "extra"...
> >>
> >> I proposed to eliminate ValueTransformer, but I believe you're right; we
> >> could eliminate Transformer also and just use Processor in the
> transform()
> >> methods.
> >>
> >> To your first bullet, regarding transform/flatTransform... I'd argue
> that
> >> the difference isn't material and if we switch to just using
> >> context.forward instead of returns, then we just need one and people can
> >> call forward as much as they want. It certainly warrants further
> >> discussion, though...
> >>
> >> To the second point, yes, I'm thinking that we can eschew the
> >> ValueTransformer and instead do something like ignore the forwarded key
> or
> >> check the key for serial identity, etc.
> >>
> >> The ultimate advantage of these ideas is that we reduce the number of
> >> interface variants and we also give people just one way to pass values
> >> forward instead of two.
> >>
> >> Of course, it's beyond the scope of this KIP, but this KIP is a
> >> precondition for these further improvements.
> >>
> >> I'm copying your comment onto the ticket for posterity.
> >>
> >> Thanks!
> >> -John
> >>
> >> On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang 
> wrote:
> >>
> >>> Hi John,
> >>>
> >>> Just a wild thought about Transformer: now with the new Processor<KIn,
> >>> KOut, VIn, VOut>#init(ProcessorContext), do we still need a
> >>> Transformer (and even ValueTransformer / ValueTransformerWithKey)?
> >>>
> >>> What if:
> >>>
> >>> * We just make KStream#transform to get a ProcessorSupplier as well,
> and
> 

Re: [DISCUSS] KIP-466: Add support for List serialization and deserialization

2019-07-24 Thread Matthias J. Sax
Thanks!

Glad we are on the same page on how to address the cyclic dependency issue.


-Matthias

On 7/24/19 8:09 AM, Development wrote:
> KIP-466 is updated and new commit is pushed.
> 
> Thank you guys!
> 
>> On Jul 24, 2019, at 10:53 AM, John Roesler  wrote:
>>
>> Ah, thanks for setting me straight, Matthias.
>>
>> Given the choice between defining the Serde in the streams module (hence it
>> would not be in the Serdes "menu" class) or defining the configuration
>> property in CommonClientConfig, I think I'm leaning toward the latter.
>>
>> Really good catch on the ProducerConfig; otherwise, I think we should go
>> ahead and add the serializer/deserializer configs as discussed to
>> ProducerConfig and ConsumerConfig. It's just cleaner and more uniform that
>> way.
>>
>> Thanks again,
>> -John
>>
>> On Tue, Jul 23, 2019 at 8:08 PM Matthias J. Sax 
>> wrote:
>>
> Just to make sure I understand the problem you're highlighting:
> I guess the difference is that the serializer and deserializer that are
> nested inside the serde also need to be configured? So, by default I'd
>>> have
> to specify all six configs when I'm using Streams?
>>>
>>> That is not the problem. And you actually describe the solution for it
>>> yourself:
>>>
> I guess in the Serde, it could make use of a package-protected
>>> constructor
> for the serializer and deserializer that fixes the list type and inner
>>> type
> to the serde-configured ones. Then, when you're configuring Streams, you
> only need to specify the StreamsConfigs.
>>>
>>>
>>>
>>>
>>> The problem is, that `ListSerde` is in package `clients` and thus
>>> `ListSerde` cannot access `StreamsConfig`, and hence cannot use
>>> `StreamsConfig#DEFAULT_LIST_KEY_SERDE_TYPE` (and others). Therefore, we
>>> either need to hard-code strings literal for the config names (what does
>>> not sound right) or add `CommonClientConfig#DEFAULT_LIST_KEY_SERDE_TYPE`
>>> (and others).
>>>
>>> In StreamsConfig we would just redefine them for convenience:
>>>
 public static final String DEFAULT_LIST_KEY_SERDE_TYPE =
>>> CommonClientConfig#DEFAULT_LIST_KEY_SERDE_TYPE;
>>>
>>>
>>> Note that `TimeWindowSerde` is contained in `streams` package and thus
>>> it can access `StreamsConfig` and
>>> `StreamsConfig#DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS`.
>>>
>>>
>>>
>>>
>>> Btw: I just realized that we actually don't need `ProducerConfig`
>>>
 list.key/value.serializer.type
>>>
>>> because the list-type is irrelevant on write. We only need `inner` config.
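To make the end state concrete, the Streams-side configuration would then look
roughly like this (the two `default.list.*` names are the ones proposed in this
thread, and `Serdes.ListSerde` is from the KIP's PR, so treat all of them as
tentative):

    final Properties props = new Properties();
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ListSerde.class);
    props.put("default.list.key.serde.type", java.util.ArrayList.class);
    props.put("default.list.key.serde.inner", Serdes.IntegerSerde.class);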
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 7/23/19 1:30 PM, John Roesler wrote:
 Hmm, that's a tricky situation.

 I think Daniyar was on the right track... Producer only cares about
 serializer configs, and Consumer only cares about deserializer configs.

 I didn't see the problem with your proposal:

 ProducerConfig:
> list.key/value.serializer.type
> list.key/value.serializer.inner
> ConsumerConfig:
> list.key/value.deserializer.type
> list.key/value.deserializer.inner
> StreamsConfig:
> default.list.key/value.serde.type
> default.list.key/value.serde.inner


 It seems like the key/value serde configs are a better analogy than the
 windowed serde.
 ProducerConfig: key.serializer
 ConsumerConfig: key.deserializer
 StreamsConfig: default.key.serde

 Just to make sure I understand the problem you're highlighting:
 I guess the difference is that the serializer and deserializer that are
 nested inside the serde also need to be configured? So, by default I'd
>>> have
 to specify all six configs when I'm using Streams?

 I guess in the Serde, it could make use of a package-protected
>>> constructor
 for the serializer and deserializer that fixes the list type and inner
>>> type
 to the serde-configured ones. Then, when you're configuring Streams, you
 only need to specify the StreamsConfigs.
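A rough sketch of that idea (the class shape is illustrative, not the final
API; ListSerializer and ListDeserializer are the classes from the KIP's PR):

    public class ListSerde<T> implements Serde<List<T>> {

        private final ListSerializer<T> serializer;
        private final ListDeserializer<T> deserializer;

        // Package-protected: the list type and inner serde are fixed up
        // front, so a Streams user only has to set the default.list.*
        // configs in StreamsConfig.
        ListSerde(final Class<? extends List<T>> listType, final Serde<T> inner) {
            this.serializer = new ListSerializer<>(inner.serializer());
            this.deserializer = new ListDeserializer<>(listType, inner.deserializer());
        }

        @Override
        public Serializer<List<T>> serializer() {
            return serializer;
        }

        @Override
        public Deserializer<List<T>> deserializer() {
            return deserializer;
        }
    }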

 Does that work?
 -John


 On Tue, Jul 23, 2019 at 11:39 AM Development  wrote:

> Bump
>
>> On Jul 22, 2019, at 11:26 AM, Development  wrote:
>>
>> Hey Matthias,
>>
>> It looks a little confusing, but I don’t have enough expertise to judge
> on the configuration placement.
>>
>> If you think, it is fine I’ll go ahead with this approach.
>>
>> Best,
>> Daniyar Yeralin
>>
>>> On Jul 19, 2019, at 5:49 PM, Matthias J. Sax 
> wrote:
>>>
>>> Good point.
>>>
>>> I guess the simplest solution is, to actually add
>>>
> default.list.key/value.serde.type
> default.list.key/value.serde.inner
>>>
>>> to both `CommonClientConfigs` and `StreamsConfig`.
>>>
>>> It's not super clean, but I think it's the best we can do. Thoughts?
>>>
>>>
>>> -Matthias
>>>
>>> On 7/19/19 1:23 PM, Development wrote:
 Hi Matthias,

 I agree, 

Re: [VOTE] KIP-466: Add support for List serialization and deserialization

2019-07-24 Thread Matthias J. Sax
+1 (binding)


On 7/18/19 11:37 AM, Guozhang Wang wrote:
> +1 (binding). Thanks Daniyar!
> 
> 
> Guozhang
> 
> On Tue, Jul 16, 2019 at 2:13 PM John Roesler  wrote:
> 
>> Thanks, Daniyar!
>>
>> I'm +1 (nonbinding)
>>
>> -John
>>
>> On Tue, Jul 16, 2019 at 3:14 PM Development  wrote:
>>>
>>> Hi,
>>>
>>> I’d like to start a vote thread for KIP-466.
>>>
>>> This addition will introduce new serde type ListSerde.
>>>
>>> More info at:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization
>>>
>>> Best,
>>> Daniyar Yeralin
>>
> 
> 





Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-07-24 Thread Matthias J. Sax
One question:

Why do we add

> Repartitioned#with(final String name, final int numberOfPartitions)

It seems that `#with(String name)`, `#numberOfPartitions(int)` in
combination with `withName()` and `withNumberOfPartitions()` should be
sufficient. Users can chain the method calls.

(I think it's valuable to keep the number of overloads small if possible.)
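For example, instead of the combined overload, the pieces compose at the call
site (a sketch using the method names proposed in this thread; the final API
may differ):

    final KStream<String, Long> out = stream.repartition(
        Repartitioned.<String, Long>numberOfPartitions(5)
                     .withName("my-repartition-topic"));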

Otherwise LGTM.


-Matthias


On 7/23/19 2:18 PM, Levani Kokhreidze wrote:
> Hello,
> 
> Thanks all for your feedback.
> I started voting procedure for this KIP. If there’re any other concerns about 
> this KIP, please let me know.
> 
> Regards,
> Levani
> 
>> On Jul 20, 2019, at 8:39 PM, Levani Kokhreidze  
>> wrote:
>>
>> Hi Matthias,
>>
>> Thanks for the suggestion, makes sense.
>> I’ve updated KIP 
>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
>>  
>> ).
>>
>> Regards,
>> Levani
>>
>>
>>> On Jul 20, 2019, at 3:53 AM, Matthias J. Sax >> > wrote:
>>>
>>> Thanks for driving the KIP.
>>>
>>> I agree that users need to be able to specify a partitioning strategy.
>>>
>>> Sophie raises a fair point about topic configs and producer configs. My
>>> take is that considering `Repartitioned` as an "extension" to `Produced`
>>> that adds topic configuration is a good way to think about it and helps
>>> to keep the API "clean".
>>>
>>>
>>> With regard to method names. I would prefer to avoid abbreviations. Can
>>> we rename:
>>>
>>> `withNumOfPartitions` -> `withNumberOfPartitions`
>>>
>>> Furthermore, it might be good to add some more `static` methods:
>>>
>>> - Repartitioned.with(Serde, Serde)
>>> - Repartitioned.withNumberOfPartitions(int)
>>> - Repartitioned.streamPartitioner(StreamPartitioner)
>>>
>>>
>>> -Matthias
>>>
>>> On 7/19/19 3:33 PM, Levani Kokhreidze wrote:
 Totally agree. I think in the KStream interface it makes sense to have some 
 duplicate configurations between operators in order to keep the API simple and 
 usable.
 Also, the more surface area the API has, the harder it is to maintain proper 
 backward compatibility.
 While initial idea of keeping topic level configs separate was exciting, 
 having Repartitioned class encapsulate some producer level configs makes 
 API more readable.

 Regards,
 Levani

> On Jul 20, 2019, at 1:15 AM, Sophie Blee-Goldman  > wrote:
>
> I think that is a good point about trying to keep producer level
> configurations and (repartition) topic level considerations separate.
> Number of partitions is definitely purely a topic level configuration. But
> on some level, serdes and partitioners are just as much a topic
> configuration as a producer one. You could have two producers configured
> with different serdes and/or partitioners, but if they are writing to the
> same topic the result would be very difficult to parse. So in a sense, 
> these
> are configurations of topics in Streams, not just producers.
>
> Another way to think of it: while the Streams API is not always true to
> this, ideally all the relevant configs for an operator are wrapped into a
> single object (in this case, Repartitioned). We could instead split out 
> the
> fields in common with Produced into a separate parameter to keep topic and
> producer level configurations separate, but this increases the API surface
> area by a lot. It's much more straightforward to just say "this is
> everything that this particular operator needs" without worrying about 
> what
> exactly you're specifying.
>
> I suppose you could alternatively make Produced a field of Repartitioned,
> but I don't think we do this kind of composition elsewhere in Streams at
> the moment
>
> On Fri, Jul 19, 2019 at 1:45 PM Levani Kokhreidze  >
> wrote:
>
>> Hi Bill,
>>
>> Thanks a lot for the feedback.
>> Yes, that makes sense. I’ve updated KIP with `Repartitioned#partitioner`
>> configuration.
>> In the beginning, I wanted to introduce a class for topic level
>> configuration and keep topic level and producer level configurations 
>> (such
>> as Produced) separately (see my second email in this thread).
>> But while looking at the semantics of KStream interface, I couldn’t 
>> really
>> figure out good operation name for Topic level configuration class and 
>> just
>> introducing `Topic` config class was kinda breaking the semantics.
>> So I think having Repartitioned class which encapsulates topic and
>> producer level configurations for internal topics is viable thing to do.
>>
>> Regards,
>> Levani
>>
>>> On Jul 19, 2019, at 7:47 PM, Bill Bejeck 

Re: [DISCUSS] KIP-478 Strongly Typed Processor API

2019-07-24 Thread Matthias J. Sax
I have concerns about the latest proposal from Guozhang. However, as
John said it's beyond the scope of this KIP and thus, I don't go into
details. I agree, though, that the current "transformer APIs" are not
ideal and could be improved.


An orthogonal thought is that we should split the current
`ProcessorContext` into multiple interfaces. Atm, the context can be used to:

- access metadata
- schedule punctuation
- get state stores
- register state stores
- forward output data

(1) registering state stores is only required if one implements a custom
store, but not for a regular `Processor` implementation -- hence, it's a
leaking abstraction

(2) for `ValueTransformer` and `flatValueTransformer` we don't want to
allow forwarding key-value pairs, and hence need to throw an RTE for
this case atm

(3) Why do we expose `keySerde()`, `valueSerde()`, and `stateDir()`
explicitly? We have already `appConfigs()` to allow users to access the
configuration.
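Coming back to (2): today that restriction is enforced only at runtime,
conceptually like the sketch below (simplified; the real guard lives in the
internal ForwardingDisabledProcessorContext, and the exception is
org.apache.kafka.streams.errors.StreamsException):

    class ForwardingDisabledContext {
        // The call site type-checks against ProcessorContext, so the
        // mistake only surfaces when the processor actually runs.
        public <K, V> void forward(final K key, final V value) {
            throw new StreamsException(
                "ProcessorContext#forward() is not supported from this context");
        }
    }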

Overall, it seems that `ProcessorContext` is rather convoluted. Because
we add a new `Processor` abstraction, it seems like a good opportunity
to improve the interface and to not pass `ProcessorContext` into the new
`Processor#init()` method, but an improved interface.

Thoughts?



One more nit about the KIP:

I think, we should clearly state, that this change does not provide type
safety for PAPI users. The following example would compile without any
errors or warning, even if the types don't match:

> Topology t = new Topology();
> t.addSource("s", ...);
> t.addProcessor("p1", new ProcessorSupplier()..., 
> "s");
> t.addProcessor("p2", new ProcessorSupplier VOut>()..., "p1");

Just want to make sure users understand the impact/scope of the change,
especially what is _not_ achieved.


About `addGlobalStore()` -- should the return types be `Void` similar to
`KStream#process()`?



-Matthias


On 7/24/19 9:11 AM, Guozhang Wang wrote:
> Sounds good to me, thanks John!
> 
> 
> Guozhang
> 
> On Wed, Jul 24, 2019 at 7:40 AM John Roesler  wrote:
> 
>> Hey Guozhang,
>>
>> Thanks for the thought! It sounds related to what I was thinking in
>> https://issues.apache.org/jira/browse/KAFKA-8396 , but a little "extra"...
>>
>> I proposed to eliminate ValueTransformer, but I believe you're right; we
>> could eliminate Transformer also and just use Processor in the transform()
>> methods.
>>
>> To your first bullet, regarding transform/flatTransform... I'd argue that
>> the difference isn't material and if we switch to just using
>> context.forward instead of returns, then we just need one and people can
>> call forward as much as they want. It certainly warrants further
>> discussion, though...
>>
>> To the second point, yes, I'm thinking that we can eschew the
>> ValueTransformer and instead do something like ignore the forwarded key or
>> check the key for serial identity, etc.
>>
>> The ultimate advantage of these ideas is that we reduce the number of
>> interface variants and we also give people just one way to pass values
>> forward instead of two.
>>
>> Of course, it's beyond the scope of this KIP, but this KIP is a
>> precondition for these further improvements.
>>
>> I'm copying your comment onto the ticket for posterity.
>>
>> Thanks!
>> -John
>>
>> On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang  wrote:
>>
>>> Hi John,
>>>
>>> Just a wild thought about Transformer: now with the new Processor<KIn,
>>> KOut, VIn, VOut>#init(ProcessorContext), do we still need a
>>> Transformer (and even ValueTransformer / ValueTransformerWithKey)?
>>>
>>> What if:
>>>
>>> * We just make KStream#transform to get a ProcessorSupplier as well, and
>>> inside `process()` we check that at most one `context.forward()` is
>> called,
>>> and then take it as the return value.
>>> * We would still use ValueTransformer for KStream#transformValue, or we
>> can
>>> also use a `ProcessorSupplier` where we allow at most one
>>> `context.forward()` AND we ignore whatever passed in as key but just use
>>> the original key.
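A sketch of the "at most one forward" check described above (everything here
is hypothetical wiring, not an existing Streams class):

    class SingleForwardContext<K, V> {
        private KeyValue<K, V> forwarded;

        public void forward(final K key, final V value) {
            if (forwarded != null) {
                throw new IllegalStateException("transform() may forward at most once");
            }
            forwarded = KeyValue.pair(key, value);
        }

        // what KStream#transform would then treat as the return value
        KeyValue<K, V> result() {
            return forwarded;
        }
    }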
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Tue, Jul 16, 2019 at 9:03 AM John Roesler  wrote:
>>>
 Hi again, all,

 I have started the voting thread. Please cast your votes (or voice
 your objections)! The vote will remain open at least 72 hours. Once it
 closes, I can send the PR pretty quickly.

 Thanks for all you help ironing out the details on this feature.
 -John

 On Mon, Jul 15, 2019 at 5:09 PM John Roesler 
>> wrote:
>
> Hey all,
>
> It sounds like there's general agreement now on this KIP, so I
>> updated
> the KIP to fit in with Guozhang's overall proposed package structure.
> Specifically, the proposed name for the new Processor interface is
> "org.apache.kafka.streams.processor.api.Processor".
>
> If there are no objections, then I plan to start the vote tomorrow!
>
> Thanks, all, for your contributions.
> -John
>
> On Thu, Jul 11, 2019 at 1:50 PM Matthias J. Sax <
>> matth...@confluent.io

Re: [DISCUSS] KIP-487: Automatic Topic Creation on Producer

2019-07-24 Thread Justine Olshan
Hi,
Just a friendly reminder to take a look at this KIP if you have the time.

I was thinking about broker vs. client default precedence, and I think it
makes sense to keep the broker as the default used when both client-side
and broker-side defaults are configured. The idea is that there would be
pretty clear documentation, and that many systems with configurations that
the client could not change would likely have the auto-create default off.
(in the cloud, for example).

It also seems like in most cases, the consumer config
'allow.auto.create.topics' was created to actually prevent the creation of
topics, so the loss of creation functionality will not be a big problem.

 I'm happy to discuss any other compatibility problems or components of
this KIP.

Thank you,
Justine

On Wed, Jul 17, 2019 at 9:11 AM Justine Olshan  wrote:

> Hello all,
>
> I was looking at this KIP again, and there is a decision I made that I
> think is worth discussing.
>
> In the case where both the broker and producer's
> 'auto.create.topics.enable' are set to true, we have to choose either the
> broker configs or the producer configs for the replication
> factor/partitions.
>
> Currently, the decision is to have the broker defaults take precedence.
> (It is easier to do this in the implementation.) It also makes some sense
> for this behavior to take precedence since this behavior already occurs as
> the default.
>
> However, I was wondering if it would be odd for those who can only see the
> producer side to set configs for replication factor/partitions and see
> different results. Currently the documentation for the config states that
> the config values are only used when the broker config is not enabled, but
> this might not always be clear to the user.  Changing the code to have the
> producer's configurations take precedence is possible, but I was wondering
> what everyone thought.
>
> Thank you,
> Justine
>
> On Fri, Jul 12, 2019 at 2:49 PM Justine Olshan 
> wrote:
>
>> Just a quick update--
>>
>> It seems that enabling both the broker and producer configs works fine,
>> except that the broker configurations for partitions, replication factor
>> take precedence.
>> I don't know if that is something we would want to change, but I'll be
>> updating the KIP for now to reflect this. Perhaps we would want to add more
>> to the documentation of the producer configs to clarify.
>>
>> Thank you,
>> Justine
>>
>> On Fri, Jul 12, 2019 at 9:28 AM Justine Olshan 
>> wrote:
>>
>>> Hi Colin,
>>>
>>> Thanks for looking at the KIP. I can definitely add to the title to make
>>> it more clear.
>>>
>>> It makes sense that both configurations could be turned on since there
>>> are many cases where the user can not control the server-side
>>> configurations. I was a little concerned about how both interacting would
>>> work out -- if there would be too many requests for new topics, for example.
>>> But since it does make sense to allow both configurations to be enabled, I
>>> will test out some scenarios and I'll change the KIP to support this.
>>>
>>> I also agree with documentation about distinguishing the differences. I
>>> was having some trouble with the wording but I like the phrases
>>> "server-side" and "client-side." That's a good distinction I can use when
>>> describing.
>>>
>>> I'll try to update the KIP soon keeping everyone's input in mind.
>>>
>>> Thanks,
>>> Justine
>>>
>>> On Thu, Jul 11, 2019 at 5:39 PM Colin McCabe  wrote:
>>>
 Hi Justine,

 Thanks for the KIP.  This seems like a good step towards removing
 server-side topic auto-creation.

We should include "client-side" in the title of the KIP somewhere,
 to make it clear that we're talking about client-side auto creation.

 The KIP says:
 > In order to automatically create topics with the producer, the
 producer's
 > auto.create.topics.enable config must be set to true and the broker
 config should be set to false

 From a user's point of view, this seems counter-intuitive.  In order to
 auto-create topics the broker's auto.create.topics.enable config should be
 set to false?  It seems like the server-side auto-create is unrelated to
 the client-side auto-create.  We could have both turned on (and I'm sure
 that in the real world, people will try this configuration...)  There's no
 reason not to support this, I think.

 We should add some documentation explaining the difference between
 server-side and client-side auto-creation.  Without documentation, an admin
 might think that they had disabled all forms of auto-creation by setting
the server-side setting to false -- but this is not the case, of course.

 best,
 Colin


 On Thu, Jul 11, 2019, at 16:22, Justine Olshan wrote:
 > Hi Dhruvil,
 >
 > Thanks for reading the KIP!
 > That was the general idea for deprecation. We would log a warning
 when the
 > config is enabled on the 

Re: [VOTE] KIP-221: Enhance KStream with Connecting Topic Creation and Repartition Hint

2019-07-24 Thread Sophie Blee-Goldman
Looks good! Thanks Levani,

+1 (non-binding)

Sophie

On Tue, Jul 23, 2019 at 2:16 PM Levani Kokhreidze 
wrote:

> Hello,
>
> I’d like to initialize voting on KIP-221:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221:+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >
> If there’re any more concerns about the KIP, happy to discuss further.
>
> Regards,
> Levani


Re: [DISCUSS] KIP-490: log when consumer groups lose a message because offset has been deleted

2019-07-24 Thread Jose M
Hello Kamal,

The compacted topics are excluded from the KIP, because users of compacted
topics are mainly interested in the last state for a certain key, and can
afford to miss intermediary states. Technically it is possible to know if the
topic is compacted through the "log.config.compact" attribute. Thanks a lot for
your feedback!

I've updated the KIP to specify that:

   - compacted topics are excluded from the KIP.
   - instead of logging on the broker, I propose to create a new metric,
   following Colin's comment (thanks a lot!)
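For concreteness, such a counter could be registered with Kafka's existing
Metrics API along these lines (the metric and tag names here are invented for
illustration):

    Sensor expiredMessages = metrics.sensor("expired-messages");
    expiredMessages.add(
        metrics.metricName("expired-messages-total",
                           "consumer-group-metrics",
                           Collections.singletonMap("group", groupId)),
        new CumulativeCount());
    // ...then record() once for each message deleted before the group consumed it:
    expiredMessages.record();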

Thanks,

Jose

On Tue, Jul 23, 2019 at 11:45 AM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Jose,
>
> How do you differentiate the compaction topics from the time retention
> topics? Deleting a message due to compaction policy is a valid case
> and users won't be interested in monitoring/reading those deleted messages.
>
> Thanks,
> Kamal
>
> On Tue, Jul 23, 2019 at 4:00 AM Jose M  wrote:
>
> > Hi Colin,
> >
> > Thanks a lot for your feedback. Please note that I only propose to log
> when
> > a message is lost for a set of consumer groups, not as the default
> > behaviour for all consumer groups.
> > But in fact, I agree with you that logging a line per expired message can
> be
> > quite a lot, and that is not the best way to do it. I can propose to add a
> > dedicated JMX metric of type counter "expired messages" per consumer
> group.
> > What do you think?
> >
> > About monitoring the lag to ensure that messages are not lost, I know
> that
> > is what clients do, to set up alerting when the lag is above a threshold.
> > But even if the alert is triggered, we don't know if messages have been
> lost
> > or not. With this KIP implemented, clients would know if something has been
> > missed or not.
> >
> >
> > Thanks,
> >
> >
> > Jose
> >
> > On Mon, Jul 22, 2019 at 5:51 PM Colin McCabe  wrote:
> >
> > > Hi Jose,
> > >
> > > One issue that I see here is that the number of log messages could be
> > > huge.  I've seen people create tens of thousands of consumer groups.
> > > People can also have settings that create pretty small log files.  A
> > > message per log file per group could be quite a lot of messages.
> > >
> > > A log message on the broker is also not that useful for detecting bad
> > > client behavior.  People generally only look at the server logs after
> > they
> > > become aware that something is wrong through some other means.
> > >
> > > Perhaps the clients should just monitor their lag?  There is a JMX
> metric
> > > for this, which means it can be hooked into traditional metrics /
> > reporting
> > > systems.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Mon, Jul 22, 2019, at 03:12, Jose M wrote:
> > > > Hello,
> > > >
> > > > I didn't get any feedback on this small KIP-490
> > > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-490%3A+log+when+consumer+groups+lose+a+message+because+offset+has+been+deleted
> > > >.
> > > > In summary, I propose a way to be notified when messages are being
> > > > removed
> > > > due to retention policy, without being consumed by a given consumer
> > > > group.
> > > > It will be useful to realize that some important messages have been
> > > > lost.
> > > >
> > > > As Im new to the codebase, I have technical questions about how to
> > > achieve
> > > > this, but before going deeper, I would like your feedback on the
> > feature.
> > > >
> > > > Thanks a lot,
> > > >
> > > >
> > > > Jose Morales
> > > >
> > > > On Sun, Jul 14, 2019 at 12:51 AM Jose M  wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I would like to know what do you think on KIP-490:
> > > > >
> > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-490%3A+log+when+consumer+groups+lose+a+message+because+offset+has+been+deleted
> > > > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-490%3A+log+when+consumer+groups+lose+a+message+because+offset+has+expired
> > > >
> > > > >
> > > > >
> > > > > Thanks a lot !
> > > > > --
> > > > > Jose M
> > > > >
> > > >
> > > >
> > > > --
> > > > J
> > > >
> > >
> >
> >
> > --
> > J
> >
>


-- 
J


Re: Fwd: [DISCUSS] KIP-492 Add java security providers in Kafka Security config

2019-07-24 Thread Harsha
Thanks for the details. 
Rajini, can you please take a look and let us know if these address your
concerns.

Thanks,
Harsha

On Mon, Jul 22, 2019, at 9:36 AM, Sandeep Mopuri wrote:
> Hi Rajini,
>  Thanks for raising the above questions. Please find the
> replies below
> 
> On Wed, Jul 17, 2019 at 2:49 AM Rajini Sivaram 
> wrote:
> 
> > Hi Sandeep,
> >
> > Thanks for the KIP. A few questions below:
> >
> >1. Is the main use case for this KIP adding security providers for SSL?
> >If so, wouldn't a more generic solution like KIP-383 work for this?
> >
>We’re trying to solve this for both SSL and SASL. KIP-383 allows the
> creation of a custom SSLFactory implementation; however, adding providers
> for new security algorithms doesn’t involve any new implementation of
> SSLFactory. Even after KIP-383, people are still finding a need for
> loading custom keymanager and trustmanager implementations (KIP-486).
> 
>2. Presumably this config would also apply to clients. If so, have we
> >thought through the implications of changing static JVM-wide security
> >providers in the client applications?
> >
>Yes, this config will be applied to clients as well, and the
> responsibility for the consequences of adding the security providers
> needs to be taken by the clients. In the case of resource managers running
> streaming applications, such as YARN, Mesos, etc., each user needs to make
> sure they are passing these JVM arguments.
> 
>3. Since client applications can programmatically invoke the Java
> >Security API anyway, isn't the system property described in `Rejected
> >Alternatives` a reasonable solution for brokers?
> >
>   This is true in a Kafka-only environment, but with an ecosystem of
> streaming applications like Flink, Spark, etc., which might produce to
> Kafka, it’s difficult to make changes to all the clients.
> 
>4. We have SASL login modules in Kafka that automatically add security
> >providers for SASL mechanisms not supported by the JVM. We should
> > describe
> >the impact of this KIP on those and whether we would now require a
> > config
> >to enable these security providers
> 
> In a single JVM, one can register multiple security providers. By default
> the JVM itself provides multiple providers, and these will not step on each
> other. The only way to activate a provider is through its registered name.
> Example:
> 
> $ cat /usr/lib/jvm/jdk-8-oracle-x64/jre/lib/security/java.security
> ...
> security.provider.1=sun.security.provider.Sun
> security.provider.2=sun.security.rsa.SunRsaSign
> security.provider.3=sun.security.ec.SunEC
> security.provider.4=com.sun.net.ssl.internal.ssl.Provider
> security.provider.5=com.sun.crypto.provider.SunJCE
> security.provider.6=sun.security.jgss.SunProvider
> security.provider.7=com.sun.security.sasl.Provider
> security.provider.8=org.jcp.xml.dsig.internal.dom.XMLDSigRI
> security.provider.9=sun.security.smartcardio.SunPCSC
> ...
> 
>A user of a provider refers to it through its registered name:
> 
> https://github.com/spiffe/spiffe-example/blob/master/java-spiffe/spiffe-security-provider/src/main/java/spiffe/api/provider/SpiffeProvider.java#L31
> 
>In the above example, we can register the SpiffeProvider and
> multiple other providers in the JVM. When a client or a broker wants to
> integrate with the SpiffeProvider, they need to add the config
> ssl.keymanager.algorithm = "Spiffe". Another client can refer to a
> different provider or use a default one in the same JVM.
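>
>A minimal sketch of that flow (SpiffeProvider here is the class from the
> repository linked above; the registration position is illustrative):
>
> import java.security.Security;
> import spiffe.api.provider.SpiffeProvider;
>
> // Register the provider JVM-wide; position 1 gives it highest priority.
> Security.insertProviderAt(new SpiffeProvider(), 1);
>
> // A broker or client can then select it by its registered algorithm name,
> // e.g. ssl.keymanager.algorithm = "Spiffe", as described above.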
> 
> 
> >5. We have been moving away from JVM-wide configs like the default JAAS
> >config since they are hard to test reliably or update dynamically. The
> >replacement config `sasl.jaas.config` doesn't insert a JVM-wide
> >configuration. Have we investigated similar options for the specific
> >scenario we are addressing here?
> >
>Yes, that is the case with the JAAS config; however, in the case of
> security providers, along with adding the security providers to the JVM
> properties, one also needs to configure the provider algorithm. For example,
> in the case of SSL configuration, besides adding the security provider to
> the JVM, we need to configure “ssl.trustmanager.algorithm” and
> “ssl.keymanager.algorithm” in order for the provider implementation to
> apply. Different components can opt for different key and trustmanager
> algorithms and can work independently and simultaneously in the same JVM.
> This case is different from the JAAS config.
> 
> 
> >6. Are we always going to insert new providers at the start of the
> >provider list?
> 
>Can you please explain what exactly you mean by this?
> 
> >
> >
> > Regards,
> >
> > Rajini
> >
> >
> >
> > On Wed, Jul 17, 2019 at 5:05 AM Harsha  wrote:
> >
> > > Thanks for the KIP Sandeep. LGTM.
> > >
> > > Mani & Rajini, can you please look at the KIP as well.
> > >
> > > Thanks,
> > > Harsha
> > >
> > > On 

[jira] [Created] (KAFKA-8710) InitProducerId changes for KIP-360

2019-07-24 Thread Bob Barrett (JIRA)
Bob Barrett created KAFKA-8710:
--

 Summary: InitProducerId changes for KIP-360
 Key: KAFKA-8710
 URL: https://issues.apache.org/jira/browse/KAFKA-8710
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.3.0
Reporter: Bob Barrett
Assignee: Bob Barrett


As part of KIP-360, InitProducerId needs to accept two additional parameters, 
the current producerId and the current producerEpoch, and it needs to allow 
producers to safely re-initialize a producer ID and continue processing as long 
as no other producer with the same transactional ID has started up.
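
A sketch of the extended request shape this implies (the field names here are 
assumptions for illustration; KIP-360 defines the actual schema):
{code:java}
public class InitProducerIdRequestData {
    String transactionalId;    // existing field
    int transactionTimeoutMs;  // existing field
    long producerId;           // new: the producer's current id, or -1 if none
    short producerEpoch;       // new: the producer's current epoch, or -1 if none
}
{code}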



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2019-07-24 Thread Guozhang Wang
Yeah I think I agree with you.

+1 (binding) from me.


Guozhang


On Wed, Jul 24, 2019 at 7:43 AM John Roesler  wrote:

> Hi Guozhang,
>
> Thanks! I just replied in the discuss thread. I agree with what you're
> proposing, but would like to consider it outside the scope of this KIP, if
> that's ok with you.
>
> -John
>
> On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang  wrote:
>
> > Hi John,
> >
> > I left another question regarding Transformer in the DISCUSS thread.
> Other
> > than that I think this KIP is ready. Thanks!
> >
> >
> > Guozhang
> >
> >
> > On Tue, Jul 16, 2019 at 9:01 AM John Roesler  wrote:
> >
> > > Hi Dev,
> > >
> > > After a good discussion, I'd like to start the vote for KIP-478
> > > (https://cwiki.apache.org/confluence/x/2SkLBw).
> > >
> > > The proposal is to deprecate the existing interface
> > > org.apache.kafka.streams.processor.Processor in favor of a
> > > new one, org.apache.kafka.streams.processor.api.Processor<KIn, VIn,
> > > KOut, VOut> that parameterizes both the input and output types.
> > >
> > > This change enables both the Streams DSL internal code and external
> > > Processor API code to improve their type safety and protect themselves
> > > from type-level bugs.
> > >
> > > Thanks,
> > > -John
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-478 Strongly Typed Processor API

2019-07-24 Thread Guozhang Wang
Sounds good to me, thanks John!


Guozhang

On Wed, Jul 24, 2019 at 7:40 AM John Roesler  wrote:

> Hey Guozhang,
>
> Thanks for the thought! It sounds related to what I was thinking in
> https://issues.apache.org/jira/browse/KAFKA-8396 , but a little "extra"...
>
> I proposed to eliminate ValueTransformer, but I believe you're right; we
> could eliminate Transformer also and just use Processor in the transform()
> methods.
>
> To your first bullet, regarding transform/flatTransform... I'd argue that
> the difference isn't material and if we switch to just using
> context.forward instead of returns, then we just need one and people can
> call forward as much as they want. It certainly warrants further
> discussion, though...
>
> To the second point, yes, I'm thinking that we can eschew the
> ValueTransformer and instead do something like ignore the forwarded key or
> check the key for serial identity, etc.
>
> The ultimate advantage of these ideas is that we reduce the number of
> interface variants and we also give people just one way to pass values
> forward instead of two.
>
> Of course, it's beyond the scope of this KIP, but this KIP is a
> precondition for these further improvements.
>
> I'm copying your comment onto the ticket for posterity.
>
> Thanks!
> -John
>
> On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang  wrote:
>
> > Hi John,
> >
> > Just a wild thought about Transformer: now with the new Processor<KIn,
> > KOut, VIn, VOut>#init(ProcessorContext), do we still need a
> > Transformer (and even ValueTransformer / ValueTransformerWithKey)?
> >
> > What if:
> >
> > * We just make KStream#transform take a ProcessorSupplier as well, and
> > inside `process()` we check that at most one `context.forward()` is
> called,
> > and then take it as the return value.
> > * We would still use ValueTransformer for KStream#transformValue, or we
> can
> > also use a `ProcessorSupplier` where we allow at most one
> > `context.forward()` AND we ignore whatever is passed in as the key, but
> > just use the original key.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Jul 16, 2019 at 9:03 AM John Roesler  wrote:
> >
> > > Hi again, all,
> > >
> > > I have started the voting thread. Please cast your votes (or voice
> > > your objections)! The vote will remain open at least 72 hours. Once it
> > > closes, I can send the PR pretty quickly.
> > >
> > > Thanks for all your help ironing out the details on this feature.
> > > -John
> > >
> > > On Mon, Jul 15, 2019 at 5:09 PM John Roesler 
> wrote:
> > > >
> > > > Hey all,
> > > >
> > > > It sounds like there's general agreement now on this KIP, so I
> updated
> > > > the KIP to fit in with Guozhang's overall proposed package structure.
> > > > Specifically, the proposed name for the new Processor interface is
> > > > "org.apache.kafka.streams.processor.api.Processor".
> > > >
> > > > If there are no objections, then I plan to start the vote tomorrow!
> > > >
> > > > Thanks, all, for your contributions.
> > > > -John
> > > >
> > > > On Thu, Jul 11, 2019 at 1:50 PM Matthias J. Sax <
> matth...@confluent.io
> > >
> > > wrote:
> > > > >
> > > > > Side remark:
> > > > >
> > > > > > Now that "flat transform" is a specific
> > > > > >> part of the API it seems okay to steer folks in that direction
> (to
> > > never
> > > > > >> use context.forward in a transformer), but it should be called
> out
> > > > > >> explicitly in javadocs.  Currently Transformer (which is used
> for
> > > both
> > > > > >> transform() and flatTransform() ) doesn't really call out the
> > > ambiguity:
> > > > >
> > > > > Would you want to do a PR to address this? We are always eager to
> > > > > improve the JavaDocs!
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 7/7/19 11:26 AM, Paul Whalen wrote:
> > > > > > First of all, +1 on the whole idea, my team has run into
> > (admittedly
> > > minor,
> > > > > > but definitely annoying) issues because of the weaker typing.
> > We're
> > > heavy
> > > > > > users of the PAPI and have Processors that, while not hundreds of
> > > lines
> > > > > > long, are certainly quite hefty and call context.forward() in
> many
> > > places.
> > > > > >
> > > > > > After reading the KIP and discussion a few times, I've convinced
> > > myself
> > > > > > that any initial concerns I had aren't really concerns at all
> > (state
> > > store
> > > > > > types, for one).  One thing I will mention:  changing
> *Transformer*
> > > to have
> > > > > > ProcessorContext<Void, Void> gave me pause, because I have code
> > that
> > > does
> > > > > > context.forward in transformers.  Now that "flat transform" is a
> > > specific
> > > > > > part of the API it seems okay to steer folks in that direction
> (to
> > > never
> > > > > > use context.forward in a transformer), but it should be called
> out
> > > > > > explicitly in javadocs.  Currently Transformer (which is used for
> > > both
> > > > > > transform() and flatTransform() ) doesn't really call out the
> > > ambiguity:
> > > > > >
> > >
> >
> 

Re: [DISCUSS] KIP-466: Add support for List serialization and deserialization

2019-07-24 Thread Development
KIP-466 is updated and new commit is pushed.

Thank you guys!

> On Jul 24, 2019, at 10:53 AM, John Roesler  wrote:
> 
> Ah, thanks for setting me straight, Matthias.
> 
> Given the choice between defining the Serde in the streams module (hence it
> would not be in the Serdes "menu" class) or defining the configuration
> property in CommonClientConfig, I think I'm leaning toward the latter.
> 
> Really good catch on the ProducerConfig; otherwise, I think we should go
> ahead and add the serializer/deserializer configs as discussed to
> ProducerConfig and ConsumerConfig. It's just cleaner and more uniform that
> way.
> 
> Thanks again,
> -John
> 
> On Tue, Jul 23, 2019 at 8:08 PM Matthias J. Sax 
> wrote:
> 
>>>> Just to make sure I understand the problem you're highlighting:
>>>> I guess the difference is that the serializer and deserializer that are
>>>> nested inside the serde also need to be configured? So, by default I'd
>>>> have to specify all six configs when I'm using Streams?
>> 
>> That is not the problem. And you actually describe the solution for it
>> yourself:
>> 
>>>> I guess in the Serde, it could make use of a package-protected
>>>> constructor for the serializer and deserializer that fixes the list type
>>>> and inner type to the serde-configured ones. Then, when you're
>>>> configuring Streams, you only need to specify the StreamsConfigs.
>> 
>> 
>> 
>> 
>> The problem is, that `ListSerde` is in package `clients` and thus
>> `ListSerde` cannot access `StreamsConfig`, and hence cannot use
>> `StreamsConfig#DEFAULT_LIST_KEY_SERDE_TYPE` (and others). Therefore, we
>> either need to hard-code strings literal for the config names (what does
>> not sound right) or add `CommonClientConfig#DEFAULT_LIST_KEY_SERDE_TYPE`
>> (and others).
>> 
>> In StreamsConfig we would just redefine them for convenience:
>> 
>>> public static final String DEFAULT_LIST_KEY_SERDE_TYPE =
>> CommonClientConfig#DEFAULT_LIST_KEY_SERDE_TYPE;
>> 
>> 
>> Note that `TimeWindowSerde` is contained in `streams` package and thus
>> it can access `StreamsConfig` and
>> `StreamsConfig#DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS`.
>> 
>> 
>> 
>> 
>> Btw: I just realized that we actually don't need `ProducerConfig`
>> 
>>> list.key/value.serializer.type
>> 
>> because the list-type is irrelevant on write. We only need `inner` config.
>> 
>> 
>> 
>> -Matthias
>> 
>> 
>> On 7/23/19 1:30 PM, John Roesler wrote:
>>> Hmm, that's a tricky situation.
>>> 
>>> I think Daniyar was on the right track... Producer only cares about
>>> serializer configs, and Consumer only cares about deserializer configs.
>>> 
>>> I didn't see the problem with your proposal:
>>> 
>>> ProducerConfig:
 list.key/value.serializer.type
 list.key/value.serializer.inner
 ConsumerConfig:
 list.key/value.deserializer.type
 list.key/value.deserializer.inner
 StreamsConfig:
 default.list.key/value.serde.type
 default.list.key/value.serde.inner
>>> 
>>> 
>>> It seems like the key/value serde configs are a better analogy than the
>>> windowed serde.
>>> ProducerConfig: key.serializer
>>> ConsumerConfig: key.deserializer
>>> StreamsConfig: default.key.serde
>>> 
>>> Just to make sure I understand the problem you're highlighting:
>>> I guess the difference is that the serializer and deserializer that are
>>> nested inside the serde also need to be configured? So, by default I'd
>> have
>>> to specify all six configs when I'm using Streams?
>>> 
>>> I guess in the Serde, it could make use of a package-protected
>> constructor
>>> for the serializer and deserializer that fixes the list type and inner
>> type
>>> to the serde-configured ones. Then, when you're configuring Streams, you
>>> only need to specify the StreamsConfigs.
>>> 
>>> Does that work?
>>> -John
>>> 
>>> 
>>> On Tue, Jul 23, 2019 at 11:39 AM Development  wrote:
>>> 
 Bump
 
> On Jul 22, 2019, at 11:26 AM, Development  wrote:
> 
> Hey Matthias,
> 
> It looks a little confusing, but I don’t have enough expertise to judge
 on the configuration placement.
> 
> If you think, it is fine I’ll go ahead with this approach.
> 
> Best,
> Daniyar Yeralin
> 
>> On Jul 19, 2019, at 5:49 PM, Matthias J. Sax 
 wrote:
>> 
>> Good point.
>> 
>> I guess the simplest solution is, to actually add
>> 
 default.list.key/value.serde.type
 default.list.key/value.serde.inner
>> 
>> to both `CommonClientConfigs` and `StreamsConfig`.
>> 
>> It's not super clean, but I think it's the best we can do. Thoughts?
>> 
>> 
>> -Matthias
>> 
>> On 7/19/19 1:23 PM, Development wrote:
>>> Hi Matthias,
>>> 
>>> I agree, ConsumerConfig did not seem like a right place for these
 configurations.
>>> I’ll put them in ProducerConfig, ConsumerConfig, and StreamsConfig.
>>> 
>>> However, I have a question. What should I do in
>> "configure(Map>>> ?> configs, 

Re: [DISCUSS] KIP-466: Add support for List serialization and deserialization

2019-07-24 Thread John Roesler
Ah, thanks for setting me straight, Matthias.

Given the choice between defining the Serde in the streams module (hence it
would not be in the Serdes "menu" class) or defining the configuration
property in CommonClientConfig, I think I'm leaning toward the latter.

Really good catch on the ProducerConfig; otherwise, I think we should go
ahead and add the serializer/deserializer configs as discussed to
ProducerConfig and ConsumerConfig. It's just cleaner and more uniform that
way.
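
For concreteness, a sketch of how an application would set these, using the
config names floated in this thread (the names are proposals under
discussion, and the deserializer class names are hypothetical):

import java.util.Properties;

// Consumer side: deserializer configs only.
Properties consumerProps = new Properties();
consumerProps.put("key.deserializer",
        "org.apache.kafka.common.serialization.ListDeserializer");
consumerProps.put("list.key.deserializer.type", "java.util.ArrayList");
consumerProps.put("list.key.deserializer.inner",
        "org.apache.kafka.common.serialization.StringDeserializer");

// Streams side: the default.* variants proposed for StreamsConfig.
Properties streamsProps = new Properties();
streamsProps.put("default.list.key.serde.type", "java.util.ArrayList");
streamsProps.put("default.list.key.serde.inner",
        "org.apache.kafka.common.serialization.Serdes$StringSerde");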

Thanks again,
-John

On Tue, Jul 23, 2019 at 8:08 PM Matthias J. Sax 
wrote:

> >> Just to make sure I understand the problem you're highlighting:
> >> I guess the difference is that the serializer and deserializer that are
> >> nested inside the serde also need to be configured? So, by default I'd
> have
> >> to specify all six configs when I'm using Streams?
>
> That is not the problem. And you actually describe the solution for it
> yourself:
>
> >> I guess in the Serde, it could make use of a package-protected
> constructor
> >> for the serializer and deserializer that fixes the list type and inner
> type
> >> to the serde-configured ones. Then, when you're configuring Streams, you
> >> only need to specify the StreamsConfigs.
>
>
>
>
> The problem is, that `ListSerde` is in package `clients` and thus
> `ListSerde` cannot access `StreamsConfig`, and hence cannot use
> `StreamsConfig#DEFAULT_LIST_KEY_SERDE_TYPE` (and others). Therefore, we
> either need to hard-code strings literal for the config names (what does
> not sound right) or add `CommonClientConfig#DEFAULT_LIST_KEY_SERDE_TYPE`
> (and others).
>
> In StreamsConfig we would just redefine them for convenience:
>
> > public static final String DEFAULT_LIST_KEY_SERDE_TYPE =
> CommonClientConfig#DEFAULT_LIST_KEY_SERDE_TYPE;
>
>
> Note that `TimeWindowSerde` is contained in `streams` package and thus
> it can access `StreamsConfig` and
> `StreamsConfig#DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS`.
>
>
>
>
> Btw: I just realized that we actually don't need `ProducerConfig`
>
> > list.key/value.serializer.type
>
> because the list-type is irrelevant on write. We only need `inner` config.
>
>
>
> -Matthias
>
>
> On 7/23/19 1:30 PM, John Roesler wrote:
> > Hmm, that's a tricky situation.
> >
> > I think Daniyar was on the right track... Producer only cares about
> > serializer configs, and Consumer only cares about deserializer configs.
> >
> > I didn't see the problem with your proposal:
> >
> > ProducerConfig:
> >> list.key/value.serializer.type
> >> list.key/value.serializer.inner
> >> ConsumerConfig:
> >> list.key/value.deserializer.type
> >> list.key/value.deserializer.inner
> >> StreamsConfig:
> >> default.list.key/value.serde.type
> >> default.list.key/value.serde.inner
> >
> >
> > It seems like the key/value serde configs are a better analogy than the
> > windowed serde.
> > ProducerConfig: key.serializer
> > ConsumerConfig: key.deserializer
> > StreamsConfig: default.key.serde
> >
> > Just to make sure I understand the problem you're highlighting:
> > I guess the difference is that the serializer and deserializer that are
> > nested inside the serde also need to be configured? So, by default I'd
> have
> > to specify all six configs when I'm using Streams?
> >
> > I guess in the Serde, it could make use of a package-protected
> constructor
> > for the serializer and deserializer that fixes the list type and inner
> type
> > to the serde-configured ones. Then, when you're configuring Streams, you
> > only need to specify the StreamsConfigs.
> >
> > Does that work?
> > -John
> >
> >
> > On Tue, Jul 23, 2019 at 11:39 AM Development  wrote:
> >
> >> Bump
> >>
> >>> On Jul 22, 2019, at 11:26 AM, Development  wrote:
> >>>
> >>> Hey Matthias,
> >>>
> >>> It looks a little confusing, but I don’t have enough expertise to judge
> >> on the configuration placement.
> >>>
> >>> If you think, it is fine I’ll go ahead with this approach.
> >>>
> >>> Best,
> >>> Daniyar Yeralin
> >>>
>  On Jul 19, 2019, at 5:49 PM, Matthias J. Sax 
> >> wrote:
> 
>  Good point.
> 
>  I guess the simplest solution is, to actually add
> 
> >> default.list.key/value.serde.type
> >> default.list.key/value.serde.inner
> 
>  to both `CommonClientConfigs` and `StreamsConfig`.
> 
>  It's not super clean, but I think it's the best we can do. Thoughts?
> 
> 
>  -Matthias
> 
>  On 7/19/19 1:23 PM, Development wrote:
> > Hi Matthias,
> >
> > I agree, ConsumerConfig did not seem like a right place for these
> >> configurations.
> > I’ll put them in ProducerConfig, ConsumerConfig, and StreamsConfig.
> >
> > However, I have a question. What should I do in
> "configure(Map >> ?> configs, boolean isKey)” methods? Which configurations should I try
> to
> >> locate? I was comparing my (de)serializer implementations with
> >> SessionWindows(De)serializer classes, and they use StreamsConfig class
> to
> >> get  either 

Re: [DISCUSS] KIP-466: Add support for List serialization and deserialization

2019-07-24 Thread Development
Hey Matthias,

Yes, you are totally right about “list.key/value.serializer.type” in
ProducerConfig. Removed!

And yes, now StreamsConfig just points to configs stored in CommonClientConfigs.

I’ll update the KIP.

I think we can continue with voting now.

Best,
Daniyar Yeralin

> On Jul 23, 2019, at 9:08 PM, Matthias J. Sax  wrote:
> 
>>> Just to make sure I understand the problem you're highlighting:
>>> I guess the difference is that the serializer and deserializer that are
>>> nested inside the serde also need to be configured? So, by default I'd have
>>> to specify all six configs when I'm using Streams?
> 
> That is not the problem. And you actually describe the solution for it
> yourself:
> 
>>> I guess in the Serde, it could make use of a package-protected constructor
>>> for the serializer and deserializer that fixes the list type and inner type
>>> to the serde-configured ones. Then, when you're configuring Streams, you
>>> only need to specify the StreamsConfigs.
> 
> 
> 
> 
> The problem is, that `ListSerde` is in package `clients` and thus
> `ListSerde` cannot access `StreamsConfig`, and hence cannot use
> `StreamsConfig#DEFAULT_LIST_KEY_SERDE_TYPE` (and others). Therefore, we
> either need to hard-code strings literal for the config names (what does
> not sound right) or add `CommonClientConfig#DEFAULT_LIST_KEY_SERDE_TYPE`
> (and others).
> 
> In StreamsConfig we would just redefine them for convenience:
> 
>> public static final String DEFAULT_LIST_KEY_SERDE_TYPE = 
>> CommonClientConfig#DEFAULT_LIST_KEY_SERDE_TYPE;
> 
> 
> Note that `TimeWindowSerde` is contained in `streams` package and thus
> it can access `StreamsConfig` and
> `StreamsConfig#DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS`.
> 
> 
> 
> 
> Btw: I just realized that we actually don't need `ProducerConfig`
> 
>> list.key/value.serializer.type
> 
> because the list-type is irrelevant on write. We only need `inner` config.
> 
> 
> 
> -Matthias
> 
> 
> On 7/23/19 1:30 PM, John Roesler wrote:
>> Hmm, that's a tricky situation.
>> 
>> I think Daniyar was on the right track... Producer only cares about
>> serializer configs, and Consumer only cares about deserializer configs.
>> 
>> I didn't see the problem with your proposal:
>> 
>> ProducerConfig:
>>> list.key/value.serializer.type
>>> list.key/value.serializer.inner
>>> ConsumerConfig:
>>> list.key/value.deserializer.type
>>> list.key/value.deserializer.inner
>>> StreamsConfig:
>>> default.list.key/value.serde.type
>>> default.list.key/value.serde.inner
>> 
>> 
>> It seems like the key/value serde configs are a better analogy than the
>> windowed serde.
>> ProducerConfig: key.serializer
>> ConsumerConfig: key.deserializer
>> StreamsConfig: default.key.serde
>> 
>> Just to make sure I understand the problem you're highlighting:
>> I guess the difference is that the serializer and deserializer that are
>> nested inside the serde also need to be configured? So, by default I'd have
>> to specify all six configs when I'm using Streams?
>> 
>> I guess in the Serde, it could make use of a package-protected constructor
>> for the serializer and deserializer that fixes the list type and inner type
>> to the serde-configured ones. Then, when you're configuring Streams, you
>> only need to specify the StreamsConfigs.
>> 
>> Does that work?
>> -John
>> 
>> 
>> On Tue, Jul 23, 2019 at 11:39 AM Development  wrote:
>> 
>>> Bump
>>> 
 On Jul 22, 2019, at 11:26 AM, Development  wrote:
 
 Hey Matthias,
 
 It looks a little confusing, but I don’t have enough expertise to judge
>>> on the configuration placement.
 
 If you think, it is fine I’ll go ahead with this approach.
 
 Best,
 Daniyar Yeralin
 
> On Jul 19, 2019, at 5:49 PM, Matthias J. Sax 
>>> wrote:
> 
> Good point.
> 
> I guess the simplest solution is, to actually add
> 
>>> default.list.key/value.serde.type
>>> default.list.key/value.serde.inner
> 
> to both `CommonClientConfigs` and `StreamsConfig`.
> 
> It's not super clean, but I think it's the best we can do. Thoughts?
> 
> 
> -Matthias
> 
> On 7/19/19 1:23 PM, Development wrote:
>> Hi Matthias,
>> 
>> I agree, ConsumerConfig did not seem like a right place for these
>>> configurations.
>> I’ll put them in ProducerConfig, ConsumerConfig, and StreamsConfig.
>> 
>> However, I have a question. What should I do in "configure(Map<String, ?> configs, boolean isKey)” methods? Which configurations should I try to
>>> locate? I was comparing my (de)serializer implementations with
>>> SessionWindows(De)serializer classes, and they use StreamsConfig class to
>>> get  either StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS :
>>> StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
>> 
>> In my case, as I mentioned earlier, StreamsConfig class is not
>>> accessible from org.apache.kafka.common.serialization package. So, I can’t
>>> utilize it. Any suggestions 

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2019-07-24 Thread John Roesler
Hi Guozhang,

Thanks! I just replied in the discuss thread. I agree with what you're
proposing, but would like to consider it outside the scope of this KIP, if
that's ok with you.

-John

On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang  wrote:

> Hi John,
>
> I left another question regarding Transformer in the DISCUSS thread. Other
> than that I think this KIP is ready. Thanks!
>
>
> Guozhang
>
>
> On Tue, Jul 16, 2019 at 9:01 AM John Roesler  wrote:
>
> > Hi Dev,
> >
> > After a good discussion, I'd like to start the vote for KIP-478
> > (https://cwiki.apache.org/confluence/x/2SkLBw).
> >
> > The proposal is to deprecate the existing interface
> > org.apache.kafka.streams.processor.Processor in favor of a
> > new one, org.apache.kafka.streams.processor.api.Processor<KIn, VIn,
> > KOut, VOut> that parameterizes both the input and output types.
> >
> > This change enables both the Streams DSL internal code and external
> > Processor API code to improve their type safety and protect themselves
> > from type-level bugs.
> >
> > Thanks,
> > -John
> >
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-478 Strongly Typed Processor API

2019-07-24 Thread John Roesler
Hey Guozhang,

Thanks for the thought! It sounds related to what I was thinking in
https://issues.apache.org/jira/browse/KAFKA-8396 , but a little "extra"...

I proposed to eliminate ValueTransformer, but I believe you're right; we
could eliminate Transformer also and just use Processor in the transform()
methods.

To your first bullet, regarding transform/flatTransform... I'd argue that
the difference isn't material and if we switch to just using
context.forward instead of returns, then we just need one and people can
call forward as much as they want. It certainly warrants further
discussion, though...

To the second point, yes, I'm thinking that we can eschew the
ValueTransformer and instead do something like ignore the forwarded key or
check the key for serial identity, etc.

The ultimate advantage of these ideas is that we reduce the number of
interface variants and we also give people just one way to pass values
forward instead of two.

Of course, it's beyond the scope of this KIP, but this KIP is a
precondition for these further improvements.

I'm copying your comment onto the ticket for posterity.

Thanks!
-John
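
P.S. For anyone following along, the interface shape all of this builds on
(a sketch based on the vote thread's description; the exact signatures live
on the KIP page):

package org.apache.kafka.streams.processor.api;

public interface Processor<KIn, VIn, KOut, VOut> {
    // The context carries the output types, so forwards can be checked at
    // compile time.
    default void init(final ProcessorContext<KOut, VOut> context) {}
    void process(KIn key, VIn value);
    default void close() {}
}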

On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang  wrote:

> Hi John,
>
> Just a wild thought about Transformer: now with the new Processor<KIn,
> KOut, VIn, VOut>#init(ProcessorContext), do we still need a
> Transformer (and even ValueTransformer / ValueTransformerWithKey)?
>
> What if:
>
> * We just make KStream#transform take a ProcessorSupplier as well, and
> inside `process()` we check that at most one `context.forward()` is called,
> and then take it as the return value.
> * We would still use ValueTransformer for KStream#transformValue, or we can
> also use a `ProcessorSupplier` where we allow at most one
> `context.forward()` AND we ignore whatever is passed in as the key, but
> just use the original key.
>
>
> Guozhang
>
>
> On Tue, Jul 16, 2019 at 9:03 AM John Roesler  wrote:
>
> > Hi again, all,
> >
> > I have started the voting thread. Please cast your votes (or voice
> > your objections)! The vote will remain open at least 72 hours. Once it
> > closes, I can send the PR pretty quickly.
> >
> > Thanks for all your help ironing out the details on this feature.
> > -John
> >
> > On Mon, Jul 15, 2019 at 5:09 PM John Roesler  wrote:
> > >
> > > Hey all,
> > >
> > > It sounds like there's general agreement now on this KIP, so I updated
> > > the KIP to fit in with Guozhang's overall proposed package structure.
> > > Specifically, the proposed name for the new Processor interface is
> > > "org.apache.kafka.streams.processor.api.Processor".
> > >
> > > If there are no objections, then I plan to start the vote tomorrow!
> > >
> > > Thanks, all, for your contributions.
> > > -John
> > >
> > > On Thu, Jul 11, 2019 at 1:50 PM Matthias J. Sax  >
> > wrote:
> > > >
> > > > Side remark:
> > > >
> > > > > Now that "flat transform" is a specific
> > > > >> part of the API it seems okay to steer folks in that direction (to
> > never
> > > > >> use context.forward in a transformer), but it should be called out
> > > > >> explicitly in javadocs.  Currently Transformer (which is used for
> > both
> > > > >> transform() and flatTransform() ) doesn't really call out the
> > ambiguity:
> > > >
> > > > Would you want to do a PR to address this? We are always eager to
> > > > improve the JavaDocs!
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 7/7/19 11:26 AM, Paul Whalen wrote:
> > > > > First of all, +1 on the whole idea, my team has run into
> (admittedly
> > minor,
> > > > > but definitely annoying) issues because of the weaker typing.
> We're
> > heavy
> > > > > users of the PAPI and have Processors that, while not hundreds of
> > lines
> > > > > long, are certainly quite hefty and call context.forward() in many
> > places.
> > > > >
> > > > > After reading the KIP and discussion a few times, I've convinced
> > myself
> > > > > that any initial concerns I had aren't really concerns at all
> (state
> > store
> > > > > types, for one).  One thing I will mention:  changing *Transformer*
> > to have
> > > > > ProcessorContext<Void, Void> gave me pause, because I have code
> that
> > does
> > > > > context.forward in transformers.  Now that "flat transform" is a
> > specific
> > > > > part of the API it seems okay to steer folks in that direction (to
> > never
> > > > > use context.forward in a transformer), but it should be called out
> > > > > explicitly in javadocs.  Currently Transformer (which is used for
> > both
> > > > > transform() and flatTransform() ) doesn't really call out the
> > ambiguity:
> > > > >
> >
> https://github.com/apache/kafka/blob/ca641b3e2e48c14ff308181c775775408f5f35f7/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java#L75-L77
> > ,
> > > > > and for migrating users (from before flatTransform) it could be
> > confusing.
> > > > >
> > > > > Side note, I'd like to plug KIP-401 (there is a discussion thread
> > and a
> > > > > voting thread) which also relates to using the PAPI.  

[jira] [Created] (KAFKA-8709) hard fail on "Unknown group metadata version"

2019-07-24 Thread Christian Becker (JIRA)
Christian Becker created KAFKA-8709:
---

 Summary: hard fail on "Unknown group metadata version"
 Key: KAFKA-8709
 URL: https://issues.apache.org/jira/browse/KAFKA-8709
 Project: Kafka
  Issue Type: Improvement
Reporter: Christian Becker


We attempted to do an update from 2.2 to 2.3 and then a rollback was done after 
{{inter.broker.protocol}} was changed. (We know this shouldn't be done, but it 
happened).

After downgrading to 2.2 again, some {{__consumer-offsets}} partitions fail to 
load with the message {{Unknown group metadata version 3}}. Subsequently the 
broker continues its startup and the consumer groups won't exist. So the 
consumers are starting at their configured OLDEST or NEWEST position and start 
committing their offsets.

However, on subsequent restarts of the brokers, the {{Unknown group metadata 
version}} exception remains, and so the offset resets happen over and over 
again.

 

In order to prevent this, I'd suggest an updated flow when loading the offsets 
(sketched in the pseudocode below):
- the loading should continue reading the __consumer-offsets partition to see 
if a subsequent, readable offset has been committed
- if no "valid" offset can be found, throw the existing exception to let the 
operator know about the situation
- if a valid offset can be found, continue as normal
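
A pseudocode sketch of that flow (the method and variable names here are 
illustrative, not Kafka's actual internals):
{code:java}
KafkaException firstError = null;
boolean foundValidOffset = false;
for (Record record : consumerOffsetsPartition) {
    try {
        applyGroupMetadata(record);   // hypothetical helper
        foundValidOffset = true;
    } catch (KafkaException e) {
        if (firstError == null) firstError = e;  // remember it, keep scanning
    }
}
if (!foundValidOffset && firstError != null) {
    throw firstError;  // nothing readable followed: alert the operator
}
{code}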

 

This would cause the following sequence of events:
1. corrupted offsets are written
2. broker restart
2a. broker loads offset partition
2b. {{KafkaException}} when loading the offset partition
2c. no "valid" offset is found after the "corrupt" record
2d. offsets reset
3. consumergroups are recreated and "valid" offsets are appended
4. broker restart
4a. broker loads offset partition
4b.  {{KafkaException}} when loading the offset partition
4c. "valid" offset is found after the "corrupted" ones
5. consumergroups still have their latest offset

It is a special case that this happened after some human error, but this 
also poses a danger for situations where the offsets might be corrupted for 
some unrelated reason. Losing the offsets is a very serious situation, and there 
should be safeguards against it, especially when some offsets might be 
recoverable. With this improvement, the offsets would still be lost once, but 
the broker would be able to recover automatically over time, and after compaction 
the corrupted records would be gone. (In our case this caused serious confusion, 
as we lost the offsets multiple times: the error message {{Error loading 
offsets from}} implies that the corrupted data is deleted and the situation 
therefore recovered, whereas in reality it continues to be an issue until the 
corrupt data is gone once and for all, which might take a long time.)

In our case we seem to have evicted the broken records by temporarily setting 
the segment time to a very low value and deactivating compaction:
{code:java}
/opt/kafka/bin/kafka-topics.sh --alter --config segment.ms=90 --topic 
__consumer_offsets --zookeeper localhost:2181
/opt/kafka/bin/kafka-topics.sh --alter --config cleanup.policy=delete --topic 
__consumer_offsets --zookeeper localhost:2181
< wait for the cleaner to clean up >
/opt/kafka/bin/kafka-topics.sh --alter --config segment.ms=60480 --topic 
__consumer_offsets --zookeeper localhost:2181
/opt/kafka/bin/kafka-topics.sh --alter --config cleanup.policy=compact --topic 
__consumer_offsets --zookeeper localhost:2181{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: KIP-382 + Kafka Streams Question

2019-07-24 Thread Adam Bellemare
Hi Ryanne

> Lemme know if I haven't answered this clearly.

Nope, this was very helpful. Thank you!

> A single "stream" can come from multiple input topics
I overlooked that - I was thinking of simply using the
StreamsBuilder.table() functionality instead, but that function doesn't
support a Collection of topics.

Since the topics would be copartitioned by definition, wouldn't the event
dispatcher in PartitionGroup (priorityQueue and streamtime ordering) ensure
that the topics are processed in incrementing streamtime order?

Alternately, I suppose this could be a case where it is a good idea to have
the timestamp of the event within the event's value payload, such that:
StreamsBuilder.stream(Set("userEntity", "primary.userEntity"))
.groupByKey()
.reduce()
can allow us to materialize the latest state for a given key.
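
For reference, that shape in the Java DSL would look roughly like this (the
topic names are the ones from this thread; the String serdes and the
latest-wins reducer are placeholders):

import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> latest = builder
        .stream(Arrays.asList("userEntity", "primary.userEntity"),
                Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        // "latest wins" by processing order; see the ordering caveat below
        .reduce((oldValue, newValue) -> newValue,
                Materialized.as("user-latest-state"));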

Thanks Ryanne, this has been a very helpful discussion for me. We are
prototyping the usage of MM2 internally at the moment in anticipation of
its release in 2.4 and want to ensure we have our replication + recovery
strategies sorted out.

Adam

On Tue, Jul 23, 2019 at 7:26 PM Ryanne Dolan  wrote:

> Adam, I think we are converging :)
>
> > "userEntity"...where I only want the latest emailAddress (basic
> materialization) to send an email on account password update.
>
> Yes, you want all "userEntity" data on both clusters. Each cluster will
> have "userEntity" and the remote counterpart
> "secondary/primary.userEntity", as in my example (1). The send-email part
> can run on either cluster (but not both, to avoid duplicate emails),
> subscribing to both "userEntity" and "secondary/primary.userEntity". For
> DR, you can migrate this app between clusters via offset translation and
> the kafka-streams-application-reset tool.
>
> Then, you want a materialize-email-table app running in _both_ clusters,
> so that the latest emails are readily available in RocksDB from either
> cluster. This also subscribes to both "userEntity" and
> "secondary/primary.userEntity" s.t. records originating from either cluster
> are processed.
>
> (Equivalently, send-email and materialize-email-table could be parts of
> the same Streams app, just configured differently, e.g. with send-email
> short-circuited in all but one cluster.)
>
> Under normal operation, your userEntity events are sent to the primary
> cluster (topic: userEntity), processed there via materialize-email-table
> and send-email, and replicated to the secondary cluster (topic:
> primary.userEntity) via MM2. When primary goes down, your producers
> (whatever is sending userEntity events) can failover to the secondary
> cluster (topic: userEntity). This can happen in real-time, i.e. as soon as
> the producer detects an outage or via a load balancer with healthchecks
> etc. So under normal operation, you have all userEntity events in both
> clusters, and both clusters are available for producing to.
>
> N.B. this is not dual-ingest, which would require you always produce
> directly to both clusters. It's active/active, b/c you can produce to
> either cluster at any point in time, and the effect is the same.
>
> > Q1) Where does the producer write its data to if the primary cluster is
> dead?
>
> With active/active like this, you can send to either cluster.
>
> > Q2) How does a Kafka Streams application materialize state from two
> topics?
>
> A Streams app can subscribe to multiple topics. A single "stream" can come
> from multiple input topics (see:
> https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.Collection-
> )
>
> Likewise, a KTable can be materialized from multiple source topics -- in
> this case, userEntity, primary.userEntity and/or secondary.userEntity. You
> can think of these as parts of a "virtual topic", as you described.
>
> > (loaded question, I know)
>
> There is one caveat I can think of: there is no ordering guarantee across
> different topics in the same stream, so materialization could be
> inconsistent between the two clusters if, say, the same user's email was
> changed to different values at the same millisecond in both clusters. This
> may or may not be a problem.
>
> > Q3) ... recommendations on how to handle replication/producing of
> entity-data (ie: userEntity) across multiple clusters...
>
> Lemme know if I haven't answered this clearly.
>
> Ryanne
>
> On Tue, Jul 23, 2019 at 1:03 PM Adam Bellemare 
> wrote:
>
>> Hi Ryanne
>>
>> Thanks for the clarifications! Here is one of my own, as I think it's the
>> biggest stumbling block in my description:
>>
>> *> What is "table" exactly? I am interpreting this as a KTable changelog
>> topic*
>> "table" is not a KTable changelog topic, but simply entity data that is
>> to be materialized into a table - for example, relational data captured
>> from Kafka Connect. I should have named this "stateful-data" or something
>> less ambiguous and provided an explicit definition. Note that non-KStreams
>> applications will also regularly use 

Re: Kafka consumer is not reading some partition

2019-07-24 Thread Omar Al-Safi
Hi,

Maybe it would be helpful if you could attach the logs from your consumers
covering the time when you notice they have stopped consuming from some of your partitions.

On Tue, 23 Jul 2019 at 16:56, Sergey Fedorov  wrote:

> Hello. I was using Kafka 2.1.1 and facing a problem where our consumers
> sometimes intermittently stop consuming from one or two of the partitions. My
> config
>


Build failed in Jenkins: kafka-trunk-jdk8 #3809

2019-07-24 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-8696: clean up Sum/Count/Total metrics (#7057)

--
[...truncated 6.45 MB...]

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaNullFieldToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaNullFieldToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaFieldConversion STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaFieldConversion PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaNullFieldToString STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaNullFieldToString PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessNullValueToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessNullValueToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessNullValueToString STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessNullValueToString PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaDateToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaDateToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaIdentity STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaIdentity PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToDate STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToDate PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToTime STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToTime PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToUnix STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessTimestampToUnix PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigMissingFormat STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigMissingFormat PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessNullValueToDate STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessNullValueToDate PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessNullValueToTime STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessNullValueToTime PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessNullValueToUnix STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessNullValueToUnix PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaNullFieldToDate STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaNullFieldToDate PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaNullFieldToTime STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaNullFieldToTime PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaNullFieldToUnix STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaNullFieldToUnix PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigNoTargetType STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigNoTargetType PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaStringToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaStringToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimeToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimeToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessUnixToTimestamp STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testSchemalessUnixToTimestamp PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToString STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testWithSchemaTimestampToString PASSED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigInvalidFormat STARTED

org.apache.kafka.connect.transforms.TimestampConverterTest > 
testConfigInvalidFormat PASSED

org.apache.kafka.connect.transforms.HoistFieldTest > withSchema STARTED

org.apache.kafka.connect.transforms.HoistFieldTest > withSchema PASSED


[jira] [Created] (KAFKA-8708) Zookeeper Session expired either before or while waiting for connection

2019-07-24 Thread Chethan Bheemaiah (JIRA)
Chethan Bheemaiah created KAFKA-8708:


 Summary: Zookeeper Session expired either before or while waiting 
for connection
 Key: KAFKA-8708
 URL: https://issues.apache.org/jira/browse/KAFKA-8708
 Project: Kafka
  Issue Type: Bug
  Components: zkclient
Affects Versions: 2.0.1
Reporter: Chethan Bheemaiah


Recently we encountered an issue in one of our Kafka clusters. One of the 
nodes went down and was not rejoining the cluster on restart. We 
observed "Session expired" error messages in server.log.

Below is one such message:
ERROR kafka.common.ZkNodeChangeNotificationListener: Error while processing 
notification change for path = /config/changes
kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either before 
or while waiting for connection
at 
kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:238)
at 
kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
at 
kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at 
kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:226)
at 
kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:220)
at 
kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
at 
kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at 
kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:219)
at 
kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1510)
at 
kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1486)
at kafka.zk.KafkaZkClient.getChildren(KafkaZkClient.scala:585)
at 
kafka.common.ZkNodeChangeNotificationListener.kafka$common$ZkNodeChangeNotificationListener$$processNotifications(ZkNodeChangeNotificationListener.scala:82)
at 
kafka.common.ZkNodeChangeNotificationListener$ChangeNotification.process(ZkNodeChangeNotificationListener.scala:119)
at 
kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:145)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8707) Zookeeper Session expired either before or while waiting for connection

2019-07-24 Thread Chethan Bheemaiah (JIRA)
Chethan Bheemaiah created KAFKA-8707:


 Summary: Zookeeper Session expired either before or while waiting 
for connection
 Key: KAFKA-8707
 URL: https://issues.apache.org/jira/browse/KAFKA-8707
 Project: Kafka
  Issue Type: Bug
  Components: zkclient
Affects Versions: 2.0.1
Reporter: Chethan Bheemaiah


Recently we encountered an issue in one of our Kafka clusters. One of the 
nodes went down and was not rejoining the cluster on restart. We 
observed "Session expired" error messages in server.log.

Below is one such message:
ERROR kafka.common.ZkNodeChangeNotificationListener: Error while processing 
notification change for path = /config/changes
kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either before 
or while waiting for connection
at 
kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:238)
at 
kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
at 
kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at 
kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:226)
at 
kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:220)
at 
kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
at 
kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at 
kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:219)
at 
kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1510)
at 
kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1486)
at kafka.zk.KafkaZkClient.getChildren(KafkaZkClient.scala:585)
at 
kafka.common.ZkNodeChangeNotificationListener.kafka$common$ZkNodeChangeNotificationListener$$processNotifications(ZkNodeChangeNotificationListener.scala:82)
at 
kafka.common.ZkNodeChangeNotificationListener$ChangeNotification.process(ZkNodeChangeNotificationListener.scala:119)
at 
kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:145)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)