RE: Abnormal working in the method punctuate and error linked to session.timeout.ms

2016-11-28 Thread Hamza HACHANI
The print is on line 40 of the class Base...


From: Hamza HACHANI
Sent: Monday, 28 November 2016 01:25:08
To: users@kafka.apache.org
Subject: RE: Abnormal working in the method punctuate and error linked to session.timeout.ms


Hi Eno,

Here is the code for the application ExclusiveStatsConnectionDevice, which is
composed of 4 nodes.

For example, when I put print("") you would see the problem of the
infinite loop.

I preferred to send the whole code to make it easier for you, even though you
don't need all of it.


From: Eno Thereska
Sent: Monday, 28 November 2016 01:12:14
To: users@kafka.apache.org
Subject: Re: Abnormal working in the method punctuate and error linked to session.timeout.ms

Hi Hamza,

Would you be willing to share some of your code so we can have a look?

Thanks
Eno
> On 28 Nov 2016, at 12:58, Hamza HACHANI  wrote:
>
> Hi Eno.
>
> The problem is that I did not write any infinite while loop.
>
> So I can't understand why the application is behaving this way.
>
>
> Hamza
>
> 
> From: Eno Thereska
> Sent: Sunday, 27 November 2016 23:21:24
> To: users@kafka.apache.org
> Subject: Re: Abnormal working in the method punctuate and error linked to session.timeout.ms
>
> Hi Hamza,
>
> If you have an infinite while loop, that would mean the app would spend all 
> the time in that loop and poll() would never be called.
>
> Eno
>
>> On 28 Nov 2016, at 10:49, Hamza HACHANI  wrote:
>>
>> Hi,
>>
>> I'm having some trouble with the method punctuate. In fact, when I try to
>> print a string in the method punctuate,
>>
>> this string is printed indefinitely, as if I had written
>> while (true) { print(string); }.
>>
>> I can't understand what is happening. Does anybody have an explanation?
>>
>>
>> Besides, for another application it prints the following
>> error:
>>
>> WARN Failed to commit StreamTask #0_0 in thread [StreamThread-1]:  
>> (org.apache.kafka.streams.processor.internals.StreamThread)
>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
>> completed since the group has already rebalanced and assigned the partitions 
>> to another member. This means that the time between subsequent calls to 
>> poll() was longer than the configured session.timeout.ms, which typically 
>> implies that the poll loop is spending too much time message processing. You 
>> can address this either by increasing the session timeout or by reducing the 
>> maximum size of batches returned in poll() with max.poll.records.
>>
>> When I tried to modify the configuration of the consumer, nothing happened.
>>
>> Any ideas for this too?
>>
>> Thanks in advance.
>>
>>
>> Hamza
>>
>



RE: Abnormal working in the method punctuate and error linked to session.timeout.ms

2016-11-28 Thread Hamza HACHANI
Hi Eno,

Here is the code for the application ExclusiveStatsConnectionDevice, which is
composed of 4 nodes.

For example, when I put print("") you would see the problem of the
infinite loop.

I preferred to send the whole code to make it easier for you, even though you
don't need all of it.


From: Eno Thereska
Sent: Monday, 28 November 2016 01:12:14
To: users@kafka.apache.org
Subject: Re: Abnormal working in the method punctuate and error linked to session.timeout.ms

Hi Hamza,

Would you be willing to share some of your code so we can have a look?

Thanks
Eno
> On 28 Nov 2016, at 12:58, Hamza HACHANI  wrote:
>
> Hi Eno.
>
> The problem is that I did not write any infinite while loop.
>
> So I can't understand why the application is behaving this way.
>
>
> Hamza
>
> 
> From: Eno Thereska
> Sent: Sunday, 27 November 2016 23:21:24
> To: users@kafka.apache.org
> Subject: Re: Abnormal working in the method punctuate and error linked to session.timeout.ms
>
> Hi Hamza,
>
> If you have an infinite while loop, that would mean the app would spend all 
> the time in that loop and poll() would never be called.
>
> Eno
>
>> On 28 Nov 2016, at 10:49, Hamza HACHANI  wrote:
>>
>> Hi,
>>
>> I'm having some trouble with the method punctuate. In fact, when I try to
>> print a string in the method punctuate,
>>
>> this string is printed indefinitely, as if I had written
>> while (true) { print(string); }.
>>
>> I can't understand what is happening. Does anybody have an explanation?
>>
>>
>> Besides, for another application it prints the following
>> error:
>>
>> WARN Failed to commit StreamTask #0_0 in thread [StreamThread-1]:  
>> (org.apache.kafka.streams.processor.internals.StreamThread)
>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
>> completed since the group has already rebalanced and assigned the partitions 
>> to another member. This means that the time between subsequent calls to 
>> poll() was longer than the configured session.timeout.ms, which typically 
>> implies that the poll loop is spending too much time message processing. You 
>> can address this either by increasing the session timeout or by reducing the 
>> maximum size of batches returned in poll() with max.poll.records.
>>
>> When I tried to modify the configuration of the consumer, nothing happened.
>>
>> Any ideas for this too?
>>
>> Thanks in advance.
>>
>>
>> Hamza
>>
>



RE: Abnormal working in the method punctuate and error linked to session.timeout.ms

2016-11-28 Thread Hamza HACHANI
Hi Eno.

The problem is that I did not write any infinite while loop.

So I can't understand why the application is behaving this way.


Hamza


From: Eno Thereska
Sent: Sunday, 27 November 2016 23:21:24
To: users@kafka.apache.org
Subject: Re: Abnormal working in the method punctuate and error linked to session.timeout.ms

Hi Hamza,

If you have an infinite while loop, that would mean the app would spend all the 
time in that loop and poll() would never be called.

Eno

> On 28 Nov 2016, at 10:49, Hamza HACHANI  wrote:
>
> Hi,
>
> I'm having some trouble with the method punctuate. In fact, when I try to
> print a string in the method punctuate,
>
> this string is printed indefinitely, as if I had written
> while (true) { print(string); }.
>
> I can't understand what is happening. Does anybody have an explanation?
>
>
> Besides, for another application it prints the following
> error:
>
> WARN Failed to commit StreamTask #0_0 in thread [StreamThread-1]:  
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
> completed since the group has already rebalanced and assigned the partitions 
> to another member. This means that the time between subsequent calls to 
> poll() was longer than the configured session.timeout.ms, which typically 
> implies that the poll loop is spending too much time message processing. You 
> can address this either by increasing the session timeout or by reducing the 
> maximum size of batches returned in poll() with max.poll.records.
>
> When I tried to modify the configuration of the consumer, nothing happened.
>
> Any ideas for this too?
>
> Thanks in advance.
>
>
> Hamza
>



Abnormal working in the method punctuate and error linked to session.timeout.ms

2016-11-28 Thread Hamza HACHANI
Hi,

I'm having some trouble with the method punctuate. In fact, when I try to print
a string in the method punctuate,

this string is printed indefinitely, as if I had written
while (true) { print(string); }.

I can't understand what is happening. Does anybody have an explanation?


Besides, for another application it prints the following error:

 WARN Failed to commit StreamTask #0_0 in thread [StreamThread-1]:  
(org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
completed since the group has already rebalanced and assigned the partitions to 
another member. This means that the time between subsequent calls to poll() was 
longer than the configured session.timeout.ms, which typically implies that the 
poll loop is spending too much time message processing. You can address this 
either by increasing the session timeout or by reducing the maximum size of 
batches returned in poll() with max.poll.records.

When I tried to modify the configuration of the consumer, nothing happened.

Any ideas for this too?

Thanks in advance.


Hamza
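
A minimal sketch of the tuning this error message suggests, for a 0.10.x
Streams application; the property values are illustrative assumptions, and in
these client versions consumer settings placed in the Streams properties are
passed through to the embedded consumer:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app"); // illustrative
// Give the group more slack before a slow poll loop gets the member evicted ...
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
// ... and/or reduce the amount of work handed back by each poll() call.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");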



RE: Control results coming from windows

2016-11-04 Thread Hamza HACHANI
OK Thanks Damian.

Have a nice day.

Hamza


From: Damian Guy
Sent: Friday, 4 November 2016 00:58:36
To: users@kafka.apache.org
Subject: Re: Control results coming from windows

Hi Hamza,

I'm not sure what you mean in the first sentence? There are some breaking
API changes from 0.10.0 -> 0.10.1, so you may need to change some code. I'd
also suggest you thoroughly test with the new version to ensure there are
no regressions.

There is a known issue with caching that may or may not affect you. It
depends on your topology. https://issues.apache.org/jira/browse/KAFKA-4311 -
there is a fix for it that will hopefully be merged to trunk and 0.10.1
soon, however I don't have any timeline for when/if a 0.10.1.1 release will
be done.

The config parameter is StreamsConfig.CACHE_MAX_BYTES_BUFFERING. You should
also take a look at
https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams

Thanks,
Damian
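
A hedged sketch of the caching configuration mentioned above, assuming Kafka
0.10.1, where the StreamsConfig constant is CACHE_MAX_BYTES_BUFFERING_CONFIG;
the values are illustrative:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// KIP-63 record cache: a larger cache deduplicates more aggressively, so
// fewer intermediate window updates are forwarded downstream.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// Caches are also flushed on commit, so the commit interval bounds how long
// an intermediate result can be held back.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);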


On Fri, 4 Nov 2016 at 09:44 Hamza HACHANI  wrote:

> Hi Damian,
>
>
> If I were to move to 0.10.1, is this possible without impacting the
> applications that I've already written?
>
> If yes, which parameters are needed for deduplication with caching?
> That would save me time looking for it.
>
>
> Thanks.
>
> Hamza
>
> 
> From: Damian Guy
> Sent: Thursday, 3 November 2016 21:32:52
> To: users@kafka.apache.org
> Subject: Re: Control results coming from windows
>
> Hi Hamza,
>
> If you are using version 0.10.0, then there is no way of controlling this.
> In 0.10.1 we do some deduplication with caching, but you should still
> expect multiple results for a window.
>
> Thanks,
> Damian
>
> On Fri, 4 Nov 2016 at 08:42 Hamza HACHANI  wrote:
>
> > Hi,
> >
> >
> > I implemented a window using the Streams DSL, but results come out of the
> > window every time new data arrives. Is there any way to make the result
> > come out of the window only at the end of the window, rather than
> > continuously?
> >
> > In other words, I need to control the flow of data coming out of the
> > window. I'm only interested in the final result, not the intermediate ones.
> >
> >
> > Thanks in advance,
> >
> > Hamza
> >
>


RE: Control results coming from windows

2016-11-04 Thread Hamza HACHANI
Hi Damian,


If I were to move to 0.10.1, is this possible without impacting the
applications that I've already written?

If yes, which parameters are needed for deduplication with caching? That
would save me time looking for it.


Thanks.

Hamza


From: Damian Guy
Sent: Thursday, 3 November 2016 21:32:52
To: users@kafka.apache.org
Subject: Re: Control results coming from windows

Hi Hamza,

If you are using version 0.10.0, then there is no way of controlling this.
In 0.10.1 we do some deduplication with caching, but you should still
expect multiple results for a window.

Thanks,
Damian

On Fri, 4 Nov 2016 at 08:42 Hamza HACHANI  wrote:

> Hi,
>
>
> I implemented a window using the Streams DSL, but results come out of the
> window every time new data arrives. Is there any way to make the result come
> out of the window only at the end of the window, rather than continuously?
>
> In other words, I need to control the flow of data coming out of the
> window. I'm only interested in the final result, not the intermediate ones.
>
>
> Thanks in advance,
>
> Hamza
>


Control results coming from windows

2016-11-04 Thread Hamza HACHANI
Hi,


I implemented a window using the Streams DSL, but results come out of the window
every time new data arrives. Is there any way to make the result come out of the
window only at the end of the window, rather than continuously?

In other words, I need to control the flow of data coming out of the
window. I'm only interested in the final result, not the intermediate ones.


Thanks in advance,

Hamza


Re : windowing with the processor api

2016-11-02 Thread Hamza HACHANI
Thanks a lot.
This was very helpful.

Hamza



- Reply message -
From: "Eno Thereska"
To: "users@kafka.apache.org"
Subject: windowing with the processor api
Date: Wed., Nov. 2, 2016 19:18

Thanks Matthias, yes, to get window operations, or things like hopping or
sliding windows, you need to use the DSL (e.g., the TimeWindows class). The
Processor API is very basic (and thus flexible), but you'll end up
re-implementing TimeWindows.

Eno

> On 2 Nov 2016, at 17:45, Matthias J. Sax  wrote:
>
>
> A windowed store does not work the way you expect. The parameter
> "windowSize" is not a store parameter itself, but a caching parameter
> for the store (only used if caching gets enabled).
>
> For window support, Streams provides window semantics on top of the
> store, and the store is not aware of windows in this sense. Each window
> gets an ID that is encoded in the store key as "record-key:window-ID",
> and record timestamps are mapped to window-IDs to find the correct
> window a record gets put into... So the store is still a plain
> key-value store and is not aware of any windowing stuff.
>
> I would highly recommend using the DSL for window operations. This
> should not be a limitation, as you can mix-and-match the DSL and Processor
> API. Everything you can do with the plain Processor API you can also do
> within the DSL via .process(...)
>
>
> -Matthias
>
> On 11/2/16 3:49 AM, Hamza HACHANI wrote:
>> Hi Eno,
>>
>>
>> What I mean is that I can't find a place to define the
>> size of the window or to specify the advance interval.
>>
>>
>> Hamza
>>
>> Thanks
>>
>> From: Eno Thereska
>> Sent: Tuesday, 1 November 2016 22:44:47
>> To: users@kafka.apache.org
>> Subject: Re: windowing with the processor api
>>
>> Hi Hamza,
>>
>> Are you getting a particular error? Here is an example:
>>
>> Stores.create("window-store")
>>   .withStringKeys()
>>   .withStringValues()
>>   .persistent()
>>   .windowed(10, 10, 2, false).build(), "the-processor")
>>
>> Thanks Eno
>>
>>> On 2 Nov 2016, at 08:19, Hamza HACHANI 
>>> wrote:
>>>
>>> Hi,
>>>
>>> I would like to know if somebody has an idea how to define the
>>> size of the window in the Processor API.
>>>
>>> I've been blocked for 6 days looking for a solution.
>>>
>>> using :
>>>
>>> Stores.create(...).withStringKeys().withStringValues().persistent().windowed(...).build()
>>>
>>>
>>>
>>> I was able to define the retention time but not the size of the window.
>>>
>>> Please help me if possible.
>>>
>>> Thanks,
>>>
>>> Hamza
>>
>>
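
A minimal sketch of the mix-and-match approach Matthias describes, assuming the
0.10.1 API, default String serdes, and hypothetical topic, store and processor
names:

import org.apache.kafka.streams.kstream.*;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> events = builder.stream("uplink"); // hypothetical topic

// Windowed aggregation in the DSL: 1-minute tumbling windows, retained for
// 15 minutes (Windows#until) to absorb late arrivals.
KTable<Windowed<String>, Long> counts = events
        .groupByKey()
        .count(TimeWindows.of(60 * 1000L).until(15 * 60 * 1000L), "window-counts");

// Mix-and-match: custom Processor logic can still be attached to a DSL stream.
events.process(MyProcessor::new, "my-state-store"); // hypothetical processor/store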



RE: windowing with the processor api

2016-11-02 Thread Hamza HACHANI
Hi Eno,


What I mean is that I can't find a place to define the size of the
window or to specify the advance interval.


Hamza

Thanks


From: Eno Thereska
Sent: Tuesday, 1 November 2016 22:44:47
To: users@kafka.apache.org
Subject: Re: windowing with the processor api

Hi Hamza,

Are you getting a particular error? Here is an example:

Stores.create("window-store")
  .withStringKeys()
  .withStringValues()
  .persistent()
  .windowed(10, 10, 2, false).build(), "the-processor")

Thanks
Eno

> On 2 Nov 2016, at 08:19, Hamza HACHANI  wrote:
>
> Hi,
>
> I would like to know if somebody has an idea how to define the size of the
> window in the Processor API.
>
> I've been blocked for 6 days looking for a solution.
>
> using :
>
> Stores.create(...).withStringKeys().withStringValues().persistent().windowed(...).build()
>
> I was able to define the retention time but not the size of the window.
>
> Please help me if possible.
>
> Thanks,
>
> Hamza



windowing with the processor api

2016-11-02 Thread Hamza HACHANI
Hi,

I would like to know if somebody has an idea how to define the size of the
window in the Processor API.

I've been blocked for 6 days looking for a solution.

using :

Stores.create(...).withStringKeys().withStringValues().persistent().windowed(...).build()

I was able to define the retention time but not the size of the window.

Please help me if possible.

Thanks,

Hamza
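
For reference, a commented version of the builder call from Eno's reply; the
parameter meanings are one reading of the 0.10.x Stores API, and note
Matthias's caveat earlier in this archive that the window-size argument is a
caching parameter rather than a windowing one:

import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

StateStoreSupplier store = Stores.create("window-store")
        .withStringKeys()
        .withStringValues()
        .persistent()
        // windowed(windowSize, retentionPeriod, numSegments, retainDuplicates):
        //   windowSize       - per Matthias's note, used for the store's caching
        //   retentionPeriod  - how long old windows are kept, in ms
        //   numSegments      - number of segment files used to expire old data
        //   retainDuplicates - whether duplicate values for a key are kept
        .windowed(10, 10, 2, false)
        .build();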


RE: customised event time

2016-10-25 Thread Hamza HACHANI
Thank you Guozhang.

Have a nice day.


From: Guozhang Wang
Sent: Monday, 24 October 2016 16:50:45
To: users@kafka.apache.org
Subject: Re: customised event time

Hi Hamza,

You can create a windowed store in the processor API via the Stores factory
class: org.apache.kafka.streams.state.Stores

More specifically, you do something like:

Stores.create().withKeys().withValues().persistent().windowed(/* you can
specify window size, retention period etc here */)


Which returns the RocksDBWindowStoreSupplier.

Guozhang


On Mon, Oct 24, 2016 at 2:23 AM, Hamza HACHANI 
wrote:

> And the start time and end time of the window.
>
> In other words, I need the notion of windows in the Processor API.
>
> Is this possible?
>
> ________
> From: Hamza HACHANI
> Sent: Sunday, 23 October 2016 20:43:05
> To: users@kafka.apache.org
> Subject: RE: customised event time
>
> To be more specific.
>
> What I really need is the retention time property for the
> window in the Processor API.
>
> Because for the window itself, I think I can manage.
>
>
> Hamza
>
> 
> From: Hamza HACHANI
> Sent: Sunday, 23 October 2016 20:30:13
> To: users@kafka.apache.org
> Subject: RE: customised event time
>
> Hi,
>
> I think that maybe I'm asking too much.
>
> But I need the windowing aspect in the Processor API, not in the Streams
> DSL. Is this possible?
>
> The second question is how I can get rid of the intermediate results,
> because I'm only interested in the final result given by the window.
>
> Hamza
>
> 
> From: Matthias J. Sax
> Sent: Saturday, 22 October 2016 16:12:45
> To: users@kafka.apache.org
> Subject: Re: customised event time
>
>
> Hi,
>
> you can set window retention time via the method Windows#until() (and this
> retention time is based on the timestamps returned from your custom
> timestamp extractor). This keeps all windows until the retention time
> passes and thus, all late-arriving records will be processed correctly.
>
> However, Kafka Streams does not close windows as other frameworks do, but
> rather gives you an (intermediate) result each time a window is
> updated with a new record (regardless of whether the record is in-order or
> late -- you will get a result record in both cases).
>
> As of Kafka 0.10.1 those (intermediate) results get deduplicated so
> you might not receive all (intermediate) results downstream. Of
> course, it is ensured, that you will eventually get the latest/final
> result sent downstream.
>
>
> -Matthias
>
> On 10/21/16 7:42 AM, Hamza HACHANI wrote:
> > Hi,
> >
> >
> > I would like to process data based on a customised event time (a
> > timestamp that I embed as part of the message).
> >
> > The data is processed in periodic windows of x duration that are
> > parameterised via the method punctuate.
> >
> > What I need is a retention time for the window, to be able to handle
> > late-arriving messages.
> >
> > Can I do this: define/configure a retention time for windows? For
> > example, the window which treats data between 15:00 and 16:00 forwards
> > the result not at 16:00 but at 16:15.
> >
> > Thanks in advance for your help.
> >
> >
> > Hamza
> >
> >
>



--
-- Guozhang


RE: customised event time

2016-10-24 Thread Hamza HACHANI
And the start time and end time of the window.

In other words, I need the notion of windows in the Processor API.

Is this possible?


From: Hamza HACHANI
Sent: Sunday, 23 October 2016 20:43:05
To: users@kafka.apache.org
Subject: RE: customised event time

To be more specific.

What I really need is the retention time property for the window in
the Processor API.

Because for the window itself, I think I can manage.


Hamza


From: Hamza HACHANI
Sent: Sunday, 23 October 2016 20:30:13
To: users@kafka.apache.org
Subject: RE: customised event time

Hi,

I think that maybe I'm asking too much.

But I need the windowing aspect in the Processor API, not in the Streams DSL.
Is this possible?

The second question is how I can get rid of the intermediate results, because
I'm only interested in the final result given by the window.

Hamza


From: Matthias J. Sax
Sent: Saturday, 22 October 2016 16:12:45
To: users@kafka.apache.org
Subject: Re: customised event time


Hi,

you can set window retention time via the method Windows#until() (and this
retention time is based on the timestamps returned from your custom
timestamp extractor). This keeps all windows until the retention time
passes and thus, all late-arriving records will be processed correctly.

However, Kafka Streams does not close windows as other frameworks do, but
rather gives you an (intermediate) result each time a window is
updated with a new record (regardless of whether the record is in-order or
late -- you will get a result record in both cases).

As of Kafka 0.10.1 those (intermediate) results get deduplicated so
you might not receive all (intermediate) results downstream. Of
course, it is ensured, that you will eventually get the latest/final
result sent downstream.


-Matthias

On 10/21/16 7:42 AM, Hamza HACHANI wrote:
> Hi,
>
>
> I would like to process data based on a customised event time (a
> timestamp that I embed as part of the message).
>
> The data is processed in periodic windows of x duration that are
> parameterised via the method punctuate.
>
> What I need is a retention time for the window, to be able to handle
> late-arriving messages.
>
> Can I do this: define/configure a retention time for windows? For
> example, the window which treats data between 15:00 and 16:00 forwards
> the result not at 16:00 but at 16:15.
>
> Thanks in advance for your help.
>
>
> Hamza
>
>


RE: customised event time

2016-10-24 Thread Hamza HACHANI
To be more specific.

What I really need is the retention time property for the window in
the Processor API.

Because for the window itself, I think I can manage.


Hamza


From: Hamza HACHANI
Sent: Sunday, 23 October 2016 20:30:13
To: users@kafka.apache.org
Subject: RE: customised event time

Hi,

I think that maybe I'm asking too much.

But I need the windowing aspect in the Processor API, not in the Streams DSL.
Is this possible?

The second question is how I can get rid of the intermediate results, because
I'm only interested in the final result given by the window.

Hamza


From: Matthias J. Sax
Sent: Saturday, 22 October 2016 16:12:45
To: users@kafka.apache.org
Subject: Re: customised event time


Hi,

you can set window retention time via the method Windows#until() (and this
retention time is based on the timestamps returned from your custom
timestamp extractor). This keeps all windows until the retention time
passes and thus, all late-arriving records will be processed correctly.

However, Kafka Streams does not close windows as other frameworks do, but
rather gives you an (intermediate) result each time a window is
updated with a new record (regardless of whether the record is in-order or
late -- you will get a result record in both cases).

As of Kafka 0.10.1 those (intermediate) results get deduplicated so
you might not receive all (intermediate) results downstream. Of
course, it is ensured, that you will eventually get the latest/final
result sent downstream.


-Matthias

On 10/21/16 7:42 AM, Hamza HACHANI wrote:
> Hi,
>
>
> I would like to process data based on a customised event time (a
> timestamp that I embed as part of the message).
>
> The data is processed in periodic windows of x duration that are
> parameterised via the method punctuate.
>
> What I need is a retention time for the window, to be able to handle
> late-arriving messages.
>
> Can I do this: define/configure a retention time for windows? For
> example, the window which treats data between 15:00 and 16:00 forwards
> the result not at 16:00 but at 16:15.
>
> Thanks in advance for your help.
>
>
> Hamza
>
>


RE: customised event time

2016-10-24 Thread Hamza HACHANI
Hi,

I think that maybe I'm asking too much.

But I need the windowing aspect in the Processor API, not in the Streams DSL.
Is this possible?

The second question is how I can get rid of the intermediate results, because
I'm only interested in the final result given by the window.

Hamza


From: Matthias J. Sax
Sent: Saturday, 22 October 2016 16:12:45
To: users@kafka.apache.org
Subject: Re: customised event time


Hi,

you can set window retention time via the method Windows#until() (and this
retention time is based on the timestamps returned from your custom
timestamp extractor). This keeps all windows until the retention time
passes and thus, all late-arriving records will be processed correctly.

However, Kafka Streams does not close windows as other frameworks do, but
rather gives you an (intermediate) result each time a window is
updated with a new record (regardless of whether the record is in-order or
late -- you will get a result record in both cases).

As of Kafka 0.10.1 those (intermediate) results get deduplicated so
you might not receive all (intermediate) results downstream. Of
course, it is ensured, that you will eventually get the latest/final
result sent downstream.


-Matthias

On 10/21/16 7:42 AM, Hamza HACHANI wrote:
> Hi,
>
>
> I would like to process data based on a customised event time (a
> timestamp that I embed as part of the message).
>
> The data is processed in periodic windows of x duration that are
> parameterised via the method punctuate.
>
> What I need is a retention time for the window, to be able to handle
> late-arriving messages.
>
> Can I do this: define/configure a retention time for windows? For
> example, the window which treats data between 15:00 and 16:00 forwards
> the result not at 16:00 but at 16:15.
>
> Thanks in advance for your help.
>
>
> Hamza
>
>


customised event time

2016-10-21 Thread Hamza HACHANI
Hi,


I would like to process data based on a customised event time (a timestamp that
I embed as part of the message).

The data is processed in periodic windows of x duration that are parameterised
via the method punctuate.

What I need is a retention time for the window, to be able to handle
late-arriving messages.

Can I do this: define/configure a retention time for windows? For example, the
window which treats data between 15:00 and 16:00 forwards the result not at
16:00 but at 16:15.

Thanks in advance for your help.


Hamza
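
A sketch of how the two requirements above can fit together in the 0.10.1 DSL,
per Matthias's reply; the extractor and message type are hypothetical
assumptions:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical extractor reading the event time embedded in the message value.
public class PayloadTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        return ((MyMessage) record.value()).getEventTime(); // hypothetical type
    }
}

// Registered via:
//   props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
//             PayloadTimestampExtractor.class);

// 1-hour windows that keep accepting late records for 15 minutes past the
// window end (retention is measured from the window start, hence 75 minutes).
TimeWindows windows = TimeWindows.of(60 * 60 * 1000L).until(75 * 60 * 1000L);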



RE: Problem with key-value in the KVStore

2016-10-13 Thread Hamza HACHANI
Sorry, I was talking nonsense.

Please disregard my previous message.

Kafka does ensure the uniqueness of the key.


From: Hamza HACHANI
Sent: Thursday, 13 October 2016 01:38:42
To: users@kafka.apache.org
Subject: Problem with key-value in the KVStore

Hi,

I've designed two processors with different topologies.

The issue is that in the first topology, in one node, I was able to associate
different messages (key, value) where the key could be the same, and so I was
able to do something like countByKey.

In the second example, when I tried to do the same, I noticed that this was not
possible: any new value associated with a key is overwritten by the next value,
so the uniqueness of the key is enforced.

I think this is really weird.

Does anybody have an explanation or a suggestion?

Thanks in advance,


Hamza


Problem with key-value in the KVStore

2016-10-13 Thread Hamza HACHANI
Hi,

I've designed two processors with different topologies.

The issue is that in the first topology, in one node, I was able to associate
different messages (key, value) where the key could be the same, and so I was
able to do something like countByKey.

In the second example, when I tried to do the same, I noticed that this was not
possible: any new value associated with a key is overwritten by the next value,
so the uniqueness of the key is enforced.

I think this is really weird.

Does anybody have an explanation or a suggestion?

Thanks in advance,


Hamza
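
Since a KeyValueStore keeps exactly one value per key, a per-key count has to
be accumulated with a get-then-put; a minimal sketch inside a Processor's
process() method, assuming a store declared with Long values:

@Override
public void process(String key, String value) {
    // One value per key: read the current count, update it, write it back.
    Long current = this.kvStore.get(key); // kvStore: KeyValueStore<String, Long>
    this.kvStore.put(key, current == null ? 1L : current + 1L);
}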


RE: difficulty to delete a topic because of its syntax

2016-10-06 Thread Hamza HACHANI
Yes, in fact,

the topic in question was the name of a store.

OK, I will raise it as a JIRA.


From: isma...@gmail.com on behalf of Ismael Juma
Sent: Wednesday, 5 October 2016 22:24:53
To: users@kafka.apache.org
Subject: Re: difficulty to delete a topic because of its syntax

It's worth mentioning that Streams is in the process of transitioning from
updating ZooKeeper directly to using the newly introduced create topics and
delete topics protocol requests. It was too late for 0.10.1.0, but should
land in trunk soonish.

Ismael

On Thu, Oct 6, 2016 at 11:15 AM, Yuto KAWAMURA 
wrote:

> I guess this topic is created by Kafka Streams.
> Kafka Streams has its own topic creation (ZooKeeper node creation)
> implementation and does not use core's AdminUtils to create internal-use
> topics such as XX-changelog:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L208
> AdminUtils has topic name validation logic
> (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/common/Topic.scala#L33-L47),
> but I don't see a similar thing in the Kafka Streams version from a brief
> read of the code.
> Since the topic name is derived from the store name, which is an
> arbitrary string given by users, this could happen if you give a
> string that contains whitespace as the name of a state store.
>
>
> 2016-10-06 18:40 GMT+09:00 Rajini Sivaram :
> > Hamza,
> >
> > Can you raise a JIRA with details on how the topic was created by Kafka
> > with an invalid name? Sounds like there might be a missing validation
> > somewhere.
> >
> > Regards,
> >
> > Rajini
> >
> > On Thu, Oct 6, 2016 at 10:12 AM, Hamza HACHANI 
> > wrote:
> >
> >> Thanks Todd,
> >>
> >>
> >> I've resolved it by suing what you told me.
> >>
> >> Thanks very much. But i think that there is a problem with kafka by
> >> letting the saving names of topic and logs where there is a space as i
> >> showes in the images.
> >>
> >> Have a good day to you all.
> >>
> >>
> >> Hamza
> >>
> >> 
> >> From: Hamza HACHANI
> >> Sent: Wednesday, 5 October 2016 19:23:00
> >> To: users@kafka.apache.org
> >> Subject: RE: difficulty to delete a topic because of its syntax
> >>
> >>
> >> Hi,
> >>
> >> Attached the files showing what i'm talking about.
> >>
> >>
> >> Hamza
> >>
> >> 
> >> From: Todd S
> >> Sent: Wednesday, 5 October 2016 07:25:48
> >> To: users@kafka.apache.org
> >> Subject: Re: difficulty to delete a topic because of its syntax
> >>
> >> You *could* go into ZooKeeper and nuke the topic, then delete the
> >> files on disk.
> >>
> >> Slightly more risky but it should work
> >>
> >> On Wednesday, 5 October 2016, Manikumar 
> wrote:
> >>
> >> > Kafka doesn't support whitespace in topic names. Only '.', '_'
> >> > and '-' are allowed.
> >> > Not sure how you got whitespace into the topic name.
> >> >
> >> > On Wed, Oct 5, 2016 at 8:19 PM, Hamza HACHANI <
> hamza.hach...@supcom.tn
> >> > >
> >> > wrote:
> >> >
> >> > > Awkwardly, when I list the topics I find it, but when I delete it,
> >> > > it says that this topic does not exist.
> >> > >
> >> > > 
> >> > > From: Ben Davison
> >> > > Sent: Wednesday, 5 October 2016 02:37:14
> >> > > To: users@kafka.apache.org
> >> > > Subject: Re: difficulty to delete a topic because of its syntax
> >> > >
> >> > > Try putting "" or '' around the string when running the command.
> >> > >
> >> > > On Wed, Oct 5, 2016 at 3:29 PM, Hamza HACHANI <
> hamza.hach...@supcom.tn
> >> > >
> >> > > wrote:
> >> > >
> >> > > > It's between "the" and "metric"
> >> > > >
> >> > > > 
> >> > > > From: Ali Akhtar
> >> > > > Sent: Wednesday, 5 October 2016 02:16:33
> >> > > > To: users@kafka.apache.org

RE: difficulty to delete a topic because of its syntax

2016-10-06 Thread Hamza HACHANI
Thanks Todd,


I've resolved it by using what you told me.

Thanks very much. But I think that there is a problem with Kafka in that it
allows saving topic and log names that contain a space, as I showed in the images.

Have a good day to you all.


Hamza


From: Hamza HACHANI
Sent: Wednesday, 5 October 2016 19:23:00
To: users@kafka.apache.org
Subject: RE: difficulty to delete a topic because of its syntax


Hi,

Attached the files showing what i'm talking about.


Hamza


From: Todd S
Sent: Wednesday, 5 October 2016 07:25:48
To: users@kafka.apache.org
Subject: Re: difficulty to delete a topic because of its syntax

You *could* go into ZooKeeper and nuke the topic, then delete the files on
disk.

Slightly more risky but it should work

On Wednesday, 5 October 2016, Manikumar  wrote:

> Kafka doesn't support whitespace in topic names. Only '.', '_'
> and '-' are allowed.
> Not sure how you got whitespace into the topic name.
>
> On Wed, Oct 5, 2016 at 8:19 PM, Hamza HACHANI  >
> wrote:
>
> > Awkwardly, when I list the topics I find it, but when I delete it,
> > it says that this topic does not exist.
> >
> > 
> > From: Ben Davison
> > Sent: Wednesday, 5 October 2016 02:37:14
> > To: users@kafka.apache.org
> > Subject: Re: difficulty to delete a topic because of its syntax
> >
> > Try putting "" or '' around the string when running the command.
> >
> > On Wed, Oct 5, 2016 at 3:29 PM, Hamza HACHANI  >
> > wrote:
> >
> > > It's between "the" and "metric"
> > >
> > > 
> > > From: Ali Akhtar
> > > Sent: Wednesday, 5 October 2016 02:16:33
> > > To: users@kafka.apache.org
> > > Subject: Re: difficulty to delete a topic because of its syntax
> > >
> > > I don't see a space in that topic name
> > >
> > > On Wed, Oct 5, 2016 at 6:42 PM, Hamza HACHANI  >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I created a topic called device-connection-invert-key-value-the
> > > > metric-changelog.
> > > >
> > > > I insist that there is a space in it.
> > > >
> > > >
> > > >
> > > > Now that I want to delete it, because my cluster can no longer work
> > > > correctly, I can't do it, as it only reads the first part of it
> > > > (device-connection-invert-key-value-the), which obviously it doesn't
> > > > find.
> > > >
> > > > Does somebody have a solution to delete it?
> > > >
> > > > Thanks in advance.
> > > >
> > > >
> > > > Hamza
> > > >
> > > >
> > >
> >
>


RE: difficulty to delete a topic because of its syntax

2016-10-06 Thread Hamza HACHANI
Hi,

Attached the files showing what i'm talking about.


Hamza


From: Todd S
Sent: Wednesday, 5 October 2016 07:25:48
To: users@kafka.apache.org
Subject: Re: difficulty to delete a topic because of its syntax

You *could* go into ZooKeeper and nuke the topic, then delete the files on
disk.

Slightly more risky but it should work

On Wednesday, 5 October 2016, Manikumar  wrote:

> Kafka doesn't support whitespace in topic names. Only '.', '_'
> and '-' are allowed.
> Not sure how you got whitespace into the topic name.
>
> On Wed, Oct 5, 2016 at 8:19 PM, Hamza HACHANI  >
> wrote:
>
> > Awkwardly, when I list the topics I find it, but when I delete it,
> > it says that this topic does not exist.
> >
> > 
> > From: Ben Davison
> > Sent: Wednesday, 5 October 2016 02:37:14
> > To: users@kafka.apache.org
> > Subject: Re: difficulty to delete a topic because of its syntax
> >
> > Try putting "" or '' around the string when running the command.
> >
> > On Wed, Oct 5, 2016 at 3:29 PM, Hamza HACHANI  >
> > wrote:
> >
> > > It's between "the" and "metric"
> > >
> > > 
> > > From: Ali Akhtar
> > > Sent: Wednesday, 5 October 2016 02:16:33
> > > To: users@kafka.apache.org
> > > Subject: Re: difficulty to delete a topic because of its syntax
> > >
> > > I don't see a space in that topic name
> > >
> > > On Wed, Oct 5, 2016 at 6:42 PM, Hamza HACHANI  >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I created a topic called device-connection-invert-key-value-the
> > > > metric-changelog.
> > > >
> > > > I insist that there is a space in it.
> > > >
> > > >
> > > >
> > > > Now that I want to delete it, because my cluster can no longer work
> > > > correctly, I can't do it, as it only reads the first part of it
> > > > (device-connection-invert-key-value-the), which obviously it doesn't
> > > > find.
> > > >
> > > > Does somebody have a solution to delete it?
> > > >
> > > > Thanks in advance.
> > > >
> > > >
> > > > Hamza
> > > >
> > > >
> > >
> >
>


RE: difficulty to delete a topic because of its syntax

2016-10-05 Thread Hamza HACHANI
Awkwardly, when I list the topics I find it, but when I delete it, it says
that this topic does not exist.


From: Ben Davison
Sent: Wednesday, 5 October 2016 02:37:14
To: users@kafka.apache.org
Subject: Re: difficulty to delete a topic because of its syntax

Try putting "" or '' around the string when running the command.

On Wed, Oct 5, 2016 at 3:29 PM, Hamza HACHANI 
wrote:

> It's between "the" and "metric"
>
> 
> From: Ali Akhtar
> Sent: Wednesday, 5 October 2016 02:16:33
> To: users@kafka.apache.org
> Subject: Re: difficulty to delete a topic because of its syntax
>
> I don't see a space in that topic name
>
> On Wed, Oct 5, 2016 at 6:42 PM, Hamza HACHANI 
> wrote:
>
> > Hi,
> >
> > I created a topic called device-connection-invert-key-value-the
> > metric-changelog.
> >
> > I insist that there is a space in it.
> >
> >
> >
> > Now that I want to delete it, because my cluster can no longer work
> > correctly, I can't do it, as it only reads the first part of it
> > (device-connection-invert-key-value-the), which obviously it doesn't find.
> >
> > Does somebody have a solution to delete it?
> >
> > Thanks in advance.
> >
> >
> > Hamza
> >
> >
>



RE: difficulty to delete a topic because of its syntax

2016-10-05 Thread Hamza HACHANI
It's between "the" and "metric"


From: Ali Akhtar
Sent: Wednesday, 5 October 2016 02:16:33
To: users@kafka.apache.org
Subject: Re: difficulty to delete a topic because of its syntax

I don't see a space in that topic name

On Wed, Oct 5, 2016 at 6:42 PM, Hamza HACHANI 
wrote:

> Hi,
>
> I created a topic called device-connection-invert-key-value-the
> metric-changelog.
>
> I insist that there is a space in it.
>
>
>
> Now that I want to delete it, because my cluster can no longer work
> correctly, I can't do it, as it only reads the first part of it
> (device-connection-invert-key-value-the), which obviously it doesn't find.
>
> Does somebody have a solution to delete it?
>
> Thanks in advance.
>
>
> Hamza
>
>


difficulty to delete a topic because of its syntax

2016-10-05 Thread Hamza HACHANI
Hi,

I created a topic called device-connection-invert-key-value-the 
metric-changelog.

I insist that there is a space in it.



Now that I want to delete it, because my cluster can no longer work correctly, I
can't do it, as it only reads the first part of it
(device-connection-invert-key-value-the), which obviously it doesn't find.

Does somebody have a solution to delete it?

Thanks in advance.


Hamza



initialisation of the context

2016-09-27 Thread Hamza HACHANI
Hi,


I would like to know how the context is initialised in Kafka Streams.

I have a problem with one Kafka Streams application: every time I call it,
I notice that the context is initialised more than once, or is created more
than once, which is abnormal and causes a bug in the system.


Hamza



RE: Error kafka-stream method punctuate in context.forward()

2016-09-20 Thread Hamza HACHANI
I'm using version 0.10.0.


From: Hamza HACHANI
Sent: Monday, 19 September 2016 19:20:23
To: users@kafka.apache.org
Subject: RE: Error kafka-stream method punctuate in context.forward()


Hi Guozhang,

Here is the code for the two classes concerned.

If this can help: I figured out that the instances of
ProcessorStatsByHourSupplier and ProcessorStatsByMinuteSupplier which are
returned are the same.

I think this is the problem. I tried to fix it but was not able to.


Thanks Guozhang

Hamza


--

public class StatsByMinute {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processorByMinute");
        // Where to find Kafka broker(s).
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.82:9094,192.168.1.89:9093,192.168.1.82:9092");
        // Where to find the corresponding ZooKeeper ensemble.
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "192.168.1.82:2181");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        TopologyBuilder builder = new TopologyBuilder();

        builder.addSource("Source", "uplink");

        String countStoreName = "CountsStore" + System.currentTimeMillis();

        builder.addProcessor("Process",
                new ProcessorStatsByMinuteSupplier(new ProcessorStatsByMinute(1, countStoreName)),
                "Source");
        builder.addStateStore(
                Stores.create(countStoreName).withStringKeys().withStringValues().inMemory().build(),
                "Process");
        builder.addSink("Sink", "statsM",
                Serdes.String().serializer(), Serdes.String().serializer(), "Process");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}


-

public class StatsByHour {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processorByHour");
        // Where to find Kafka broker(s).
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.82:9094,192.168.1.89:9093,192.168.1.82:9092");
        // Where to find the corresponding ZooKeeper ensemble.
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "192.168.1.82:2181");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        TopologyBuilder builder = new TopologyBuilder();

        builder.addSource("Source", "statsM");

        String countStoreName = "CountsStore" + System.currentTimeMillis();

        ProcessorStatsByHourSupplier processorStatsByHourSupplier =
                new ProcessorStatsByHourSupplier(new ProcessorStatsByHour(3, countStoreName));
        System.out.println(processorStatsByHourSupplier);
        builder.addProcessor("Process", processorStatsByHourSupplier, "Source");
        builder.addStateStore(
                Stores.create(countStoreName).withStringKeys().withStringValues().inMemory().build(),
                "Process");
        builder.addSink("Sink", "statsH",
                Serdes.String().serializer(), Serdes.String().serializer(), "Process");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}


---

public class ProcessorStatsByHour extends BaseProcessor {

    public ProcessorStatsByHour(int countTimeUnit, String countStoreName) {
        super(TimeUnit.MINUTES, countTimeUnit, countStoreName);
    }

    @Override
    public void process(String key, String json) {
        Stat stat = deserializeStat(json);
        if (stat != null) {
            if ((stat.getNetworkPartnerId() == null) && (stat.getBaseStationId() == null)) {
                String opKey = stat.getOperatorId();
                Stat statOp = deserializeStat(this.kvStore.get(opKey));
                if (statOp == null) {
                    statOp = new Stat();
                    statOp.setCount(stat.getCount());
                    statOp.setOperatorId(stat.getOperatorId());
                    this.kvStore.put(opKey, serializeStat(statOp));
                } else {
                    statOp.setCount(statOp.getCount() + stat.getCount());
                    this.kvStore.put(opKey, serializeStat(statOp));
                }
            } else if (stat.getBaseStationId() == null) {
                String npKey = stat.getOperatorId() + "_" + stat.getNetworkPartnerId();
                Stat statNp = deserializeStat(this.kvStore.get(npKey));
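
A ProcessorSupplier's get() is expected to return a new Processor on every
call, so that each stream task gets its own instance, rather than handing out
one shared instance the way the wrapping constructor above does. The supplier
class itself is not shown in the thread; a minimal sketch of that pattern, with
the constructor reworked to take the processor's parameters instead of a
pre-built instance:

public class ProcessorStatsByMinuteSupplier implements ProcessorSupplier<String, String> {

    private final int countTimeUnit;
    private final String countStoreName;

    public ProcessorStatsByMinuteSupplier(int countTimeUnit, String countStoreName) {
        this.countTimeUnit = countTimeUnit;
        this.countStoreName = countStoreName;
    }

    @Override
    public Processor<String, String> get() {
        // A fresh Processor per call, one per stream task.
        return new ProcessorStatsByMinute(countTimeUnit, countStoreName);
    }
}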
 

RE: Error kafka-stream method punctuate in context.forward()

2016-09-20 Thread Hamza HACHANI
        this.kvStore = (KeyValueStore<String, String>) context.getStateStore(countStoreName);
    }

    @Override
    public void punctuate(long timestamp) {
        try (KeyValueIterator<String, String> iter = this.kvStore.all()) {
            System.out.println("--- " + timestamp + " --- ");
            while (iter.hasNext()) {
                System.out.println("--- pass1 --- ");
                KeyValue<String, String> entry = iter.next();
                Stat stat = deserializeStat(entry.value);
                if (stat != null) {
                    System.out.println("--- pass2 --- ");
                    stat.setTimestamp(timestamp);
                }
                System.out.println("--- pass3 --- ");
                System.out.println("key" + entry.key);
                System.out.println("stat" + serializeStat(stat));
                System.out.println("context" + context);
                context.forward(entry.key, serializeStat(stat));
                System.out.println("[" + entry.key + ", " + serializeStat(stat) + "]");
                iter.remove();
            }
        } finally {
            context.commit();
        }
    }

    @Override
    public void close() {
        this.kvStore.close();
    }

    protected static Uplink deserialize(String json) {
        try {
            return objectMapper.readValue(json, Uplink.class);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            return new Uplink().setOperatorId("fake").setNetworkPartnerId("fake")
                    .setBaseStationId("fake").setTimeStampProduce(60L);
        }
    }

    protected static Stat deserializeStat(String json) {
        if (json == null) {
            return null;
        }
        try {
            return objectMapper.readValue(json, Stat.class);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            return new Stat().setOperatorId("fake").setNetworkPartnerId("fake")
                    .setBaseStationId("fake").setTimestamp(System.currentTimeMillis()).setCount(-1L);
        }
    }

    protected static String serializeStat(Stat stat) {
        try {
            return objectMapper.writeValueAsString(stat);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            return "{'operatorId':'fake','networkPartnerId':'fake','baseStationId':'fake','count':-1,'timestamp':5}";
        }
    }
}



From: Guozhang Wang
Sent: Monday, 19 September 2016 12:19:36
To: users@kafka.apache.org
Subject: Re: Error kafka-stream method punctuate in context.forward()

Hello Hamza,

Which Kafka version are you using with this application? Also could you
share your code skeleton of the StatsByDay processor implementation?


Guozhang


On Fri, Sep 16, 2016 at 6:58 AM, Hamza HACHANI 
wrote:

> Good morning,
>
> I have a problem with a Kafka Streams application.
>
> In fact, I've already created these Kafka Streams applications:
>
> StatsByMinute : entry topic : uplinks, out topic : statsM.
>
> StatsByHour : entry topic : statsM, out topic : statsH.
>
> StatsByDay : entry topic : statsH, out topic : statsD.
>
>
>
> The three of these applications have nearly the same structure
> (StatsByMinute and StatsByHour/StatsByDay differ only in the
> application ID, KVStore and the method process()).
>
> StatsByDay and StatsByHour have exactly the same structure (the only
> exception is the ID parameters).
>
>
> The problem is that StatsByMinute and StatsByHour work perfectly.
>
> But this is not the case for StatsByDay, where I verified that I do
> receive data and process it (so process() works), but in the call to
> context.forward in punctuate there is a problem.
>
> I get the following error :
>
>
> [2016-09-16 15:44:24,467] ERROR Streams application error during
> processing in thread [StreamThread-1]:  (org.apache.kafka.streams.
> processor.internals.StreamThread)
> java.lang.NullPointerException
> at org.apache.kafka.streams.processor.internals.
> StreamTask.forward(StreamTask.java:336)
> at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at com.actility.tpk.stat.BaseProcessor.punctuate(
> BaseProcessor.java:54)
> at org.apache.kafka.streams.processor.internals.
> StreamTask.punctuate(StreamTask.java:227)
> at org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> at org.apache.kafka.streams.processor.internals.
> StreamTask.maybePunctu

Error kafka-stream method punctuate in context.forward()

2016-09-16 Thread Hamza HACHANI
Good morning,

I have a problem with a Kafka Streams application.

In fact, I've already created these Kafka Streams applications:

StatsByMinute : entry topic : uplinks, out topic : statsM.

StatsByHour : entry topic : statsM, out topic : statsH.

StatsByDay : entry topic : statsH, out topic : statsD.



The three of these applications have nearly the same structure (StatsByMinute
and StatsByHour/StatsByDay differ only in the application ID, KVStore and
the method process()).

StatsByDay and StatsByHour have exactly the same structure (the only
exception is the ID parameters).


The problem is that StatsByMinute and StatsByHour work perfectly.

But this is not the case for StatsByDay, where I verified that I do receive
data and process it (so process() works), but in the call to context.forward
in punctuate there is a problem.

I get the following error :


[2016-09-16 15:44:24,467] ERROR Streams application error during processing in 
thread [StreamThread-1]:  
(org.apache.kafka.streams.processor.internals.StreamThread)
java.lang.NullPointerException
at 
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at com.actility.tpk.stat.BaseProcessor.punctuate(BaseProcessor.java:54)
at 
org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:227)
at 
org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
at 
org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:212)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:407)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Exception in thread "StreamThread-1" java.lang.NullPointerException
at 
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at com.actility.tpk.stat.BaseProcessor.punctuate(BaseProcessor.java:54)
at 
org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:227)
at 
org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
at 
org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:212)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:407)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
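
For orientation, a minimal 0.10.0-style Processor skeleton showing where
init(), schedule(), punctuate() and context.forward() fit together; this is a
sketch with assumed store name and interval, not the actual BaseProcessor from
this thread:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class StatsSketchProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, String> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // Keep the framework-supplied context; forward()/commit() rely on it.
        this.context = context;
        // Ask for punctuate() roughly once per minute of stream time.
        this.context.schedule(60 * 1000L);
        this.kvStore = (KeyValueStore<String, String>) context.getStateStore("CountsStore");
    }

    @Override
    public void process(String key, String value) {
        kvStore.put(key, value); // accumulate into the state store
    }

    @Override
    public void punctuate(long timestamp) {
        // Forward only from the stream thread, via the context saved in init().
        try (KeyValueIterator<String, String> iter = kvStore.all()) {
            while (iter.hasNext()) {
                KeyValue<String, String> entry = iter.next();
                context.forward(entry.key, entry.value);
            }
        }
        context.commit();
    }

    @Override
    public void close() {}
}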




RE: Re : A specific use case

2016-08-05 Thread Hamza HACHANI
Thanks Guozhang Wang.


Hamza


De : Guozhang Wang 
Envoyé : jeudi 4 août 2016 06:58:22
À : users@kafka.apache.org
Objet : Re: Re : A specific use case

Yeah, if you can buffer yourself in the process() function and then rely on
punctuate() for generating the outputs, that would resolve your issue.
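
To make that concrete, here is a minimal sketch of the buffer-in-process(), 
emit-in-punctuate() pattern against the 0.10.x Processor API (the class name, 
the counter, and the one-minute interval are illustrative assumptions, not 
Hamza's actual code):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class PerMinuteCountProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private long count = 0;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        context.schedule(60 * 1000L);   // request punctuate() roughly once per minute
    }

    @Override
    public void process(String key, String value) {
        count++;                        // only buffer/aggregate here, forward nothing
    }

    @Override
    public void punctuate(long timestamp) {
        // One summary record per punctuation instead of one per input message.
        context.forward("count", Long.toString(count));
        count = 0;
        context.commit();
    }

    @Override
    public void close() {}
}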

Remember that the punctuate() function itself is event-time driven, so if you do
not have any data coming in, it may not be triggered. Details:

https://github.com/apache/kafka/pull/1689

Guozhang

On Wed, Aug 3, 2016 at 8:53 PM, Hamza HACHANI 
wrote:

> Hi,
> Yes, in fact.
> And I found a solution.
> It was by editing the method punctuate in the Kafka Streams processor.
>
> - Message de réponse -
> De : "Guozhang Wang" 
> Pour : "users@kafka.apache.org" 
> Objet : A specific use case
> Date : mer., août 3, 2016 23:38
>
> Hello Hamza,
>
> By saying "broker" I think you are actually referring to a Kafka Streams
> instance?
>
>
> Guozhang
>
> On Mon, Aug 1, 2016 at 1:01 AM, Hamza HACHANI 
> wrote:
>
> > Good morning,
> >
> > I'm working on a specific use case. In fact, I'm receiving messages from an
> > operator network and trying to do statistics on their number per minute,
> > per hour, per day...
> >
> > I would like to create a broker that receives the messages and generates a
> > message every minute. These produced messages are consumed by a consumer on
> > one hand, and on the other hand sent to another topic which receives them
> > and generates messages every minute.
> >
> > I've been trying to do that for a while without success. In fact, any time
> > the first broker receives a message, it produces one and sends it to the
> > other topic.
> >
> > My question is: is what I'm trying to do possible without going through an
> > intermediate Java process outside of Kafka?
> >
> > If yes, how?
> >
> > Thanks in advance.
> >
>
>
>
> --
> -- Guozhang
>



--
-- Guozhang


Re : A specific use case

2016-08-03 Thread Hamza HACHANI
Hi,
Yes, in fact.
And I found a solution.
It was by editing the method punctuate in the Kafka Streams processor.

- Message de réponse -
De : "Guozhang Wang" 
Pour : "users@kafka.apache.org" 
Objet : A specific use case
Date : mer., août 3, 2016 23:38

Hello Hamza,

By saying "broker" I think you are actually referring to a Kafka Streams
instance?


Guozhang

On Mon, Aug 1, 2016 at 1:01 AM, Hamza HACHANI 
wrote:

> Good morning,
>
> I'm working on a specific use case. In fact, I'm receiving messages from an
> operator network and trying to do statistics on their number per minute,
> per hour, per day...
>
> I would like to create a broker that receives the messages and generates a
> message every minute. These produced messages are consumed by a consumer on
> one hand, and on the other hand sent to another topic which receives them
> and generates messages every minute.
>
> I've been trying to do that for a while without success. In fact, any time
> the first broker receives a message, it produces one and sends it to the
> other topic.
>
> My question is: is what I'm trying to do possible without going through an
> intermediate Java process outside of Kafka?
>
> If yes, how?
>
> Thanks in advance.
>



--
-- Guozhang


A specific use case

2016-08-01 Thread Hamza HACHANI
Good morning,

I'm working on a specific use case. In fact, I'm receiving messages from an 
operator network and trying to do statistics on their number per minute, per 
hour, per day...

I would like to create a broker that receives the messages and generates a 
message every minute. These produced messages are consumed by a consumer on one 
hand, and on the other hand sent to another topic which receives them and 
generates messages every minute.

I've been trying to do that for a while without success. In fact, any time the 
first broker receives a message, it produces one and sends it to the other 
topic.

My question is: is what I'm trying to do possible without going through an 
intermediate Java process outside of Kafka?

If yes, how?

Thanks in advance.


RE: Kafka streams Issue

2016-07-29 Thread Hamza HACHANI
Thanks i will try that.


Hamza


De : Tauzell, Dave 
Envoyé : vendredi 29 juillet 2016 03:18:47
À : users@kafka.apache.org
Objet : RE: Kafka streams Issue

Let's say you currently have:

Processing App ---> OUTPUT TOPIC ---> output consumer

You would ideally like the processing app to only write to the output topic 
every minute, but cannot easily do this.  So what you might be able to do is:


Processing App ---> INTERMEDIATE OUTPUT TOPIC ---> Coalesce Process ---> 
OUTPUT TOPIC

The Coalesce Process is an application that does something like:

bucket = new list()
consumer = createConsumer()
while (message = consumer.next()) {
    window = calculate current window
    if message is after window:
        send bucket to OUTPUT TOPIC
        bucket = new list()
    add message to bucket
}
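
A minimal runnable sketch of that coalesce process in Java, using the plain 
consumer/producer clients (the topic names, one-minute window, group id, and 
String serialization are illustrative assumptions):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CoalesceProcess {
    private static final long WINDOW_MS = 60_000L;   // one-minute buckets

    public static void main(String[] args) {
        Properties cfg = new Properties();
        cfg.put("bootstrap.servers", "localhost:9092");
        cfg.put("group.id", "coalesce");
        cfg.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cfg.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cfg.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        cfg.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cfg);
             KafkaProducer<String, String> producer = new KafkaProducer<>(cfg)) {
            consumer.subscribe(Collections.singletonList("intermediate-output"));

            List<String> bucket = new ArrayList<>();
            long windowEnd = -1L;

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> rec : records) {
                    long ts = rec.timestamp();
                    if (windowEnd < 0) {
                        // First record seen: align the window end to the next minute boundary.
                        windowEnd = (ts / WINDOW_MS + 1) * WINDOW_MS;
                    } else if (ts >= windowEnd) {
                        // Record falls after the current window: flush the bucket, start a new one.
                        producer.send(new ProducerRecord<>("output", String.join("\n", bucket)));
                        bucket.clear();
                        windowEnd = (ts / WINDOW_MS + 1) * WINDOW_MS;
                    }
                    bucket.add(rec.value());
                }
            }
        }
    }
}

Note this sketch windows on record timestamps rather than wall-clock time; 
either reading is consistent with the pseudocode above.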

Dave Tauzell | Senior Software Engineer | Surescripts
O: 651.855.3042 | www.surescripts.com |
dave.tauz...@surescripts.com
Connect with us: Twitter I LinkedIn I Facebook I YouTube


-Original Message-
From: Hamza HACHANI [mailto:hamza.hach...@supcom.tn]
Sent: Friday, July 29, 2016 9:53 AM
To: users@kafka.apache.org
Subject: RE: Kafka streams Issue

Hi Dave,

Could you explain your idea a bit more?
I can't figure out what you are suggesting.
Thank you

-Hamza

De : Tauzell, Dave 
Envoyé : vendredi 29 juillet 2016 02:39:53
À : users@kafka.apache.org
Objet : RE: Kafka streams Issue

You could send the message immediately to an intermediary topic.  Then have a 
consumer of that topic that pulls messages off and waits until the minute is up.

-Dave

Dave Tauzell | Senior Software Engineer | Surescripts
O: 651.855.3042 | www.surescripts.com |
dave.tauz...@surescripts.com
Connect with us: Twitter I LinkedIn I Facebook I YouTube


-Original Message-
From: Hamza HACHANI [mailto:hamza.hach...@supcom.tn]
Sent: Friday, July 29, 2016 9:36 AM
To: users@kafka.apache.org
Subject: Kafka streams Issue

> Good morning,
>
> I'm an ICT student at TELECOM BRETAGNE (a French school).
> I followed your presentations on YouTube and found them really
> interesting.
> I'm trying to do some things with Kafka, and I've now been blocked for
> about 3 days.
> I'm trying to control the time at which my processing application sends
> data to the output topic.
> What I'm trying to do is to make the application process data from the
> input topic all the time but send the messages only at the end of a
> minute/an hour/a month (the notion of windowing).
> For the moment, what I managed to do is that the application, instead of
> sending data only at the end of the minute, sends it any time it receives
> it from the input topic.
> Do you have any suggestions to help me?
> I would be really grateful.


Preliminary answer for now:

> For the moment, what I managed to do is that the application, instead of
> sending data only at the end of the minute, sends it any time it receives
> it from the input topic.

This is actually the expected behavior at the moment.

The main reason for this behavior is that, in stream processing, we never know 
whether there is still late-arriving data to be received.  For example, imagine 
you have 1-minute windows based on event-time.  Here, it may happen that, after 
the first 1-minute window has passed, another record arrives five minutes later 
but, according to the record's event-time, it should have still been part of 
the first 1-minute window.  In this case, what we typically want to happen is 
that the first 1-minute window will be updated/reprocessed with the late-arriving 
record included.  In other words, just because 1 minute has passed (= the 
1-minute window is "done") it does not mean that actually all the data for that 
time interval has been processed already -- so sending only a single update 
after 1 minute has passed would even produce incorrect results in many cases.  
For this reason you currently see a downstream update anytime there is a new 
incoming data record ("sends it any time it receives it from the input 
topic").  So the point here is to ensure correctness of processing.

That said, one known drawback of the current behavior is that users haven't 
been able to control (read: decrease/reduce) the rate/volume of the resulting 
downstream updates.  For example, if you have an input topic with a rate of 1 
million msg/s (which is easy for Kafka), some users want to aggregate/window 
results primarily to reduce the input rate to a lower number (e.g. 1 thousand 
msg/s) so that the data can be fed from Kafka to other systems that might not 
scale as well as Kafka.  To help these use cases we will have a new 
configuration parameter in the next major version of Kafka that allows you to 
control the rate/volume of downstream updates.
Here, the point is to help users optimize resource usage rather than 
correctness of processing.  This new parameter should also help you with your 
use case.  But even this new parameter is not based on strict time behavior or 
time windows.
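
Presumably this refers to the record-cache parameter added by KIP-63 in Kafka 
0.10.1, cache.max.bytes.buffering; that mapping is an assumption here. A 
minimal sketch of setting it, with illustrative sizes and application id:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class RateLimitedConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stats-by-minute");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Larger cache => more aggregation updates are consolidated in memory
        // before being emitted downstream, i.e. fewer output records.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        // The commit interval bounds how long an update can sit in the cache.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);
        return props;
    }
}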

RE: Kafka streams Issue

2016-07-29 Thread Hamza HACHANI
Hi Dave,

Could you explain your idea a bit more?
I can't figure out what you are suggesting.
Thank you

-Hamza

De : Tauzell, Dave 
Envoyé : vendredi 29 juillet 2016 02:39:53
À : users@kafka.apache.org
Objet : RE: Kafka streams Issue

You could send the message immediately to an intermediary topic.  Then have a 
consumer of that topic that pulls messages off and waits until the minute is up.

-Dave

Dave Tauzell | Senior Software Engineer | Surescripts
O: 651.855.3042 | www.surescripts.com |
dave.tauz...@surescripts.com
Connect with us: Twitter I LinkedIn I Facebook I YouTube


-Original Message-
From: Hamza HACHANI [mailto:hamza.hach...@supcom.tn]
Sent: Friday, July 29, 2016 9:36 AM
To: users@kafka.apache.org
Subject: Kafka streams Issue

> Good morning,
>
> I'm an ICT student at TELECOM BRETAGNE (a French school).
> I followed your presentations on YouTube and found them really
> interesting.
> I'm trying to do some things with Kafka, and I've now been blocked for
> about 3 days.
> I'm trying to control the time at which my processing application sends
> data to the output topic.
> What I'm trying to do is to make the application process data from the
> input topic all the time but send the messages only at the end of a
> minute/an hour/a month (the notion of windowing).
> For the moment, what I managed to do is that the application, instead of
> sending data only at the end of the minute, sends it any time it receives
> it from the input topic.
> Do you have any suggestions to help me?
> I would be really grateful.


Preliminary answer for now:

> For the moment, what I managed to do is that the application, instead of
> sending data only at the end of the minute, sends it any time it receives
> it from the input topic.

This is actually the expected behavior at the moment.

The main reason for this behavior is that, in stream processing, we never know 
whether there is still late-arriving data to be received.  For example, imagine 
you have 1-minute windows based on event-time.  Here, it may happen that, after 
the first 1-minute window has passed, another record arrives five minutes later 
but, according to the record's event-time, it should have still been part of 
the first 1-minute window.  In this case, what we typically want to happen is 
that the first 1-minute window will be updated/reprocessed with the late-arriving 
record included.  In other words, just because 1 minute has passed (= the 
1-minute window is "done") it does not mean that actually all the data for that 
time interval has been processed already -- so sending only a single update 
after 1 minute has passed would even produce incorrect results in many cases.  
For this reason you currently see a downstream update anytime there is a new 
incoming data record ("sends it any time it receives it from the input 
topic").  So the point here is to ensure correctness of processing.

That said, one known drawback of the current behavior is that users haven't 
been able to control (read: decrease/reduce) the rate/volume of the resulting 
downstream updates.  For example, if you have an input topic with a rate of 1 
million msg/s (which is easy for Kafka), some users want to aggregate/window 
results primarily to reduce the input rate to a lower number (e.g. 1 thousand 
msg/s) so that the data can be fed from Kafka to other systems that might not 
scale as well as Kafka.  To help these use cases we will have a new 
configuration parameter in the next major version of Kafka that allows you to 
control the rate/volume of downstream updates.
Here, the point is to help users optimize resource usage rather than 
correctness of processing.  This new parameter should also help you with your 
use case.  But even this new parameter is not based on strict time behavior or 
time windows.



Kafka streams Issue

2016-07-29 Thread Hamza HACHANI
> Good morning,
>
> I'm an ICT student at TELECOM BRETAGNE (a French school).
> I followed your presentations on YouTube and found them really
> interesting.
> I'm trying to do some things with Kafka, and I've now been blocked for
> about 3 days.
> I'm trying to control the time at which my processing application sends
> data to the output topic.
> What I'm trying to do is to make the application process data from the
> input topic all the time but send the messages only at the end of a
> minute/an hour/a month (the notion of windowing).
> For the moment, what I managed to do is that the application, instead of
> sending data only at the end of the minute, sends it any time it receives
> it from the input topic.
> Do you have any suggestions to help me?
> I would be really grateful.


Preliminary answer for now:

> For the moment, what I managed to do is that the application, instead of
> sending data only at the end of the minute, sends it any time it receives
> it from the input topic.

This is actually the expected behavior at the moment.

The main reason for this behavior is that, in stream processing, we never
know whether there is still late-arriving data to be received.  For
example, imagine you have 1-minute windows based on event-time.  Here, it
may happen that, after the first 1-minute window has passed, another record
arrives five minutes later but, according to the record's event-time, it
should have still been part of the first 1-minute window.  In this case,
what we typically want to happen is that the first 1-minute window will be
updated/reprocessed with the late-arriving record included.  In other
words, just because 1 minute has passed (= the 1-minute window is "done")
it does not mean that actually all the data for that time interval has been
processed already -- so sending only a single update after 1 minute has
passed would even produce incorrect results in many cases.  For this reason
you currently see a downstream update anytime there is a new incoming data
record ("sends it any time it receives it from the input topic").  So the
point here is to ensure correctness of processing.

That said, one known drawback of the current behavior is that users haven't
been able to control (read: decrease/reduce) the rate/volume of the
resulting downstream updates.  For example, if you have an input topic with
a rate of 1 million msg/s (which is easy for Kafka), some users want to
aggregate/window results primarily to reduce the input rate to a lower
number (e.g. 1 thousand msg/s) so that the data can be fed from Kafka to
other systems that might not scale as well as Kafka.  To help these use
cases we will have a new configuration parameter in the next major version
of Kafka that allows you to control the rate/volume of downstream updates.
Here, the point is to help users optimize resource usage rather than
correctness of processing.  This new parameter should also help you with
your use case.  But even this new parameter is not based on strict time
behavior or time windows.
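
For reference, this kind of event-time 1-minute windowed count is what the 
Streams DSL later made a one-liner. A sketch against the post-1.0 DSL (the 
uplinks/statsM topic names are reused from this thread; everything else is 
illustrative and not tied to the 0.10.0 API discussed above):

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class StatsByMinute {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stats-by-minute");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("uplinks", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))   // event-time windows
               .count()
               .toStream((windowedKey, count) ->                    // flatten the Windowed<String> key
                       windowedKey.key() + "@" + windowedKey.window().start())
               .to("statsM", Produced.with(Serdes.String(), Serdes.Long()));

        new KafkaStreams(builder.build(), props).start();
    }
}

As the answer above explains, such a windowed table still emits an update per 
incoming record by default so that late-arriving data can revise a window; the 
record cache (and, later, suppress()) only consolidates those updates.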