KStreams API Usage

2018-04-27 Thread pradeep s
Hi,

I am trying to call KafkaStreams close based on the presence of a value in
the output of a ValueTransformer. The ValueTransformer produces a
List.

Is there a way to avoid the foreach on the KStream and get just the
first matching value (like the Java Stream API method findFirst)?

// Note: the generic type parameters were stripped by the mail archive;
// ValidationError below is a placeholder for the actual list element type.
private void checkMerchHierarchyEmpty(KStream<String, List<ValidationError>> trans) {
    trans.filter((key, value) -> value.stream().anyMatch(val ->
            MERCH_HIERARCHY_CACHE_EMPTY.equals(val.getErrorMessage())))
         .foreach((key, value) -> {
             metricsClient.writeMetric(CountMetric.generate(STREAM_SHUTDOWN_ACTION, 1));
             log.fatal("Shutting down kafka stream since merch hierarchy is empty");
             kafkaStreams.close(STREAM_SHUTDOWN_WAITTIME_MS, TimeUnit.MILLISECONDS);
         });
}


Thanks
Pradeep


Re: source code location for KSQL

2018-04-27 Thread Guozhang Wang
I guess Ted's 47 minutes faster than me :P

On Fri, Apr 27, 2018 at 5:19 PM, Guozhang Wang  wrote:

> Hey Henry,
>
> Here you go https://github.com/confluentinc/ksql
>
> Guozhang
>
> On Fri, Apr 27, 2018 at 4:23 PM, Henry Cai 
> wrote:
>
>> I think KSQL is also open sourced, where is the source code location for
>> KSQL in github?
>>
>> Thanks.
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang


Re: source code location for KSQL

2018-04-27 Thread Guozhang Wang
Hey Henry,

Here you go https://github.com/confluentinc/ksql

Guozhang

On Fri, Apr 27, 2018 at 4:23 PM, Henry Cai 
wrote:

> I think KSQL is also open sourced, where is the source code location for
> KSQL in github?
>
> Thanks.
>



-- 
-- Guozhang


Re: source code location for KSQL

2018-04-27 Thread Ted Yu
https://github.com/confluentinc/ksql

FYI

On Fri, Apr 27, 2018 at 4:23 PM, Henry Cai 
wrote:

> I think KSQL is also open sourced, where is the source code location for
> KSQL in github?
>
> Thanks.
>


source code location for KSQL

2018-04-27 Thread Henry Cai
I think KSQL is also open sourced, where is the source code location for
KSQL in github?

Thanks.


Kafka-Connect- org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

2018-04-27 Thread Jagannath Bilgi
Hi Team,
I am new to Kafka and would like to explore using it during data
processing.
I am using Oracle VirtualBox for practicing Kafka and ZooKeeper. Below are the
steps followed to deploy and test:
1) Start docker-compose.
2) Get into the Kafka container. Command: docker exec -it proj1_kafka-1_1 bash
3) Move to the working directory.
4) Create the file referred to in the source property file.
5) Start Kafka Connect. Command: /usr/bin/connect-standalone
/jb/connect-standalone.properties /jb/connect-file-source.properties
/jb/connect-file-sink.properties
However, I am getting the below error message:

[2018-04-27 14:35:56,595] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:122)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
       at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
       at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:45)
       at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:81)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
       at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
       at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
       at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
       at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)
       at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:58)
       ... 2 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
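
For reference, the worker reads its broker connection from
connect-standalone.properties. A minimal sketch of the relevant lines (the
host kafka:9092 and the converter choices are assumptions, not my actual
file; the address must be reachable from inside the container):

bootstrap.servers=kafka:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets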
Please advise how to resolve this.
NB: I have attached the docker-compose file and the other property files used.
Thanks and regards
Jagannath S Bilgi


Re: Kafka mirror maker help

2018-04-27 Thread Peter Bukowinski
I run instances of Mirror Maker as supervisord tasks (http://supervisord.org).
I'd recommend looking into it. In addition to
letting you sidestep the service issue, supervisord watches the processes and 
can auto-restart them if they stop for any reason.
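
As a sketch, a supervisord program entry for Mirror Maker might look like
this (the paths and config file names are assumptions for illustration):

[program:kafka-mirror-maker]
command=/opt/kafka/bin/kafka-mirror-maker.sh --consumer.config /etc/kafka/mm-consumer.properties --producer.config /etc/kafka/mm-producer.properties --whitelist '.*'
autostart=true
autorestart=true
stopasgroup=true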

—
Peter Bukowinski

> On Apr 27, 2018, at 11:58 AM, Hans Jespersen  wrote:
> 
> Sorry I hit send a bit too soon. I was so focused on the systemd part of
> the email and not the Mirror Maker part.
> Confluent packages include Mirror Maker but the systemd scripts are set up
> to use Confluent Replicator rather than Mirror Maker.
> My apologies.
> 
> -hans
> 
> /**
> * Hans Jespersen, Director Systems Engineering, Confluent Inc.
> * h...@confluent.io (650)924-2670
> */
> 
> On Fri, Apr 27, 2018 at 11:56 AM, Hans Jespersen  wrote:
> 
>> The latest Confluent packages now ship with systemd scripts. That is since
>> Confluent Version 4.1 - which included Apache Kafka 1.1
>> 
>> -hans
>> 
>> /**
>> * Hans Jespersen, Director Systems Engineering, Confluent Inc.
>> * h...@confluent.io (650)924-2670
>> */
>> 
>> On Fri, Apr 27, 2018 at 11:15 AM, Andrew Otto  wrote:
>> 
>>> Hiya,
>>> 
>>> Saravanan, I saw you emailed my colleague Alex about WMF’s old debian
>>> packaging.  I’ll reply here.
>>> 
>>> We now use Confluent’s Kafka debian packaging which does not (or did not?)
>>> ship with init scripts.  We don’t use Sys V init.d scripts anymore either,
>>> but use systemd instead.  Our systemd service unit (ERB template format)
>>> is
>>> here:
>>> 
>>> https://github.com/wikimedia/puppet/blob/production/modules/confluent/templates/initscripts/kafka-mirror-instance.systemd.erb
>>> 
>>> 
>>> 
>>> On Fri, Apr 27, 2018 at 1:35 AM, Amrit Jangid 
>>> wrote:
>>> 
 You should share related info, such as source-destination Kafka versions,
 sample config, or errors if any.
 
 FYI,  Go through
 https://kafka.apache.org/documentation/#basic_ops_mirror_maker
 
>>> 
>> 
>> 



Re: Kafka mirror maker help

2018-04-27 Thread Hans Jespersen
Sorry I hit send a bit too soon. I was so focused on the systemd part of
the email and not the Mirror Maker part.
Confluent packages include Mirror Maker but the systemd scripts are set up
to use Confluent Replicator rather than Mirror Maker.
My apologies.

-hans

/**
 * Hans Jespersen, Director Systems Engineering, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Fri, Apr 27, 2018 at 11:56 AM, Hans Jespersen  wrote:

> The latest Confluent packages now ship with systemd scripts. That is since
> Confluent Version 4.1 - which included Apache Kafka 1.1
>
> -hans
>
> /**
>  * Hans Jespersen, Director Systems Engineering, Confluent Inc.
>  * h...@confluent.io (650)924-2670
>  */
>
> On Fri, Apr 27, 2018 at 11:15 AM, Andrew Otto  wrote:
>
>> Hiya,
>>
>> Saravanan, I saw you emailed my colleague Alex about WMF’s old debian
>> packaging.  I’ll reply here.
>>
>> We now use Confluent’s Kafka debian packaging which does not (or did not?)
>> ship with init scripts.  We don’t use Sys V init.d scripts anymore either,
>> but use systemd instead.  Our systemd service unit (ERB template format)
>> is
>> here:
>>
>> https://github.com/wikimedia/puppet/blob/production/modules/confluent/templates/initscripts/kafka-mirror-instance.systemd.erb
>>
>>
>>
>> On Fri, Apr 27, 2018 at 1:35 AM, Amrit Jangid 
>> wrote:
>>
>> > You should share related info, such as source-destination Kafka versions,
>> > sample config, or errors if any.
>> >
>> > FYI,  Go through
>> > https://kafka.apache.org/documentation/#basic_ops_mirror_maker
>> >
>>
>
>


Re: Kafka mirror maker help

2018-04-27 Thread Hans Jespersen
The latest Confluent packages now ship with systemd scripts. That is since
Confluent Version 4.1 - which included Apache Kafka 1.1

-hans

/**
 * Hans Jespersen, Director Systems Engineering, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Fri, Apr 27, 2018 at 11:15 AM, Andrew Otto  wrote:

> Hiya,
>
> Saravanan, I saw you emailed my colleague Alex about WMF’s old debian
> packaging.  I’ll reply here.
>
> We now use Confluent’s Kafka debian packaging which does not (or did not?)
> ship with init scripts.  We don’t use Sys V init.d scripts anymore either,
> but use systemd instead.  Our systemd service unit (ERB template format) is
> here:
>
> https://github.com/wikimedia/puppet/blob/production/modules/confluent/templates/initscripts/kafka-mirror-instance.systemd.erb
>
>
>
> On Fri, Apr 27, 2018 at 1:35 AM, Amrit Jangid 
> wrote:
>
> > You should share related info, such as source-destination Kafka versions,
> > sample config, or errors if any.
> >
> > FYI,  Go through
> > https://kafka.apache.org/documentation/#basic_ops_mirror_maker
> >
>


Re: Kafka mirror maker help

2018-04-27 Thread Andrew Otto
Hiya,

Saravanan, I saw you emailed my colleague Alex about WMF’s old debian
packaging.  I’ll reply here.

We now use Confluent’s Kafka debian packaging which does not (or did not?)
ship with init scripts.  We don’t use Sys V init.d scripts anymore either,
but use systemd instead.  Our systemd service unit (ERB template format) is
here:

https://github.com/wikimedia/puppet/blob/production/modules/confluent/templates/initscripts/kafka-mirror-instance.systemd.erb
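
For reference, such a unit is roughly of this shape (the paths and flags
below are illustrative assumptions, not the contents of that template):

[Unit]
Description=Kafka MirrorMaker instance
After=network.target

[Service]
ExecStart=/usr/bin/kafka-mirror-maker --consumer.config /etc/kafka/mirror/consumer.properties --producer.config /etc/kafka/mirror/producer.properties --whitelist '.*'
Restart=on-failure

[Install]
WantedBy=multi-user.target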



On Fri, Apr 27, 2018 at 1:35 AM, Amrit Jangid 
wrote:

> You should share related info, such as source-destination Kafka versions,
> sample config, or errors if any.
>
> FYI,  Go through
> https://kafka.apache.org/documentation/#basic_ops_mirror_maker
>


Re: Kafka Streams 1.1.0 - Significant Performance Drop

2018-04-27 Thread Bill Bejeck
Hi Tony,

I'll try to address your questions below:

   1. While it's not technically "wrong" to commit for each record, you
   need to keep in mind that calling commit has resource implications for
   Kafka Streams.  Here's a link to our FAQ describing what happens during a
   commit:
   https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhenIcommitmyprocessstate,whatdoestheStreamslibrarydoandhowitaffectsmyapplication'sperformance?
   2. While you can keep track of the number of records and call commit
   manually after N records, Kafka Streams executes commit on regular
   intervals as determined by the StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
   with the default value of 30 seconds.  So depending on your use case it
   might be more advantageous to figure out the optimal time interval for
   committing, set the configuration accordingly, and let the framework
   handle the commit calls (see the sketch after this list).
   3. Not a specific issue with 1.1.0 as far as I know.
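
As a minimal sketch of that approach (the application id and bootstrap
servers below are placeholders):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// Commit every 10 seconds instead of calling context.commit() per record.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
KafkaStreams streams = new KafkaStreams(builder.build(), props);     // builder is your StreamsBuilder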

HTH,
Bill


On Fri, Apr 27, 2018 at 11:43 AM, Tony John 
wrote:

> Hi All,
>
> I was trying to switch to the latest version of streams (1.1.0) and
> started seeing a significant drop in performance of the application. I was
> using 0.11.0.2 before. After doing some checks I found that the choking
> point was Rocksdb flush which contributes almost 80% of the CPU time (PFA
> the screenshots). One thing which I was doing earlier with the application
> was that I was doing a context.commit() from the Processor's process method
> for each record which gets processed. After doing some throttling on this
> and restricting the commit to every 100K records, I could see that the
> performance was on par with the previous version. So below are my queries
>
>
>1. Is it wrong to do the store commits for each record which gets
>processed?
>2. Are there any other configurations which I need to make in order to
>get rid of this, other than throttling the commits?
>3. Or is it actually an issue with 1.1.0, which I don't think will
>be the case as I haven't seen anyone else reporting this so far.
>
> Please suggest.
>
>
>
>
> Thanks,
> Tony
>


Kafka Streams 1.1.0 - Significant Performance Drop

2018-04-27 Thread Tony John
Hi All,

I was trying to switch to the latest version of streams (1.1.0) and started
seeing a significant drop in performance of the application. I was using
0.11.0.2 before. After doing some checks I found that the choking point was
Rocksdb flush which contributes almost 80% of the CPU time (PFA the
screenshots). One thing which I was doing earlier with the application was
that I was doing a context.commit() from the Processor's process method for
each record which gets processed. After doing some throttling on this and
restricting the commit to every 100K records, I could see that the
performance was on par with the previous version. So below are my queries


   1. Is it wrong to do the store commits for each record which gets
   processed?
   2. Are there any other configurations which I need to make in order to
   get rid of this, other than throttling the commits?
   3. Or is it actually an issue with 1.1.0, which I don't think will
   be the case as I haven't seen anyone else reporting this so far.

Please suggest.




Thanks,
Tony


Re: Re: kafka streams with TimeWindows, incorrect result

2018-04-27 Thread 杰 杨
The value is a user-defined type which implements Serializer and Deserializer.


funk...@live.com

From: Ted Yu
Date: 2018-04-27 16:39
To: users; 杰 杨
Subject: Re: Re: kafka streams with TimeWindows ,incorrect result
Noticed a typo in "streams" in the subject.

Corrected it in this reply.

---- Original message ----
From: 杰 杨 
Date: 4/27/18 1:28 AM (GMT-08:00)
To: 杰 杨 , users 
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result

I checked the WindowStore interface and found it has a put method but no get
method. Within one second the stream has the same key with different values
in it, and I must update the key's value stored in it.



funk...@live.com

From: funk...@live.com
Date: 2018-04-27 16:08
To: users
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Hi:
I don't know what to do with the transform function,
and the stream is prepared well,
like this below:
key:
44_14_2018-04-27
value:
CountInfo(start=1, active=0, fresh=0)

There is a large amount of data like that.
How do I aggregate it per 1 second using the transform function?



funk...@live.com

From: Guozhang Wang
Date: 2018-04-27 01:50
To: users
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Using a control message to flush results to downstream (in your case to the
result db) looks good to me as well.

On Thu, Apr 26, 2018 at 10:49 AM, Guozhang Wang  wrote:

> If you're talking about which store to use in your transform function, it
> should be a windowed store.
>
> You can create such a store with the `Stores` factory, and suppose your
> old code has `windowedBy(TimeWindows.of(6))`, then you can do
>
> `
> TimeWindows windows = TimeWindows.of(6);
>
> Stores.windowStoreBuilder(
>     Stores.persistentWindowStore("Counts",
>         windows.maintainMs(),   // retention period
>         windows.segments,       // number of segments
>         windows.size(),         // window size
>         true),                  // retain duplicates
>     keySerde,                   // serdes for your key and CountInfo value
>     valueSerde)
> `
>
>
> Guozhang
>
>
>
> On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨  wrote:
>
>> Coming back to this: which StateStore could I use for this problem?
>> Another idea: I can send a 'flush' message into this topic, and when it
>> is received, update the results to the db. Would that work?
>>
>> 
>> funk...@live.com
>>
>> From: Guozhang Wang
>> Date: 2018-03-12 03:58
>> To: users
>> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
>> If you want to strictly "only have one output per window", then for now
>> you'd probably implement that logic using a lower-level "transform"
>> function in which you can schedule a punctuate function to send all the
>> results at the end of a window.
>>
>> If you just want to reduce the amount of data to your sink, but your sink
>> can still handle overwritten records of the same key, you can enlarge the
>> cache size via the cache.max.bytes.buffering config.
>>
>> https://kafka.apache.org/documentation/#streamsconfigs
>>
>> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨  wrote:
>>
>> > Thanks for your reply!
>> > I see that it is designed to operate on an infinite, unbounded stream of
>> > data.
>> > Now I want to process an unbounded stream, but divided by time interval.
>> > So what can I do to achieve this?
>> >
>> > 
>> > funk...@live.com
>> >
>> > From: Guozhang Wang
>> > Date: 2018-03-10 02:50
>> > To: users
>> > Subject: Re: kafka steams with TimeWindows ,incorrect result
>> > Hi Jie,
>> >
>> > This is by design of Kafka Streams, please read this doc for more
>> details
>> > (search for "outputs of the Wordcount application is actually a
>> continuous
>> > stream of updates"):
>> >
>> > https://kafka.apache.org/0110/documentation/streams/quickstart
>> >
>> > Note this semantics applies for both windowed and un-windowed tables.
>> >
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨  wrote:
>> >
>> > > Hi:
>> > > I used TimeWindow for aggregate data in kafka.
>> > >
>> > > this is code snippet ;
>> > >
>> > >   view.flatMap(new MultipleKeyValueMapper(client))
>> > >       .groupByKey(Serialized.with(Serdes.String(),
>> > >           Serdes.serdeFrom(new CountInfoSerializer(),
>> > >               new CountInfoDeserializer())))
>> > >       .windowedBy(TimeWindows.of(6))
>> > >       .reduce(new Reducer<CountInfo>() {
>> > >           @Override
>> > >           public CountInfo apply(CountInfo value1, CountInfo value2) {
>> > >               return new CountInfo(value1.start + value2.start,
>> > >                   value1.active + value2.active, value1.fresh + value2.fresh);
>> > >           }
>> > >       })
>> > >       .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
>> > >           @Override
>> > >           public String apply(Windowed<String> key, CountInfo value) {
>> > >               return key.key();
>> > >           }
>> > >       })
>> > >       .print(Printed.toSysOut());

Re: Re: kafka streams with TimeWindows, incorrect result

2018-04-27 Thread Ted Yu
Noticed a typo in "streams" in the subject.
Corrected it in this reply.
---- Original message ----
From: 杰 杨
Date: 4/27/18 1:28 AM (GMT-08:00)
To: 杰 杨, users
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
I checked the WindowStore interface and found it has a put method but no get
method. Within one second the stream has the same key with different values
in it, and I must update the key's value stored in it.



funk...@live.com

From: funk...@live.com
Date: 2018-04-27 16:08
To: users
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Hi:
I don't know what to do with the transform function,
and the stream is prepared well,
like this below:
key:
44_14_2018-04-27
value:
CountInfo(start=1, active=0, fresh=0)

There is a large amount of data like that.
How do I aggregate it per 1 second using the transform function?



funk...@live.com

From: Guozhang Wang
Date: 2018-04-27 01:50
To: users
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Using a control message to flush results to downstream (in your case to the
result db) looks good to me as well.

On Thu, Apr 26, 2018 at 10:49 AM, Guozhang Wang  wrote:

> If you're talking about which store to use in your transform function, it
> should be a windowed store.
>
> You can create such a store with the `Stores` factory, and suppose your
> old code has `windowedBy(TimeWindows.of(6))`, then you can do
>
> `
> TimeWindows windows = TimeWindows.of(6);
>
> Stores.windowStoreBuilder(
>     Stores.persistentWindowStore("Counts",
>         windows.maintainMs(),   // retention period
>         windows.segments,       // number of segments
>         windows.size(),         // window size
>         true),                  // retain duplicates
>     keySerde,                   // serdes for your key and CountInfo value
>     valueSerde)
> `
>
>
> Guozhang
>
>
>
> On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨  wrote:
>
>> Coming back to this: which StateStore could I use for this problem?
>> Another idea: I can send a 'flush' message into this topic, and when it
>> is received, update the results to the db. Would that work?
>>
>> 
>> funk...@live.com
>>
>> From: Guozhang Wang
>> Date: 2018-03-12 03:58
>> To: users
>> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
>> If you want to strictly "only have one output per window", then for now
>> you'd probably implement that logic using a lower-level "transform"
>> function in which you can schedule a punctuate function to send all the
>> results at the end of a window.
>>
>> If you just want to reduce the amount of data to your sink, but your sink
>> can still handle overwritten records of the same key, you can enlarge the
>> cache size via the cache.max.bytes.buffering config.
>>
>> https://kafka.apache.org/documentation/#streamsconfigs
>>
>> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨  wrote:
>>
>> > Thanks for your reply!
>> > I see that it is designed to operate on an infinite, unbounded stream of
>> > data.
>> > Now I want to process an unbounded stream, but divided by time interval.
>> > So what can I do to achieve this?
>> >
>> > 
>> > funk...@live.com
>> >
>> > From: Guozhang Wang
>> > Date: 2018-03-10 02:50
>> > To: users
>> > Subject: Re: kafka steams with TimeWindows ,incorrect result
>> > Hi Jie,
>> >
>> > This is by design of Kafka Streams, please read this doc for more
>> details
>> > (search for "outputs of the Wordcount application is actually a
>> continuous
>> > stream of updates"):
>> >
>> > https://kafka.apache.org/0110/documentation/streams/quickstart
>> >
>> > Note this semantics applies for both windowed and un-windowed tables.
>> >
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨  wrote:
>> >
>> > > Hi:
>> > > I used TimeWindow for aggregate data in kafka.
>> > >
>> > > this is code snippet ;
>> > >
>> > >   view.flatMap(new MultipleKeyValueMapper(client))
>> > >       .groupByKey(Serialized.with(Serdes.String(),
>> > >           Serdes.serdeFrom(new CountInfoSerializer(),
>> > >               new CountInfoDeserializer())))
>> > >       .windowedBy(TimeWindows.of(6))
>> > >       .reduce(new Reducer<CountInfo>() {
>> > >           @Override
>> > >           public CountInfo apply(CountInfo value1, CountInfo value2) {
>> > >               return new CountInfo(value1.start + value2.start,
>> > >                   value1.active + value2.active, value1.fresh + value2.fresh);
>> > >           }
>> > >       })
>> > >       .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
>> > >           @Override
>> > >           public String apply(Windowed<String> key, CountInfo value) {
>> > >               return key.key();
>> > >           }
>> > >       })
>> > >       .print(Printed.toSysOut());
>> > >
>> > > KafkaStreams streams = new KafkaStreams(builder.build(),
>> > >     KStreamReducer.getConf());
>> > > streams.start();
>> > >
>> > > an

Re: Re: kafka steams with TimeWindows, incorrect result

2018-04-27 Thread 杰 杨
I checked the WindowStore interface and found it has a put method but no get
method. Within one second the stream has the same key with different values
in it, and I must update the key's value stored in it.



funk...@live.com

From: funk...@live.com
Date: 2018-04-27 16:08
To: users
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Hi:
I don't know what to do with the transform function,
and the stream is prepared well,
like this below:
key:
44_14_2018-04-27
value:
CountInfo(start=1, active=0, fresh=0)

There is a large amount of data like that.
How do I aggregate it per 1 second using the transform function?



funk...@live.com

From: Guozhang Wang
Date: 2018-04-27 01:50
To: users
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Using a control message to flush results to downstream (in your case to the
result db) looks good to me as well.

On Thu, Apr 26, 2018 at 10:49 AM, Guozhang Wang  wrote:

> If you're talking about which store to use in your transform function, it
> should be a windowed store.
>
> You can create such a store with the `Stores` factory, and suppose your
> old code has `windowedBy(TimeWindows.of(6))`, then you can do
>
> `
> TimeWindows windows = TimeWindows.of(6);
>
> Stores.windowStoreBuilder(
>     Stores.persistentWindowStore("Counts",
>         windows.maintainMs(),   // retention period
>         windows.segments,       // number of segments
>         windows.size(),         // window size
>         true),                  // retain duplicates
>     keySerde,                   // serdes for your key and CountInfo value
>     valueSerde)
> `
>
>
> Guozhang
>
>
>
> On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨  wrote:
>
>> Coming back to this: which StateStore could I use for this problem?
>> Another idea: I can send a 'flush' message into this topic, and when it
>> is received, update the results to the db. Would that work?
>>
>> 
>> funk...@live.com
>>
>> From: Guozhang Wang
>> Date: 2018-03-12 03:58
>> To: users
>> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
>> If you want to strictly "only have one output per window", then for now
>> you'd probably implement that logic using a lower-level "transform"
>> function in which you can schedule a punctuate function to send all the
>> results at the end of a window.
>>
>> If you just want to reduce the amount of data to your sink, but your sink
>> can still handle overwritten records of the same key, you can enlarge the
>> cache size via the cache.max.bytes.buffering config.
>>
>> https://kafka.apache.org/documentation/#streamsconfigs
>>
>> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨  wrote:
>>
>> > Thanks for your reply!
>> > I see that it is designed to operate on an infinite, unbounded stream of
>> > data.
>> > Now I want to process an unbounded stream, but divided by time interval.
>> > So what can I do to achieve this?
>> >
>> > 
>> > funk...@live.com
>> >
>> > From: Guozhang Wang
>> > Date: 2018-03-10 02:50
>> > To: users
>> > Subject: Re: kafka steams with TimeWindows ,incorrect result
>> > Hi Jie,
>> >
>> > This is by design of Kafka Streams, please read this doc for more
>> details
>> > (search for "outputs of the Wordcount application is actually a
>> continuous
>> > stream of updates"):
>> >
>> > https://kafka.apache.org/0110/documentation/streams/quickstart
>> >
>> > Note this semantics applies for both windowed and un-windowed tables.
>> >
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨  wrote:
>> >
>> > > Hi:
>> > > I used TimeWindow for aggregate data in kafka.
>> > >
>> > > this is code snippet ;
>> > >
>> > >   view.flatMap(new MultipleKeyValueMapper(client))
>> > >       .groupByKey(Serialized.with(Serdes.String(),
>> > >           Serdes.serdeFrom(new CountInfoSerializer(),
>> > >               new CountInfoDeserializer())))
>> > >       .windowedBy(TimeWindows.of(6))
>> > >       .reduce(new Reducer<CountInfo>() {
>> > >           @Override
>> > >           public CountInfo apply(CountInfo value1, CountInfo value2) {
>> > >               return new CountInfo(value1.start + value2.start,
>> > >                   value1.active + value2.active, value1.fresh + value2.fresh);
>> > >           }
>> > >       })
>> > >       .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
>> > >           @Override
>> > >           public String apply(Windowed<String> key, CountInfo value) {
>> > >               return key.key();
>> > >           }
>> > >       })
>> > >       .print(Printed.toSysOut());
>> > >
>> > > KafkaStreams streams = new KafkaStreams(builder.build(),
>> > >     KStreamReducer.getConf());
>> > > streams.start();
>> > >
>> > > I tested 3 records in Kafka,
>> > > and printed the key and value:
>> > >
>> > >
>> > > [KTABLE-TOSTREAM-07]: [9_9_2018-03-09_hour_
>> > > 21@152060130/152060136], CountInfo{start=12179, active=12179,
>> > > fresh=12179}

Re: Re: kafka steams with TimeWindows, incorrect result

2018-04-27 Thread 杰 杨
Hi:
I don't know what to do with the transform function,
and the stream is prepared well,
like this below:
key:
44_14_2018-04-27
value:
CountInfo(start=1, active=0, fresh=0)

There is a large amount of data like that.
How do I aggregate it per 1 second using the transform function?
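
A minimal sketch of the "transform + punctuate" approach described in the
quoted replies below (the store name "Counts", the 1-second interval, and
the wiring of the CountInfo serde are assumptions):

class WindowedFlushTransformer implements Transformer<String, CountInfo, KeyValue<String, CountInfo>> {
    private ProcessorContext context;
    private WindowStore<String, CountInfo> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.store = (WindowStore<String, CountInfo>) context.getStateStore("Counts");
        // Flush the accumulated window contents downstream once per second.
        context.schedule(1000, PunctuationType.STREAM_TIME, timestamp -> {
            try (KeyValueIterator<Windowed<String>, CountInfo> it =
                     store.fetchAll(timestamp - 1000, timestamp)) {
                while (it.hasNext()) {
                    KeyValue<Windowed<String>, CountInfo> entry = it.next();
                    context.forward(entry.key.key(), entry.value);
                }
            }
        });
    }

    @Override
    public KeyValue<String, CountInfo> transform(String key, CountInfo value) {
        store.put(key, value);  // WindowStore is written with put(); reads go through fetch/fetchAll
        return null;            // emit only from the punctuator
    }

    @Override
    @Deprecated
    public KeyValue<String, CountInfo> punctuate(long timestamp) {
        return null;  // unused; scheduling happens via context.schedule() above
    }

    @Override
    public void close() { }
}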



funk...@live.com

From: Guozhang Wang
Date: 2018-04-27 01:50
To: users
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Using a control message to flush results to downstream (in your case to the
result db) looks good to me as well.

On Thu, Apr 26, 2018 at 10:49 AM, Guozhang Wang  wrote:

> If you're talking about which store to use in your transform function, it
> should be a windowed store.
>
> You can create such a store with the `Stores` factory, and suppose your
> old code has `windowedBy(TimeWindows.of(6))`, then you can do
>
> `
> TimeWindows windows = TimeWindows.of(6);
>
> Stores.windowStoreBuilder(
>     Stores.persistentWindowStore("Counts",
>         windows.maintainMs(),   // retention period
>         windows.segments,       // number of segments
>         windows.size(),         // window size
>         true),                  // retain duplicates
>     keySerde,                   // serdes for your key and CountInfo value
>     valueSerde)
> `
>
>
> Guozhang
>
>
>
> On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨  wrote:
>
>> Coming back to this: which StateStore could I use for this problem?
>> Another idea: I can send a 'flush' message into this topic, and when it
>> is received, update the results to the db. Would that work?
>>
>> 
>> funk...@live.com
>>
>> From: Guozhang Wang
>> Date: 2018-03-12 03:58
>> To: users
>> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
>> If you want to strictly "only have one output per window", then for now
>> you'd probably implement that logic using a lower-level "transform"
>> function in which you can schedule a punctuate function to send all the
>> results at the end of a window.
>>
>> If you just want to reduce the amount of data to your sink, but your sink
>> can still handle overwritten records of the same key, you can enlarge the
>> cache size via the cache.max.bytes.buffering config.
>>
>> https://kafka.apache.org/documentation/#streamsconfigs
>>
>> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨  wrote:
>>
>> > Thanks for your reply!
>> > I see that it is designed to operate on an infinite, unbounded stream of
>> > data.
>> > Now I want to process an unbounded stream, but divided by time interval.
>> > So what can I do to achieve this?
>> >
>> > 
>> > funk...@live.com
>> >
>> > From: Guozhang Wang
>> > Date: 2018-03-10 02:50
>> > To: users
>> > Subject: Re: kafka steams with TimeWindows ,incorrect result
>> > Hi Jie,
>> >
>> > This is by design of Kafka Streams, please read this doc for more
>> details
>> > (search for "outputs of the Wordcount application is actually a
>> continuous
>> > stream of updates"):
>> >
>> > https://kafka.apache.org/0110/documentation/streams/quickstart
>> >
>> > Note this semantics applies for both windowed and un-windowed tables.
>> >
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨  wrote:
>> >
>> > > Hi:
>> > > I used TimeWindow for aggregate data in kafka.
>> > >
>> > > this is code snippet ;
>> > >
>> > >   view.flatMap(new MultipleKeyValueMapper(client))
>> > >       .groupByKey(Serialized.with(Serdes.String(),
>> > >           Serdes.serdeFrom(new CountInfoSerializer(),
>> > >               new CountInfoDeserializer())))
>> > >       .windowedBy(TimeWindows.of(6))
>> > >       .reduce(new Reducer<CountInfo>() {
>> > >           @Override
>> > >           public CountInfo apply(CountInfo value1, CountInfo value2) {
>> > >               return new CountInfo(value1.start + value2.start,
>> > >                   value1.active + value2.active, value1.fresh + value2.fresh);
>> > >           }
>> > >       })
>> > >       .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
>> > >           @Override
>> > >           public String apply(Windowed<String> key, CountInfo value) {
>> > >               return key.key();
>> > >           }
>> > >       })
>> > >       .print(Printed.toSysOut());
>> > >
>> > > KafkaStreams streams = new KafkaStreams(builder.build(),
>> > >     KStreamReducer.getConf());
>> > > streams.start();
>> > >
>> > > I tested 3 records in Kafka,
>> > > and printed the key and value:
>> > >
>> > >
>> > > [KTABLE-TOSTREAM-07]: [9_9_2018-03-09_hour_
>> > > 21@152060130/152060136], CountInfo{start=12179, active=12179,
>> > > fresh=12179}
>> > > [KTABLE-TOSTREAM-07]: [9_9_2018-03-09@
>> > 152060130/152060136],
>> > > CountInfo{start=12179, active=12179, fresh=12179}
>> > > [KTABLE-TOSTREAM-07]: [9_9_2018-03-09_hour_
>> > > 21@152060130/152060136], CountInfo{start=3, active=3,
>> > > fresh=3}
>> > > [KTABLE-TOSTREAM-07]: [9_9_2018-03-09@
>> > 152060130/152060136