Re: Using Custom Partitioner in Streams

2017-12-18 Thread Sameer Kumar
I understand it now. Even if we are able to attach custom partitioning, the
data will still travel from the stream nodes to the broker via the join topic, so
the network transfer will still be there.

-Sameer.

On Tue, Dec 19, 2017 at 1:17 AM, Matthias J. Sax 
wrote:

> > need to map the keys, modify them
> >> and then do a join.
>
> This will always trigger a rebalance. There is no API atm to tell KS
> that partitioning is preserved.
>
> Custom partitioner won't help for your case as far as I understand it.
>
>
> -Matthias
>
> On 12/17/17 9:48 PM, Sameer Kumar wrote:
> > Actually, I am doing a join after map. I need to map the keys, modify them
> > and then do a join.
> >
> > I was thinking of always passing a partition key based on which
> > partitioning happens.
> > Step-by-step flow is:
> > 1. Data is already partitioned by userid.
> > 2. I do a map to join impressions tied to a user with view notifications.
> > 3. I count valid impressions across different aggregations (i.e. across
> > different dimension groups).
> >
> > Thanks,
> > -Sameer.
> >
> > On Mon, Dec 18, 2017 at 1:37 AM, Matthias J. Sax 
> > wrote:
> >
> >> Two comments:
> >>
> >> 1) As long as you don't do an aggregation/join after a map(), there
> >> will be no repartitioning. Streams does repartitioning "lazily", i.e. only
> >> if it's required. As long as you only chain filter/map etc., no
> >> repartitioning will be done.
> >>
> >> 2) Can't you use mapValues() instead of map()? If you use map() only to
> >> read the key and only modify the value (-> "data is still local"), a
> >> custom partitioner won't help. Also, we are improving this in the upcoming
> >> version 1.1, which allows read access to the key in mapValues() (cf. KIP-149
> >> for details).
> >>
> >> Hope this helps.
> >>
> >>
> >> -Matthias
> >>
> >> On 12/17/17 8:20 AM, Sameer Kumar wrote:
> >>> I have multiple map and filter phases in my application DAG and, though I am
> >>> generating different keys at different points, the data is still local.
> >>> Re-partitioning for me here is adding unnecessary network shuffling, and I
> >>> want to minimize it.
> >>>
> >>> -Sameer.
> >>>
> >>> On Friday, December 15, 2017, Matthias J. Sax 
> >> wrote:
> >>>
>  It's not recommended to write a custom partitioner because it's pretty
>  difficult to write a correct one. There are many dependencies and you
>  need deep knowledge of Kafka Streams internals to get it right.
>  Otherwise, your custom partitioner breaks Kafka Streams.
> 
>  That is the reason why it's not documented...
> 
>  Not sure, though, what you are trying to achieve in the first place. What do you
>  mean by
> 
> > I want to make sure that during map phase, the keys
> >> produced adhere to the customized partitioner.
> 
>  Maybe you achieve what you want differently.
> 
> 
>  -Matthias
> 
>  On 12/15/17 1:19 AM, Sameer Kumar wrote:
> > Hi,
> >
> > I want to use the custom partitioner in Streams, but I couldn't find the
> >> same
>  in
> > the documentation. I want to make sure that during map phase, the
> keys
> > produced adhere to the customized partitioner.
> >
> > -Sameer.
> >
> 
> 
> >>>
> >>
> >>
> >
>
>
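
For anyone following along, a minimal sketch of the map() vs. mapValues()
difference discussed above. The topic name, the Impression type and the
campaignKey() helper are made up for illustration (serdes/config omitted) and
are not taken from Sameer's actual topology:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class RepartitionSketch {

    // Hypothetical value type and key extractor, for illustration only.
    static class Impression { String userId; String campaignId; }
    static String campaignKey(Impression imp) { return imp.campaignId; }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Impression> impressions = builder.stream("impressions");

        // map() may change the key, so Streams flags the stream for
        // repartitioning; a later join/aggregation inserts a repartition topic.
        KStream<String, Impression> rekeyed =
                impressions.map((userId, imp) -> KeyValue.pair(campaignKey(imp), imp));

        // mapValues() cannot change the key, so no repartition topic is
        // created even if a join/aggregation follows.
        KStream<String, String> campaigns =
                impressions.mapValues(imp -> imp.campaignId);
    }
}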


Re: Producer is blocking at second commitTransaction

2017-12-18 Thread Ted Yu
For the server log, is it possible to enable DEBUG logging?

Thanks

On Mon, Dec 18, 2017 at 4:35 PM, HKT  wrote:

> Thanks for reply.
>
> here is the client side log:
>
> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
> [Producer clientId=producer-1, transactionalId=hello] Transition from state
> READY to IN_TRANSACTION
> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
> [Producer clientId=producer-1, transactionalId=hello] Begin adding new
> partition test-0 to transaction
> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
> [Producer clientId=producer-1, transactionalId=hello] Transition from state
> IN_TRANSACTION to COMMITTING_TRANSACTION
> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
> [Producer clientId=producer-1, transactionalId=hello] Enqueuing
> transactional request (type=AddPartitionsToTxnRequest,
> transactionalId=hello, producerId=0, producerEpoch=0, partitions=[test-0])
> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
> [Producer clientId=producer-1, transactionalId=hello] Enqueuing
> transactional request (type=EndTxnRequest, transactionalId=hello,
> producerId=0, producerEpoch=0, result=COMMIT)
> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
> transactionalId=hello] Sending transactional request
> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack: null)
> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
> transactionalId=hello] Enqueuing transactional request
> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
> producerEpoch=0, partitions=[test-0])
> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
> transactionalId=hello] Sending transactional request
> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack: null)
> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
> transactionalId=hello] Enqueuing transactional request
> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
> producerEpoch=0, partitions=[test-0])
> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
> transactionalId=hello] Sending transactional request
> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack: null)
> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
> transactionalId=hello] Enqueuing transactional request
> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
> producerEpoch=0, partitions=[test-0])
> ... // duplicate messages
> 2017-12-19 08:26:14 [kafka-producer-network-thread | producer-1] DEBUG
> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
> transactionalId=hello] Enqueuing transactional request
> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
> producerEpoch=0, partitions=[test-0])
> 2017-12-19 08:26:14 [kafka-producer-network-thread | producer-1] DEBUG
> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
> transactionalId=hello] Sending transactional request
> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack: null)
>
> and the server.log:
> [2017-12-19 08:26:08,408] INFO Completed load of log __transaction_state-9
> with 1 log segments, log start offset 0 and log end offset 0 in 15 ms
> (kafka.log.Log)
> [2017-12-19 08:26:08,408] INFO Created log for partition
> [__transaction_state,9] in D:\tmp\kafka-logs with properties
> {compression.type -> uncompressed, message.format.version -> 1.0-IV0,
> file.delete.delay.ms -> 6, max.message.bytes -> 112,
> min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime,
> min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false,
> min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096,
> unclean.leader.election.enable -> false, retention.bytes -> -1,
> delete.retention.ms -> 8640, cleanup.policy -> compact, flush.ms ->
> 9223372036854775807, segment.ms -> 60480, segment.bytes -> 104857600,
> retention.ms -> 60480, message.timestamp.difference.max.ms ->
> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages ->
> 9223372036854775807}. (kafka.log.LogManager)
> [2017-12-19 08:26:08,408] INFO [Partition __transaction_sta
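
In case it helps, one common way to get DEBUG output on the broker is via
config/log4j.properties (a sketch only; the exact logger names you need may
differ, and the broker has to be restarted for the change to take effect):

# config/log4j.properties on the broker -- raise Kafka loggers to DEBUG
log4j.logger.kafka=DEBUG
# the transaction coordinator logs under this package
log4j.logger.kafka.coordinator.transaction=DEBUG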

Re: deduplication strategy for Kafka Streams DSL

2017-12-18 Thread Matthias J. Sax
The Jira priority just increased with your report!

Of course, we are always happy about pull request :D


-Matthias

On 12/18/17 1:27 PM, Artur Mrozowski wrote:
> Yes, sounds like it. We ran into problems at exactly the same spot using BEAM
> as well, although in that case it resulted in data loss.
> 
> Thank you Matthias. Doesn't sound like it's going to be resolved any time
> soon, does it?
> 
> /Artur
> 
> On Mon, Dec 18, 2017 at 8:11 PM, Matthias J. Sax 
> wrote:
> 
>> I think you are hitting: https://issues.apache.org/jira/browse/KAFKA-4609
>>
>>
>> -Matthias
>>
>> On 12/18/17 1:52 AM, Artur Mrozowski wrote:
>>> Hi Bill,
>>>
> >>>  I am actually referring to duplicates as completely identical records. I
> >>> can observe it when I convert the result of a left join between KTables to
> >>> a stream. The resulting stream will often contain identical messages.
>>> For example we have
>>>
>>> KTable left
>>> {"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber":
>>> "3_0", "claimtime": 708.521153490306}
>>>
>>> and KTable  right
>>>
>>> {"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0",
>>> "payment": 847015.1437781961}
>>>
> >>> When I left-join these two objects, the result in the state store will be
> >>> an object containing two ArrayLists, left and right, like this
>>>
>>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime"
>> :"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"
>> 0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
>> paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>>>
> >>> But I want to continue processing the results by using groupBy and
> >>> aggregate, so I convert the result of the left join to a stream. Now the
> >>> resulting repartition and changelog topics will, most of the time, contain
> >>> two identical messages, like this
>>>
>>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime"
>> :"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"
>> 0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
>> paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime"
>> :"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"
>> 0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
>> paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>>>
>>>
>>> and hence passing duplicates to the next operation.
>>>
>>> My question is what is the best practice to avoid that?
>>>
>>> https://github.com/afuyo/KStreamsDemo/blob/master/src/
>> main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423
>>>
>>> Best regards
>>> Artur
>>>
>>>
>>> On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck  wrote:
>>>
 Hi Artur,

 The most direct way for deduplication (I'm using the term deduplication
>> to
 mean records with the same key, but not necessarily the same value,
>> where
 later records are considered) is to set the  CACHE_MAX_BYTES_BUFFERING_
 CONFIG
 setting to a value greater than zero.

 Your other option is to use the PAPI and by writing your own logic in
 conjunction with a state store determine what constitutes a duplicate
>> and
 when to emit a record.  You could take the same approach in the DSL
>> layer
 using a Transformer.

 HTH.

 Bill

 On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski 
>> wrote:

> Hi
> I run an app where I transform KTable to stream and then I groupBy and
> aggregate and capture the results in KTable again. That generates many
> duplicates.
>
> I have played with exactly-once semantics, which seems to reduce duplicates
> for records that should be unique. But I still get duplicates on keys that
> have two or more records.
>
> I could not reproduce it on a small number of records, so I disabled caching
> by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough, I got loads
> of duplicates, even those previously eliminated by exactly-once semantics.
> Now I have a hard time enabling it again on Confluent 3.3.
>
> But, generally, what is the best deduplication strategy for Kafka Streams?
>
> Artur
>

>>>
>>
>>
> 
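
As a follow-up to Bill's Transformer suggestion quoted above, a rough sketch of
a state-store-backed deduplicator against the Kafka 1.0.x API. The topic and
store names and the String key/value types are placeholders (Artur's records
are richer objects), so this only shows the shape of the approach:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class DedupSketch {

    // Drops a record if the previous value seen for the same key was identical.
    static class DedupTransformer
            implements Transformer<String, String, KeyValue<String, String>> {

        private KeyValueStore<String, String> seen;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            seen = (KeyValueStore<String, String>) context.getStateStore("dedup-store");
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            String last = seen.get(key);
            if (value != null && value.equals(last)) {
                return null;                   // identical to the last record for this key -> drop
            }
            seen.put(key, value);
            return KeyValue.pair(key, value);  // new or changed value -> forward
        }

        @Override
        public KeyValue<String, String> punctuate(long timestamp) {
            return null;                       // deprecated no-op, still required by the 1.0.x interface
        }

        @Override
        public void close() { }
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // The store the transformer uses to remember the last value per key.
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("dedup-store"),
                Serdes.String(), Serdes.String()));

        KStream<String, String> joined = builder.stream("joined-topic");
        joined.transform(() -> new DedupTransformer(), "dedup-store")
              .to("deduped-topic");
    }
}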





Re: Mistakes in documentation?

2017-12-18 Thread Matthias J. Sax
Thanks a lot!

On 12/18/17 12:46 PM, Dmitry Minkovsky wrote:
> You're welcome. Another one I found today
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html
> 
>> groupedStream.windowedBy(TimeUnit.MINUTES.toMillis(5))
> 
> should be
> 
>> groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
> 
> in two spots.
> 
> On Mon, Dec 18, 2017 at 2:29 PM, Matthias J. Sax 
> wrote:
> 
>> Thanks for reporting this!
>>
>> We will fix it.
>>
>> -Matthias
>>
>> On 12/17/17 7:05 PM, Philippe Derome wrote:
>>> I agree with Dmitry's first comment; it really looks like the paragraph he
>>> points to under "Table" was pasted without editing from the previous one,
>>> which pertained to "KStream".
>>>
>>> On Sun, Dec 17, 2017 at 5:31 PM, Dmitry Minkovsky 
>>> wrote:
>>>
 On https://docs.confluent.io/current/streams/developer-
>> guide/dsl-api.html
 for version 4.0.0:

 Under "Table", currently:

> In the case of a KStream, the local KStream instance of every
>> application
 instance will be populated with data from only a subset of the
>> partitions
 of the input topic. Collectively, across all application instances, all
 input topic partitions are read and processed.

 Shouldn't it be KTable instead of KStream?

 Also, under "GlobalKTable", current:

> In the case of a GlobalKTable, the local GlobalKTable instance of every
 application instance will be populated with data from only a subset of
>> the
 partitions of the input topic. Collectively, across all application
 instances, all input topic partitions are read and processed.

 Shouldn't it say that all partitions are consumed by all instances of
>> the
 application?


 Best,
 Dmitry

>>>
>>
>>
> 
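
For anyone applying the fix, a minimal sketch of the corrected call in context
(topic name and String types are placeholders):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowedBySketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");
        KGroupedStream<String, String> groupedStream = input.groupByKey();

        // windowedBy() takes a Windows instance such as TimeWindows, not a raw long.
        KTable<Windowed<String>, Long> counts =
                groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
                             .count();
    }
}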





Re: kafka 1.0.1?

2017-12-18 Thread Ismael Juma
Hi Maciek,

I expect that 1.0.1 will be released some time in January.

Ismael

On Mon, Dec 18, 2017 at 10:42 AM, Maciek Próchniak  wrote:

> Hello,
>
> are there plans to release version 1.0.1?
>
> We are affected by https://issues.apache.org/jira/browse/KAFKA-6185 and
> cannot upgrade at the moment to 1.0.0 (on our UAT env we applied the patch
> and problems are gone, but we are reluctant to do this on prod...),
>
> we'd like to know if there are plans to do a bugfix release in the foreseeable
> future,
>
>
> thanks,
>
> maciek
>
>


Re: deduplication strategy for Kafka Streams DSL

2017-12-18 Thread Artur Mrozowski
Yes, sounds like it. We ran into problems at exactly the same spot using BEAM
as well, although in that case it resulted in data loss.

Thank you Matthias. Doesn't sound like it's going to be resolved any time
soon, does it?

/Artur

On Mon, Dec 18, 2017 at 8:11 PM, Matthias J. Sax 
wrote:

> I think you are hitting: https://issues.apache.org/jira/browse/KAFKA-4609
>
>
> -Matthias
>
> On 12/18/17 1:52 AM, Artur Mrozowski wrote:
> > Hi Bill,
> >
> >  I am actually referring to duplicates as completely identical records. I
> > can observe it when I convert the result of a left join between KTables to
> > a stream. The resulting stream will often contain identical messages.
> > For example we have
> >
> > KTable left
> > {"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber":
> > "3_0", "claimtime": 708.521153490306}
> >
> > and KTable  right
> >
> > {"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0",
> > "payment": 847015.1437781961}
> >
> > When I left-join these two objects, the result in the state store will be
> > an object containing two ArrayLists, left and right, like this
> >
> > {"claimList":{"lst":[{"claimnumber":"3_0","claimtime"
> :"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"
> 0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
> paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> >
> > But I want to continue processing the results by using groupBy and
> > aggregate, so I convert the result of the left join to a stream. Now the
> > resulting repartition and changelog topics will, most of the time, contain
> > two identical messages, like this
> >
> > {"claimList":{"lst":[{"claimnumber":"3_0","claimtime"
> :"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"
> 0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
> paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> > {"claimList":{"lst":[{"claimnumber":"3_0","claimtime"
> :"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"
> 0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
> paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> >
> >
> > and hence passing duplicates to the next operation.
> >
> > My question is what is the best practice to avoid that?
> >
> > https://github.com/afuyo/KStreamsDemo/blob/master/src/
> main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423
> >
> > Best regards
> > Artur
> >
> >
> > On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck  wrote:
> >
> >> Hi Artur,
> >>
> >> The most direct way for deduplication (I'm using the term deduplication
> to
> >> mean records with the same key, but not necessarily the same value,
> where
> >> later records are considered) is to set the  CACHE_MAX_BYTES_BUFFERING_
> >> CONFIG
> >> setting to a value greater than zero.
> >>
> >> Your other option is to use the PAPI and by writing your own logic in
> >> conjunction with a state store determine what constitutes a duplicate
> and
> >> when to emit a record.  You could take the same approach in the DSL
> layer
> >> using a Transformer.
> >>
> >> HTH.
> >>
> >> Bill
> >>
> >> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski 
> wrote:
> >>
> >>> Hi
> >>> I run an app where I transform KTable to stream and then I groupBy and
> >>> aggregate and capture the results in KTable again. That generates many
> >>> duplicates.
> >>>
> >>> I have played with exactly-once semantics, which seems to reduce duplicates
> >>> for records that should be unique. But I still get duplicates on keys that
> >>> have two or more records.
> >>>
> >>> I could not reproduce it on a small number of records, so I disabled caching
> >>> by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough, I got loads
> >>> of duplicates, even those previously eliminated by exactly-once semantics.
> >>> Now I have a hard time enabling it again on Confluent 3.3.
> >>>
> >>> But, generally, what is the best deduplication strategy for Kafka Streams?
> >>>
> >>> Artur
> >>>
> >>
> >
>
>


Re: Mistakes in documentation?

2017-12-18 Thread Dmitry Minkovsky
You're welcome. Another one I found today
https://docs.confluent.io/current/streams/developer-guide/dsl-api.html

> groupedStream.windowedBy(TimeUnit.MINUTES.toMillis(5))

should be

> groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))

in two spots.

On Mon, Dec 18, 2017 at 2:29 PM, Matthias J. Sax 
wrote:

> Thanks for reporting this!
>
> We will fix it.
>
> -Matthias
>
> On 12/17/17 7:05 PM, Philippe Derome wrote:
> > I agree with Dmitry's first comment; it really looks like the paragraph he
> > points to under "Table" was pasted without editing from the previous one,
> > which pertained to "KStream".
> >
> > On Sun, Dec 17, 2017 at 5:31 PM, Dmitry Minkovsky 
> > wrote:
> >
> >> On https://docs.confluent.io/current/streams/developer-
> guide/dsl-api.html
> >> for version 4.0.0:
> >>
> >> Under "Table", currently:
> >>
> >>> In the case of a KStream, the local KStream instance of every
> application
> >> instance will be populated with data from only a subset of the
> partitions
> >> of the input topic. Collectively, across all application instances, all
> >> input topic partitions are read and processed.
> >>
> >> Shouldn't it be KTable instead of KStream?
> >>
> >> Also, under "GlobalKTable", current:
> >>
> >>> In the case of a GlobalKTable, the local GlobalKTable instance of every
> >> application instance will be populated with data from only a subset of
> the
> >> partitions of the input topic. Collectively, across all application
> >> instances, all input topic partitions are read and processed.
> >>
> >> Shouldn't it say that all partitions are consumed by all instances of
> the
> >> application?
> >>
> >>
> >> Best,
> >> Dmitry
> >>
> >
>
>


Re: Using Custom Partitioner in Streams

2017-12-18 Thread Matthias J. Sax
> need to map the keys, modify them
>> and then do a join.

This will always trigger a rebalance. There is no API atm to tell KS
that partitioning is preserved.

Custom partitioner won't help for your case as far as I understand it.


-Matthias

On 12/17/17 9:48 PM, Sameer Kumar wrote:
> Actually, I am doing a join after map. I need to map the keys, modify them
> and then do a join.
> 
> I was thinking of always passing a partition key based on which
> partitioning happens.
> Step-by-step flow is:
> 1. Data is already partitioned by userid.
> 2. I do a map to join impressions tied to a user with view notifications.
> 3. I count valid impressions across different aggregations (i.e. across
> different dimension groups).
> 
> Thanks,
> -Sameer.
> 
> On Mon, Dec 18, 2017 at 1:37 AM, Matthias J. Sax 
> wrote:
> 
>> Two comments:
>>
>> 1) As long as you don't do an aggregation/join after a map(), there
>> will be no repartitioning. Streams does repartitioning "lazily", i.e. only
>> if it's required. As long as you only chain filter/map etc., no
>> repartitioning will be done.
>>
>> 2) Can't you use mapValues() instead of map()? If you use map() only to
>> read the key and only modify the value (-> "data is still local"), a
>> custom partitioner won't help. Also, we are improving this in the upcoming
>> version 1.1, which allows read access to the key in mapValues() (cf. KIP-149
>> for details).
>>
>> Hope this helps.
>>
>>
>> -Matthias
>>
>> On 12/17/17 8:20 AM, Sameer Kumar wrote:
>>> I have multiple map and filter phases in my application DAG and, though I am
>>> generating different keys at different points, the data is still local.
>>> Re-partitioning for me here is adding unnecessary network shuffling, and I
>>> want to minimize it.
>>>
>>> -Sameer.
>>>
>>> On Friday, December 15, 2017, Matthias J. Sax 
>> wrote:
>>>
 It's not recommended to write a custom partitioner because it's pretty
 difficult to write a correct one. There are many dependencies and you
 need deep knowledge of Kafka Streams internals to get it right.
 Otherwise, your custom partitioner breaks Kafka Streams.

 That is the reason why it's not documented...

 Not sure, though, what you are trying to achieve in the first place. What do you
 mean by

> I want to make sure that during map phase, the keys
>> produced adhere to the customized partitioner.

 Maybe you achieve what you want differently.


 -Matthias

 On 12/15/17 1:19 AM, Sameer Kumar wrote:
> Hi,
>
> I want to use the custom partitioner in Streams, but I couldn't find the
>> same
 in
> the documentation. I want to make sure that during map phase, the keys
> produced adhere to the customized partitioner.
>
> -Sameer.
>


>>>
>>
>>
> 





Re: Mistakes in documentation?

2017-12-18 Thread Matthias J. Sax
Thanks for reporting this!

We will fix it.

-Matthias

On 12/17/17 7:05 PM, Philippe Derome wrote:
> I agree with Dmitry's first comment; it really looks like the paragraph he
> points to under "Table" was pasted without editing from the previous one,
> which pertained to "KStream".
> 
> On Sun, Dec 17, 2017 at 5:31 PM, Dmitry Minkovsky 
> wrote:
> 
>> On https://docs.confluent.io/current/streams/developer-guide/dsl-api.html
>> for version 4.0.0:
>>
>> Under "Table", currently:
>>
>>> In the case of a KStream, the local KStream instance of every application
>> instance will be populated with data from only a subset of the partitions
>> of the input topic. Collectively, across all application instances, all
>> input topic partitions are read and processed.
>>
>> Shouldn't it be KTable instead of KStream?
>>
>> Also, under "GlobalKTable", current:
>>
>>> In the case of a GlobalKTable, the local GlobalKTable instance of every
>> application instance will be populated with data from only a subset of the
>> partitions of the input topic. Collectively, across all application
>> instances, all input topic partitions are read and processed.
>>
>> Shouldn't it say that all partitions are consumed by all instances of the
>> application?
>>
>>
>> Best,
>> Dmitry
>>
> 





Re: deduplication strategy for Kafka Streams DSL

2017-12-18 Thread Matthias J. Sax
I think you are hitting: https://issues.apache.org/jira/browse/KAFKA-4609


-Matthias

On 12/18/17 1:52 AM, Artur Mrozowski wrote:
> Hi Bill,
> 
>  I am actually referring to duplicates as completely identical records. I
> can observe it when I convert the result of a left join between KTables to
> a stream. The resulting stream will often contain identical messages.
> For example we have
> 
> KTable left
> {"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber":
> "3_0", "claimtime": 708.521153490306}
> 
> and KTable  right
> 
> {"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0",
> "payment": 847015.1437781961}
> 
> When I left-join these two objects, the result in the state store will be an
> object containing two ArrayLists, left and right, like this
> 
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> 
> But I want to continue processing the results by using groupBy and
> aggregate, so I convert the result of the left join to a stream. Now the
> resulting repartition and changelog topics will, most of the time, contain two
> identical messages, like this
> 
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> 
> 
> and hence passing duplicates to the next operation.
> 
> My question is what is the best practice to avoid that?
> 
> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423
> 
> Best regards
> Artur
> 
> 
> On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck  wrote:
> 
>> Hi Artur,
>>
>> The most direct way for deduplication (I'm using the term deduplication to
>> mean records with the same key, but not necessarily the same value, where
>> later records are considered) is to set the  CACHE_MAX_BYTES_BUFFERING_
>> CONFIG
>> setting to a value greater than zero.
>>
>> Your other option is to use the PAPI and by writing your own logic in
>> conjunction with a state store determine what constitutes a duplicate and
>> when to emit a record.  You could take the same approach in the DSL layer
>> using a Transformer.
>>
>> HTH.
>>
>> Bill
>>
>> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski  wrote:
>>
>>> Hi
>>> I run an app where I transform KTable to stream and then I groupBy and
>>> aggregate and capture the results in KTable again. That generates many
>>> duplicates.
>>>
>>> I have played with exactly-once semantics, which seems to reduce duplicates
>>> for records that should be unique. But I still get duplicates on keys that
>>> have two or more records.
>>>
>>> I could not reproduce it on a small number of records, so I disabled caching
>>> by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough, I got loads
>>> of duplicates, even those previously eliminated by exactly-once semantics.
>>> Now I have a hard time enabling it again on Confluent 3.3.
>>>
>>> But, generally, what is the best deduplication strategy for Kafka Streams?
>>>
>>> Artur
>>>
>>
> 





Re: Single producer for stream or multiple producers

2017-12-18 Thread Matthias J. Sax
It's better to use a single producer.

Closing and recreating the producer only results in unnecessary overhead and does
not give you any advantages.


-Matthias

On 12/18/17 3:12 AM, Shubham Dhoble wrote:
> I am accessing Kafka through Java code. The use case is: I want to publish a
> stream of messages to two Kafka topics, which would be in a transaction.
> 
> My point of concern is whether it would be better to use one single global Kafka
> producer for all messages, or to create and delete a new KafkaProducer for
> every new message to publish?
> 
> What would be the pros and cons of producer creation and deletion for
> every request, and can a single producer handle all transactional calls?
> 
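
A minimal sketch of the single-producer approach for the two-topic transactional
use case described above. Topic names, serializers and the transactional.id are
placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SingleProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());
        props.setProperty("transactional.id", "my-app-tx");

        // One long-lived producer, reused for every message and transaction.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            for (int i = 0; i < 100; i++) {
                producer.beginTransaction();
                // Writes to both topics commit (or abort) atomically together.
                producer.send(new ProducerRecord<>("topic-a", "key-" + i, "value-" + i));
                producer.send(new ProducerRecord<>("topic-b", "key-" + i, "value-" + i));
                producer.commitTransaction();
            }
        }
    }
}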





Single producer for stream or multiple producers

2017-12-18 Thread Shubham Dhoble
I am accessing Kafka through Java code. The use case is: I want to publish a
stream of messages to two Kafka topics, which would be in a transaction.

My point of concern is whether it would be better to use one single global Kafka
producer for all messages, or to create and delete a new KafkaProducer for
every new message to publish?

What would be the pros and cons of producer creation and deletion for
every request, and can a single producer handle all transactional calls?


Re: Producer is blocking at second commitTransaction

2017-12-18 Thread Ted Yu
Can you capture a stack trace on the broker and pastebin it?

The broker log may also provide some clues.

Thanks

On Mon, Dec 18, 2017 at 4:46 AM, HKT  wrote:

> Hello,
>
> I was testing transactional messages on Kafka,
> but I ran into a problem:
> the producer always blocks at the second commitTransaction.
> Here is my code:
>
> Properties kafkaProps = new Properties();
> kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
> kafkaProps.setProperty("key.serializer",
> LongSerializer.class.getName());
> kafkaProps.setProperty("value.serializer",
> StringSerializer.class.getName());
> kafkaProps.setProperty("transactional.id", "hello");
> try (KafkaProducer<Long, String> producer = new
> KafkaProducer<>(kafkaProps)) {
> producer.initTransactions();
> producer.beginTransaction();
> ProducerRecord<Long, String> record = new
> ProducerRecord<>("test", 0, (long) 0, Long.toString(0));
> producer.send(record);
> producer.sendOffsetsToTransaction(new HashMap<>(), "");
> producer.commitTransaction();
> producer.beginTransaction();
> record = new ProducerRecord<>("test", 0, (long)0,
> Long.toString(0));
> producer.send(record);
> producer.commitTransaction(); // blocking here
> }
>
> Environment:
> Kafka broker: 1.0.0
> broker count: 1
> Kafka Client: 1.0.0
> and I use the default server.properties in config/
>
> broker.id=0
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/tmp/kafka-logs
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=30
> zookeeper.connect=localhost:2181
> zookeeper.connection.timeout.ms=6000
> group.initial.rebalance.delay.ms=0
>
> I have run the program on Windows 7 and CentOS 6.9,
> but it blocks at the second commitTransaction.
>
>
>
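
For reference, one way to grab that stack trace, assuming a JDK and shell access
on the broker host (the output file name is arbitrary):

# find the broker's PID, then dump all thread stacks with the JDK's jstack tool
jps -l | grep kafka.Kafka
jstack -l <broker-pid> > broker-threads.txt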


Running SSL and PLAINTEXT mode together (Kafka 10.2.1)

2017-12-18 Thread Darshan
Hi

I am wondering if there is a way to run the SSL and PLAINTEXT modes together?
I am running Kafka 10.2.1. We want our internal clients to use the
PLAINTEXT mode to write to certain topics, but any external clients should
use SSL to read messages on those topics. We also want to enforce ACLs.

To try this out, I modified my server.properties as follows, but without
any luck. Can someone please let me know if it needs any change?

listeners=INTERNAL://10.10.10.64:9092,EXTERNAL://172.1.1.157:9093
advertised.listeners=INTERNAL://10.10.10.64:9092,EXTERNAL://172.1.1.157:9093
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
inter.broker.listener.name=INTERNAL

ssl.keystore.location=/opt/keystores/keystotr.jks
ssl.keystore.password=ABCDEFGH
ssl.key.password=ABCDEFGH
ssl.truststore.location=/opt/keystores/truststore.jks
ssl.truststore.password=ABCDEFGH
ssl.keystore.type=JKS
ssl.truststore.type=JKS
security.protocol=SSL
ssl.client.auth=required
# allow.everyone.if.no.acl.found=false
allow.everyone.if.no.acl.found=true
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:CN=KafkaBroker01

Thanks.

--Darshan
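
One thing worth checking: as far as I know, security.protocol is a client-side
setting, not a broker setting, so it does not belong in server.properties. The
broker side only needs the listener and ssl.* entries; external clients then
point at the SSL listener with something like the sketch below (paths and
passwords are placeholders):

# client-ssl.properties, used by external clients only
security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=CHANGEME
# needed because the broker sets ssl.client.auth=required
ssl.keystore.location=/path/to/client.keystore.jks
ssl.keystore.password=CHANGEME
ssl.key.password=CHANGEME

# e.g. for a quick test:
# bin/kafka-console-consumer.sh --bootstrap-server 172.1.1.157:9093 \
#     --topic test --consumer.config client-ssl.properties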


Producer is blocking at second commitTransaction

2017-12-18 Thread HKT

Hello,

I was testing transactional messages on Kafka,
but I ran into a problem:
the producer always blocks at the second commitTransaction.
Here is my code:

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("key.serializer", 
LongSerializer.class.getName());
kafkaProps.setProperty("value.serializer", 
StringSerializer.class.getName());

kafkaProps.setProperty("transactional.id", "hello");
try (KafkaProducer<Long, String> producer = new
KafkaProducer<>(kafkaProps)) {

producer.initTransactions();
producer.beginTransaction();
ProducerRecord<Long, String> record = new
ProducerRecord<>("test", 0, (long) 0, Long.toString(0));

producer.send(record);
producer.sendOffsetsToTransaction(new HashMap<>(), "");
producer.commitTransaction();
producer.beginTransaction();
record = new ProducerRecord<>("test", 0, (long)0, 
Long.toString(0));

producer.send(record);
producer.commitTransaction(); // blocking here
}

Environment:
Kafka broker: 1.0.0
broker count: 1
Kafka Client: 1.0.0
and I use the default server.properties in config/

broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=30
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

I have run the program on Windows 7 and CentOS 6.9,
but it blocks at the second commitTransaction.




Re: deduplication strategy for Kafka Streams DSL

2017-12-18 Thread Artur Mrozowski
Hi Bill,

 I am actually referring to duplicates as completely identical records. I
can observe it when I convert the result of a left join between KTables to
a stream. The resulting stream will often contain identical messages.
For example we have

KTable left
{"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber":
"3_0", "claimtime": 708.521153490306}

and KTable  right

{"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0",
"payment": 847015.1437781961}

When I left-join these two objects, the result in the state store will be an
object containing two ArrayLists, left and right, like this

{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}

But I want to continue processing the results by using groupBy and
aggregate, so I convert the result of the left join to a stream. Now the
resulting repartition and changelog topics will, most of the time, contain two
identical messages, like this

{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}


and hence passing duplicates to the next operation.

My question is what is the best practice to avoid that?

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423

Best regards
Artur


On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck  wrote:

> Hi Artur,
>
> The most direct way for deduplication (I'm using the term deduplication to
> mean records with the same key, but not necessarily the same value, where
> later records are considered) is to set the  CACHE_MAX_BYTES_BUFFERING_
> CONFIG
> setting to a value greater than zero.
>
> Your other option is to use the PAPI and by writing your own logic in
> conjunction with a state store determine what constitutes a duplicate and
> when to emit a record.  You could take the same approach in the DSL layer
> using a Transformer.
>
> HTH.
>
> Bill
>
> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski  wrote:
>
> > Hi
> > I run an app where I transform KTable to stream and then I groupBy and
> > aggregate and capture the results in KTable again. That generates many
> > duplicates.
> >
> > I have played with exactly-once semantics, which seems to reduce duplicates
> > for records that should be unique. But I still get duplicates on keys that
> > have two or more records.
> >
> > I could not reproduce it on a small number of records, so I disabled caching
> > by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough, I got loads
> > of duplicates, even those previously eliminated by exactly-once semantics.
> > Now I have a hard time enabling it again on Confluent 3.3.
> >
> > But, generally, what is the best deduplication strategy for Kafka Streams?
> >
> > Artur
> >
>
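
To illustrate Bill's first suggestion quoted above, a sketch of the relevant
Streams settings (application id and bootstrap servers are placeholders; the
right values are workload-dependent):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CachingConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // A non-zero cache lets Streams collapse consecutive updates per key
        // before forwarding them downstream, which suppresses many duplicates.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);

        // The cache is flushed on every commit, so a larger commit interval
        // gives it more opportunity to deduplicate.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10_000L);
    }
}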


kafka 1.0.1?

2017-12-18 Thread Maciek Próchniak

Hello,

are there plans to release version 1.0.1?

We are affected by https://issues.apache.org/jira/browse/KAFKA-6185 and 
cannot upgrade at the moment to 1.0.0 (on our UAT env we applied the 
patch and problems are gone, but we are reluctant to do this on prod...),


we'd like to know if there are plans to do a bugfix release in the foreseeable
future,



thanks,

maciek