RE: Abnormal working in the punctuate method and error linked to session.timeout.ms
The print is in line 40 of the class Base...

From: Hamza HACHANI
Sent: Monday, 28 November 2016 01:25:08
To: users@kafka.apache.org
Subject: RE: Abnormal working in the punctuate method and error linked to session.timeout.ms

Hi Eno,

Here is the code for the application ExclusiveStatsConnectionDevice, which is composed of 4 nodes. For example, when I put print("") you will see the problem of the infinite loop. I preferred to send the whole code to make it easier for you, even though you don't need all of it.

From: Eno Thereska
Sent: Monday, 28 November 2016 01:12:14
To: users@kafka.apache.org
Subject: Re: Abnormal working in the punctuate method and error linked to session.timeout.ms

Hi Hamza,

Would you be willing to share some of your code so we can have a look?

Thanks
Eno

> On 28 Nov 2016, at 12:58, Hamza HACHANI wrote:
>
> Hi Eno.
>
> The problem is that there is no infinite while loop that I wrote, so I can't understand why the application is behaving this way.
>
> Hamza
>
> From: Eno Thereska
> Sent: Sunday, 27 November 2016 23:21:24
> To: users@kafka.apache.org
> Subject: Re: Abnormal working in the punctuate method and error linked to session.timeout.ms
>
> Hi Hamza,
>
> If you have an infinite while loop, the app would spend all its time in that loop and poll() would never be called.
>
> Eno
>
>> On 28 Nov 2016, at 10:49, Hamza HACHANI wrote:
>>
>> Hi,
>>
>> I have some trouble with the punctuate method. When I print a string inside punctuate, the string is printed indefinitely, as if I had written while (true) { print(string); }.
>>
>> I can't understand what is happening. Does anybody have an explanation?
>>
>> On the other hand, another application prints the following error:
>>
>> WARN Failed to commit StreamTask #0_0 in thread [StreamThread-1]: (org.apache.kafka.streams.processor.internals.StreamThread)
>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
>>
>> When I tried to modify the configuration of the consumer, nothing happened.
>>
>> Any ideas for this too?
>>
>> Thanks in advance.
>>
>> Hamza
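[Editor's note] The repeated printing is consistent with how scheduled punctuations catch up with stream time: punctuate() fires not once, but once per elapsed schedule interval, so a large jump in record timestamps produces a burst of calls that looks like an infinite loop. A minimal self-contained sketch of that catch-up logic (illustrative only; the method name and loop are assumptions, not the actual StreamTask internals):

```java
// Illustrative sketch, NOT the real Kafka Streams internals: models how a
// punctuation scheduled with context.schedule(interval) fires once for every
// interval of *stream time* that has elapsed, not once per wall-clock tick.
public class PunctuateSketch {

    // Returns how many times punctuate() would fire when stream time advances
    // from lastPunctuateTime to streamTime with the given schedule interval.
    static int punctuateCalls(long lastPunctuateTime, long streamTime, long interval) {
        int calls = 0;
        while (lastPunctuateTime + interval <= streamTime) {
            lastPunctuateTime += interval;
            calls++; // Streams would invoke Processor#punctuate(lastPunctuateTime) here
        }
        return calls;
    }

    public static void main(String[] args) {
        // A single record whose timestamp jumps stream time ahead by one minute
        // triggers 60 punctuations for a 1-second schedule -- a burst of prints.
        System.out.println(punctuateCalls(0L, 60_000L, 1_000L)); // 60
    }
}
```

If the prints come in bursts like this rather than truly never stopping, the timestamp extractor is the first thing to check, since punctuate() is driven by the timestamps it returns.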
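[Editor's note] The two knobs named in the CommitFailedException message above can be set on the same Properties object that is passed to StreamsConfig, since Streams forwards consumer settings to its internal consumer. A minimal sketch; the values are illustrative assumptions, not recommendations:

```java
import java.util.Properties;

// Sketch of the tuning suggested by the CommitFailedException message itself.
// The values below are illustrative assumptions; tune them for your workload.
public class ConsumerTuningSketch {

    static Properties tunedProps() {
        Properties props = new Properties();
        // allow more time between consecutive poll() calls before a rebalance
        props.put("session.timeout.ms", "30000");
        // return smaller batches so each poll loop iteration finishes faster
        props.put("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedProps().getProperty("max.poll.records")); // 100
    }
}
```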
RE: Control results coming from windows
OK, thanks Damian. Have a nice day.

Hamza

From: Damian Guy
Sent: Friday, 4 November 2016 00:58:36
To: users@kafka.apache.org
Subject: Re: Control results coming from windows

Hi Hamza,

I'm not sure what you mean in the first sentence? There are some breaking API changes from 0.10.0 -> 0.10.1, so you may need to change some code. I'd also suggest you thoroughly test with the new version to ensure there are no regressions. There is a known issue with caching that may or may not affect you, depending on your topology: https://issues.apache.org/jira/browse/KAFKA-4311 - there is a fix for it that will hopefully be merged to trunk and 0.10.1 soon; however, I don't have any timeline for when/if a 0.10.1.1 release will be done.

The config parameter is StreamsConfig.CACHE_MAX_BYTES_BUFFERING. You should also take a look at https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams

Thanks,
Damian

On Fri, 4 Nov 2016 at 09:44 Hamza HACHANI wrote:
> Hi Damian,
>
> If I were to move to 0.10.1, is this possible without impacting the applications that I've already written?
>
> If yes, which parameters are needed for deduplication with caching? That would save me time looking for them.
>
> Thanks.
> Hamza
>
> From: Damian Guy
> Sent: Thursday, 3 November 2016 21:32:52
> To: users@kafka.apache.org
> Subject: Re: Control results coming from windows
>
> Hi Hamza,
>
> If you are using version 0.10.0, then there is no way of controlling this. In 0.10.1 we do some deduplication with caching, but you should still expect multiple results per window.
>
> Thanks,
> Damian
>
> On Fri, 4 Nov 2016 at 08:42 Hamza HACHANI wrote:
>
>> Hi,
>>
>> I implemented a window using the Streams DSL, but results come out of the window every time a new record arrives. Is there any way to make the result come out only at the end of the window, rather than like this?
>>
>> In other words, I need to control the flow of data coming out of the window: I'm only interested in the final result, not the intermediate ones.
>>
>> Thanks in advance,
>> Hamza
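[Editor's note] The deduplication Damian mentions is driven by the record-cache size. A sketch of setting it via the string key behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING; the 10 MB value is an illustrative assumption:

```java
import java.util.Properties;

// Sketch: the config key is the string behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING.
// A larger cache deduplicates more intermediate window updates before they are
// forwarded downstream; 0 disables caching, so every update is emitted immediately.
public class CacheConfigSketch {

    static Properties cacheProps(long bytes) {
        Properties props = new Properties();
        props.put("cache.max.bytes.buffering", Long.toString(bytes));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(cacheProps(10L * 1024 * 1024).getProperty("cache.max.bytes.buffering")); // 10485760
    }
}
```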
Re: windowing with the processor api
Thanks a lot. This was very helpful.

Hamza

- Reply message -
From: "Eno Thereska"
To: "users@kafka.apache.org"
Subject: windowing with the processor api
Date: Wed, Nov 2, 2016 19:18

Thanks Matthias. Yes, to get window operations, or things like hopping or sliding windows, you need to use the DSL (e.g., the TimeWindows class). The Processor API is very basic (and thus flexible), but you'll end up re-implementing TimeWindows.

Eno

> On 2 Nov 2016, at 17:45, Matthias J. Sax wrote:
>
> A windowed store does not work the way you expect it to. The parameter "windowSize" is not a store parameter itself, but a caching parameter for the store (only used if caching gets enabled).
>
> For window support, Streams provides window semantics on top of the store, and the store is not aware of windows in this sense. Each window gets an ID that is encoded in the store key as "record-key:window-ID", and record timestamps are mapped to window IDs to find the correct window a record gets put into. So the store is still a plain key-value store and is not aware of any windowing.
>
> I would highly recommend using the DSL for window operations. This should not be a limitation, as you can mix and match the DSL and the Processor API: everything you can do with the plain Processor API you can also do within the DSL via .process(...).
>
> -Matthias
>
> On 11/2/16 3:49 AM, Hamza HACHANI wrote:
>> Hi Eno,
>>
>> What I mean is that I can't find a place to define the size of the window and to specify the advance interval.
>>
>> Thanks,
>> Hamza
>>
>> From: Eno Thereska
>> Sent: Tuesday, 1 November 2016 22:44:47
>> To: users@kafka.apache.org
>> Subject: Re: windowing with the processor api
>>
>> Hi Hamza,
>>
>> Are you getting a particular error? Here is an example:
>>
>> Stores.create("window-store")
>>     .withStringKeys()
>>     .withStringValues()
>>     .persistent()
>>     .windowed(10, 10, 2, false)
>>     .build(), "the-processor")
>>
>> Thanks
>> Eno
>>
>>> On 2 Nov 2016, at 08:19, Hamza HACHANI wrote:
>>>
>>> Hi,
>>>
>>> I would like to know if somebody has an idea how to define the size of the window in the Processor API.
>>>
>>> I've been blocked for 6 days looking for a solution. Using:
>>>
>>> Stores.create(...).withStringKeys().withStringValues().persistent().windowed(...).build()
>>>
>>> I was able to define the retention time but not the size of the window.
>>>
>>> Please help me if possible.
>>>
>>> Thanks,
>>> Hamza
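[Editor's note] Matthias's description of the key encoding can be made concrete with a small self-contained sketch: the windows a record belongs to are derived from its timestamp, and each matching window start becomes part of the store key. The function below mirrors the idea of TimeWindows.of(size).advanceBy(advance); the exact arithmetic and names are assumptions, not the real implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained sketch of the mapping Matthias describes: the store itself is a
// plain key-value store; Streams maps each record timestamp to the window(s) it
// belongs to and encodes the window start into the store key ("record-key:window-ID").
public class WindowMapSketch {

    // Window starts (>= 0) of all hopping windows [start, start+size) containing ts.
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long first = Math.max(0, ts - size + advance) / advance * advance;
        for (long start = first; start <= ts; start += advance) {
            starts.add(start); // a record with timestamp ts falls into [start, start+size)
        }
        return starts;
    }

    public static void main(String[] args) {
        // size=10, advance=5: timestamp 12 falls into the windows starting at 5 and 10
        System.out.println(windowStartsFor(12, 10, 5)); // [5, 10]
    }
}
```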
RE: customised event time
Thanks, Guozhang. Have a nice day.

From: Guozhang Wang
Sent: Monday, 24 October 2016 16:50:45
To: users@kafka.apache.org
Subject: Re: customised event time

Hi Hamza,

You can create a windowed store in the Processor API via the Stores factory class: org.apache.kafka.streams.state.Stores

More specifically, you do something like:

Stores.create().withKeys().withValues().persistent().windowed(/* you can specify window size, retention period etc. here */)

which returns the RocksDBWindowStoreSupplier.

Guozhang

On Mon, Oct 24, 2016 at 2:23 AM, Hamza HACHANI wrote:
> And the start time and end time of the window.
>
> In other words, I need the notion of windows in the Processor API.
>
> Is this possible?
>
> From: Hamza HACHANI
> Sent: Sunday, 23 October 2016 20:43:05
> To: users@kafka.apache.org
> Subject: RE: customised event time
>
> To be more specific: what I really need is the retention-time property of the window in the Processor API. As for the window itself, I think I can manage.
>
> Hamza
>
> From: Hamza HACHANI
> Sent: Sunday, 23 October 2016 20:30:13
> To: users@kafka.apache.org
> Subject: RE: customised event time
>
> Hi,
>
> I think that maybe I'm asking for a lot, but I need the windowing aspect in the Processor API, not in the Streams DSL. Is this possible?
>
> The second question is how I can get rid of the intermediate results, because I'm only interested in the final result given by the window.
>
> Hamza
>
> From: Matthias J. Sax
> Sent: Saturday, 22 October 2016 16:12:45
> To: users@kafka.apache.org
> Subject: Re: customised event time
>
> Hi,
>
> You can set the window retention time via the method Windows#until() (and this retention time is based on the timestamps returned from your custom timestamp extractor). This keeps all windows until the retention time passes, and thus all late-arriving records will be processed correctly.
>
> However, Kafka Streams does not close windows as other frameworks do, but rather gives you an (intermediate) result each time a window is updated with a new record (regardless of whether the record is in order or late -- you will get a result record in both cases).
>
> As of Kafka 0.10.1 those (intermediate) results get deduplicated, so you might not receive all (intermediate) results downstream. Of course, it is ensured that you will eventually get the latest/final result sent downstream.
>
> -Matthias
>
> On 10/21/16 7:42 AM, Hamza HACHANI wrote:
>> Hi,
>>
>> I would like to process data based on a customised event time (a timestamp that I embed as part of the message).
>>
>> The data is processed in periodic windows of x time, parameterised via the punctuate method.
>>
>> What I need is a retention time for the window, to be able to handle late-arriving messages.
>>
>> Can I do this: define/configure a retention time for windows? For example, the window that processes data between 15:00 and 16:00 forwards its result not at 16:00 but at 16:15.
>>
>> Thanks in advance for your help.
>>
>> Hamza

--
Guozhang
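[Editor's note] Matthias's retention rule can be sketched self-contained: a late record is still processed as long as its window has not fallen out of the retention period. This mirrors Windows#until() only conceptually; the predicate below is an illustrative assumption, not Kafka's actual expiry code:

```java
// Sketch of the retention rule discussed above (illustrative, not Kafka internals):
// a window [start, start+size) remains updatable until the retention period after
// its end has passed, so a late record still lands in its window if it arrives
// (in stream time) within that period.
public class RetentionSketch {

    static boolean accepts(long windowStart, long size, long retention,
                           long recordTs, long streamTime) {
        boolean inWindow = recordTs >= windowStart && recordTs < windowStart + size;
        boolean retained = streamTime < windowStart + size + retention; // not yet expired
        return inWindow && retained;
    }

    public static void main(String[] args) {
        // 15:00-16:00 window (ms offsets from 15:00), 15-minute retention: a late
        // record for 15:50 arriving while stream time is at 16:10 is still processed.
        System.out.println(accepts(0L, 3_600_000L, 900_000L, 3_000_000L, 4_200_000L)); // true
    }
}
```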
RE: Problem with key-value in the KVStore
Sorry, I was talking nonsense; please consider that I didn't say anything. Kafka does ensure the uniqueness of the key.

From: Hamza HACHANI
Sent: Thursday, 13 October 2016 01:38:42
To: users@kafka.apache.org
Subject: Problem with key-value in the KVStore

Hi,

I've designed two processors with different topologies. The issue is that in the first topology, in one node, I was able to associate different messages (key, value) where the key could be the same, and so I was able to do something like countByKey. In the second example, when I tried to do the same, I noticed that this was not possible: any new value associated with a key is overwritten by the next value, so the uniqueness of the key is respected.

I think this is really weird. Does anybody have an explanation or a suggestion?

Thanks in advance,
Hamza
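[Editor's note] The overwrite behaviour described here is exactly what a key-value store guarantees: one value per key, with put() replacing the old value. A count-by-key therefore has to read the previous value and write back the incremented one, rather than expecting the store to keep multiple values per key. A self-contained sketch, with HashMap standing in for a Kafka Streams KeyValueStore (names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the behaviour discussed: a key-value store keeps exactly one value
// per key (put() overwrites), so a count-by-key must read-modify-write.
// HashMap stands in for a Kafka Streams KeyValueStore here.
public class CountByKeySketch {

    static Map<String, Long> countByKey(String[] keys) {
        Map<String, Long> store = new HashMap<>();
        for (String key : keys) {
            Long old = store.get(key);                  // read the current count
            store.put(key, old == null ? 1L : old + 1); // overwrite with the new count
        }
        return store;
    }

    public static void main(String[] args) {
        System.out.println(countByKey(new String[]{"device-1", "device-2", "device-1"}).get("device-1")); // 2
    }
}
```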
RE: difficulty to delete a topic because of its syntax
Yes in fact, The topic in question was a name of a store. Ok i will do it for th matter of JIRA. De : isma...@gmail.com de la part de Ismael Juma Envoyé : mercredi 5 octobre 2016 22:24:53 À : users@kafka.apache.org Objet : Re: difficulty to delete a topic because of its syntax It's worth mentioning that Streams is in the process of transitioning from updating ZooKeeper directly to using the newly introduced create topics and delete topics protocol requests. It was too late for 0.10.1.0, but should land in trunk soonish. Ismael On Thu, Oct 6, 2016 at 11:15 AM, Yuto KAWAMURA wrote: > I guess this topic is created by Kafka Streams. > Kafka Streams has it's own topic creation(zookeeper node creation) > implementation and not using core's AdminUtils to create internal use > topics such as XX-changelog: > https://github.com/apache/kafka/blob/trunk/streams/src/main/ > java/org/apache/kafka/streams/processor/internals/InternalTo > picManager.java#L208 > In AdminUtils it has topic name > validation(https://github.com/apache/kafka/blob/trunk/core/s > rc/main/scala/kafka/common/Topic.scala#L33-L47) > logic but I don't see similar thing in Kafka Streams version as I read > the code briefly. > Since the topic name is created by the store name which is an > arbitrary string given by users, this could happen if you give a > string that contains whitespace as a name of state store. > > > 2016-10-06 18:40 GMT+09:00 Rajini Sivaram : > > Hamza, > > > > Can you raise a JIRA with details on how the topic was created by Kafka > > with an invalid name? Sounds like there might be a missing validation > > somewhere. > > > > Regards, > > > > Rajini > > > > On Thu, Oct 6, 2016 at 10:12 AM, Hamza HACHANI > > wrote: > > > >> Thanks Todd, > >> > >> > >> I've resolved it by suing what you told me. > >> > >> Thanks very much. But i think that there is a problem with kafka by > >> letting the saving names of topic and logs where there is a space as i > >> showes in the images. 
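Yuto's point about the missing check can be illustrated with a standalone sketch of the kind of validation core's AdminUtils applies (modeled loosely on Topic.scala: only ASCII alphanumerics, '.', '_' and '-' are legal, with a bounded length; the 249-character limit used here is an assumption about the 0.10.x bound, so check Topic.scala for the real constant):

```java
import java.util.regex.Pattern;

public class TopicNameCheck {
    // Legal characters per Kafka's topic name validation.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    // Assumed maximum name length; see kafka.common.Topic for the real bound.
    private static final int MAX_LENGTH = 249;

    public static boolean isValid(String name) {
        return name != null
                && !name.isEmpty()
                && name.length() <= MAX_LENGTH
                && !name.equals(".") && !name.equals("..")
                && LEGAL.matcher(name).matches();
    }

    public static void main(String[] args) {
        // The store-derived name from this thread fails because of the space.
        System.out.println(isValid("device-connection-invert-key-value-the metric-changelog"));
        System.out.println(isValid("device-connection-invert-key-value-the-metric-changelog"));
    }
}
```

A check like this at state-store registration time would have rejected the store name before the changelog topic was ever created.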
RE: difficulty to delete a topic because of its syntax
Thanks Todd,

I've resolved it by using what you told me. Thanks very much. But I think there is a problem with Kafka in that it lets topic and log names containing a space be saved, as I showed in the images.

Have a good day to you all.

Hamza
RE: difficulty to delete a topic because of its syntax
Hi,

Attached are the files showing what I'm talking about.

Hamza

De : Todd S
Envoyé : mercredi 5 octobre 2016 07:25:48
À : users@kafka.apache.org
Objet : Re: difficulty to delete a topic because of its syntax

You *could* go in to ZooKeeper and nuke the topic, then delete the files on disk

Slightly more risky but it should work

On Wednesday, 5 October 2016, Manikumar wrote:
> Kafka doesn't support white spaces in topic names. Only '.', '_'
> and '-' are allowed.
> Not sure how you got white space in a topic name.
RE: difficulty to delete a topic because of its syntax
Awkwardly, when I list the topics I find it, but when I try to delete it, it says that this topic does not exist.

De : Ben Davison
Envoyé : mercredi 5 octobre 2016 02:37:14
À : users@kafka.apache.org
Objet : Re: difficulty to delete a topic because of its syntax

Try putting "" or '' around the string when running the command.

--

This email, including attachments, is private and confidential. If you have received this email in error please notify the sender and delete it from your system. Emails are not secure and may contain viruses. No liability can be accepted for viruses that might be transferred by this email or any attachment. Any unauthorised copying of this message or unauthorised distribution and publication of the information contained herein are prohibited.

7digital Limited. Registered office: 69 Wilson Street, London EC2A 2BB. Registered in England and Wales. Registered No. 04843573.
RE: difficulty to delete a topic because of its syntax
It's between "the" and "metric"

De : Ali Akhtar
Envoyé : mercredi 5 octobre 2016 02:16:33
À : users@kafka.apache.org
Objet : Re: difficulty to delete a topic because of its syntax

I don't see a space in that topic name
difficulty to delete a topic because of its syntax
Hi,

I created a topic called device-connection-invert-key-value-the metric-changelog.

I insist that there is a space in it.

Now that I want to delete it, because my cluster can no longer work correctly, I can't do so: the tooling only reads the first part of the name (device-connection-invert-key-value-the), which obviously it doesn't find.

Does somebody have a solution to delete it?

Thanks in advance.

Hamza
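Ben's quoting suggestion and Todd's direct-ZooKeeper fallback from this thread can be sketched as shell commands. This is only a sketch, assuming a standard 0.10.x installation with the tools on PATH and ZooKeeper at localhost:2181; both routes may still stumble on the embedded space:

```shell
# Ben's suggestion: quote the name so the shell passes the space through intact.
bin/kafka-topics.sh --zookeeper localhost:2181 --delete \
  --topic "device-connection-invert-key-value-the metric-changelog"

# Todd's riskier fallback: remove the topic's znodes directly, then delete the
# matching partition directories under each broker's log.dirs on disk.
# (zookeeper-shell itself splits commands on whitespace, so a name containing
# a space may have to be removed with a ZooKeeper client library instead.)
bin/zookeeper-shell.sh localhost:2181 \
  rmr "/brokers/topics/device-connection-invert-key-value-the metric-changelog"
```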
Initialisation of the context
Hi,

I would like to know how the context is initialised in Kafka Streams, because I have a problem with one Kafka Streams application: every time I run it, I notice that the context is initialised, or created, more than once, which is abnormal and causes a bug in the system.

Hamza
RE: Error kafka-stream method punctuate in context.forward()
I'm using version 0.10.0.

De : Hamza HACHANI
Envoyé : lundi 19 septembre 2016 19:20:23
À : users@kafka.apache.org
Objet : RE: Error kafka-stream method punctuate in context.forward()

Hi Guozhang,

Here is the code for the two concerned classes. If this can help: I figured out that the instances of ProcessorStatsByHourSupplier and ProcessorStatsByMinuteSupplier which are returned are the same. I think this is the problem. I tried to fix it but was not able to.

Thanks Guozhang

Hamza

--

public class StatsByMinute {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processorByMinute");
        // Where to find Kafka broker(s).
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.82:9094,192.168.1.89:9093,192.168.1.82:9092");
        // Where to find the corresponding ZooKeeper ensemble.
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "192.168.1.82:2181");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("Source", "uplink");
        String countStoreName = "CountsStore" + System.currentTimeMillis();
        builder.addProcessor("Process", new ProcessorStatsByMinuteSupplier(new ProcessorStatsByMinute(1, countStoreName)), "Source");
        builder.addStateStore(Stores.create(countStoreName).withStringKeys().withStringValues().inMemory().build(), "Process");
        builder.addSink("Sink", "statsM", Serdes.String().serializer(), Serdes.String().serializer(), "Process");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}

-

public class StatsByHour {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processorByHour");
        // Where to find Kafka broker(s).
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.82:9094,192.168.1.89:9093,192.168.1.82:9092");
        // Where to find the corresponding ZooKeeper ensemble.
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "192.168.1.82:2181");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("Source", "statsM");
        String countStoreName = "CountsStore" + System.currentTimeMillis();
        ProcessorStatsByHourSupplier processorStatsByHourSupplier = new ProcessorStatsByHourSupplier(new ProcessorStatsByHour(3, countStoreName));
        System.out.println(processorStatsByHourSupplier);
        builder.addProcessor("Process", processorStatsByHourSupplier, "Source");
        builder.addStateStore(Stores.create(countStoreName).withStringKeys().withStringValues().inMemory().build(), "Process");
        builder.addSink("Sink", "statsH", Serdes.String().serializer(), Serdes.String().serializer(), "Process");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}

---

public class ProcessorStatsByHour extends BaseProcessor {
    public ProcessorStatsByHour(int countTimeUnit, String countStoreName) {
        super(TimeUnit.MINUTES, countTimeUnit, countStoreName);
    }

    @Override
    public void process(String key, String json) {
        Stat stat = deserializeStat(json);
        if (stat != null) {
            if ((stat.getNetworkPartnerId() == null) && (stat.getBaseStationId() == null)) {
                String opKey = stat.getOperatorId();
                Stat statOp = deserializeStat(this.kvStore.get(opKey));
                if (statOp == null) {
                    statOp = new Stat();
                    statOp.setCount(stat.getCount());
                    statOp.setOperatorId(stat.getOperatorId());
                    this.kvStore.put(opKey, serializeStat(statOp));
                } else {
                    statOp.setCount(statOp.getCount() + stat.getCount());
                    this.kvStore.put(opKey, serializeStat(statOp));
                }
            } else if (stat.getBaseStationId() == null) {
                String npKey = stat.getOperatorId() + "_" + stat.getNetworkPartnerId();
                Stat statNp = deserializeStat(this.kvStore.get(npKey));
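The root cause Hamza suspects here — both topologies being handed the same Processor instance — matches how the suppliers are built above: each supplier is constructed around one pre-built processor and presumably hands that same object out on every call. A Kafka Streams ProcessorSupplier is expected to return a *new* Processor per get(), since each stream task needs its own instance. A minimal sketch of the difference, using plain java.util.function.Supplier to stand in for ProcessorSupplier (CountingProcessor is a hypothetical stand-in, not a class from this thread):

```java
import java.util.function.Supplier;

public class SupplierDemo {
    // Stand-in for a processor with per-task state.
    static class CountingProcessor {
        long count = 0;
    }

    // Buggy pattern: the supplier captures one instance and always returns it,
    // so every stream task would share (and corrupt) the same state.
    static final CountingProcessor SHARED = new CountingProcessor();
    static final Supplier<CountingProcessor> BUGGY = () -> SHARED;

    // Correct pattern: build a brand-new processor on every call to get().
    static final Supplier<CountingProcessor> FRESH = CountingProcessor::new;

    public static void main(String[] args) {
        System.out.println(BUGGY.get() == BUGGY.get()); // true: same object every time
        System.out.println(FRESH.get() == FRESH.get()); // false: distinct instances
    }
}
```

The fix in the thread's terms would be for ProcessorStatsByHourSupplier.get() to construct a fresh ProcessorStatsByHour each time, rather than returning the instance passed into the supplier's constructor.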
RE: Error kafka-stream method punctuate in context.forward()
        ... this.kvStore = (KeyValueStore) context.getStateStore(countStoreName);
    }

    @Override
    public void punctuate(long timestamp) {
        try (KeyValueIterator iter = this.kvStore.all()) {
            System.out.println("--- " + timestamp + " --- ");
            while (iter.hasNext()) {
                System.out.println("--- pass1 --- ");
                KeyValue entry = iter.next();
                Stat stat = deserializeStat(entry.value);
                if (stat != null) {
                    System.out.println("--- pass2 --- ");
                    stat.setTimestamp(timestamp);
                }
                System.out.println("--- pass3 --- ");
                System.out.println("key" + entry.key);
                System.out.println("stat" + serializeStat(stat));
                System.out.println("context" + context);
                context.forward(entry.key, serializeStat(stat));
                System.out.println("[" + entry.key + ", " + serializeStat(stat) + "]");
                iter.remove();
            }
        } finally {
            context.commit();
        }
    }

    @Override
    public void close() {
        this.kvStore.close();
    }

    protected static Uplink deserialize(String json) {
        try {
            return objectMapper.readValue(json, Uplink.class);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            return new Uplink().setOperatorId("fake").setNetworkPartnerId("fake").setBaseStationId("fake").setTimeStampProduce(60L);
        }
    }

    protected static Stat deserializeStat(String json) {
        if (json == null) {
            return null;
        }
        try {
            return objectMapper.readValue(json, Stat.class);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            return new Stat().setOperatorId("fake").setNetworkPartnerId("fake").setBaseStationId("fake").setTimestamp(System.currentTimeMillis()).setCount(-1L);
        }
    }

    protected static String serializeStat(Stat stat) {
        try {
            return objectMapper.writeValueAsString(stat);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            return "{'operatorId':'fake','networkPartnerId':'fake','baseStationId':'fake','count':-1,'timestamp':5}";
        }
    }
}

De : Guozhang Wang
Envoyé : lundi 19 septembre 2016 12:19:36
À : users@kafka.apache.org
Objet : Re: Error kafka-stream method punctuate in context.forward()

Hello Hamza,

Which Kafka version are you using with this application?

Also, could you share your code skeleton of the StatsByDay processor implementation?

Guozhang
Error kafka-stream method punctuate in context.forward()
Good morning,

I have a problem with a Kafka Streams application. I have created three Kafka Streams applications:

StatsByMinute : entry topic : uplinks, out topic : statsM.
StatsByHour : entry topic : statsM, out topic : statsH.
StatsByDay : entry topic : statsH, out topic : statsD.

The three applications have nearly the same structure (StatsByMinute and StatsByHour/StatsByDay differ only in the application ID, the KV store and the method process()). StatsByDay and StatsByHour have exactly the same structure (the only exception is the ID parameters).

The problem is that StatsByMinute and StatsByHour work perfectly, but this is not the case for StatsByDay. I verified that I do receive data and process it (so process() works), but at the call to context.forward() in punctuate() there is a problem. I get the following error:

[2016-09-16 15:44:24,467] ERROR Streams application error during processing in thread [StreamThread-1]: (org.apache.kafka.streams.processor.internals.StreamThread)
java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at com.actility.tpk.stat.BaseProcessor.punctuate(BaseProcessor.java:54)
    at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:227)
    at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
    at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:212)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Exception in thread "StreamThread-1" java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at com.actility.tpk.stat.BaseProcessor.punctuate(BaseProcessor.java:54)
    at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:227)
    at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
    at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:212)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
RE: Re : A specific use case
Thanks Guozhang Wang.

Hamza

De : Guozhang Wang
Envoyé : jeudi 4 août 2016 06:58:22
À : users@kafka.apache.org
Objet : Re: Re : A specific use case

Yeah, if you can buffer yourself in the process() function and then rely on punctuate() for generating the outputs, that would resolve your issue. Remember that the punctuate() function itself is event-time driven, so if you do not have any data coming in then it may not be triggered.

Details: https://github.com/apache/kafka/pull/1689

Guozhang
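Guozhang's suggestion — accumulate in process() and emit only from punctuate() — can be sketched without the Kafka Streams API as follows. MinuteAggregator and the forward callback are hypothetical stand-ins for a Processor and ProcessorContext.forward(); in a real topology the punctuate() call would come from the framework after context.schedule(interval):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Buffers per-key counts in process() and only emits them when punctuate() fires.
public class MinuteAggregator {
    private final Map<String, Long> buffer = new HashMap<>();
    private final BiConsumer<String, Long> forward; // stand-in for context.forward()

    public MinuteAggregator(BiConsumer<String, Long> forward) {
        this.forward = forward;
    }

    public void process(String key) {
        buffer.merge(key, 1L, Long::sum); // accumulate; do not emit yet
    }

    public void punctuate(long timestamp) {
        buffer.forEach(forward); // one output record per key for the interval
        buffer.clear();          // reset for the next interval
    }
}
```

Usage: feed every incoming record through process(), and downstream consumers see only the once-per-interval records emitted by punctuate() — which is exactly the "generate one message every minute" behavior asked about below.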
Re : A specific use case
Hi,

Yes, in fact. And I found a solution: editing the punctuate method of the Kafka Streams processor.

- Message de réponse -
De : "Guozhang Wang"
Pour : "users@kafka.apache.org"
Objet : A specific use case
Date : mer., août 3, 2016 23:38

Hello Hamza,

By saying "broker" I think you are actually referring to a Kafka Streams instance?

Guozhang
A specific use case
Good morning,

I'm working on a specific use case. I'm receiving messages from an operator network and trying to compute statistics on their number per minute, per hour, per day...

I would like to create a broker that receives the messages and generates a message every minute. These produced messages are consumed by a consumer on one hand, and also sent to another topic, which receives them and generates messages every minute.

I've been trying to do that for a while without success. In fact, the first broker, any time it receives a message, produces one and sends it to the other topic.

My question is: is what I'm trying to do possible without passing through an intermediate Java process outside of Kafka?

If yes, how?

Thanks in advance.
RE: Kafka streams Issue
Thanks, I will try that.

Hamza

De : Tauzell, Dave
Envoyé : vendredi 29 juillet 2016 03:18:47
À : users@kafka.apache.org
Objet : RE: Kafka streams Issue

Let's say you currently have:

Processing App ---> OUTPUT TOPIC ---> output consumer

You would ideally like the processing app to only write to the output topic every minute, but cannot easily do this. So what you might be able to do is:

Processing App ---> INTERMEDIATE OUTPUT TOPIC ---> Coalesce Process ---> OUTPUT TOPIC

The Coalesce Process is an application that does something like:

    Bucket = new list()
    Consumer = createConsumer()
    While( message = Consumer.next() ) {
        Window = calculate current window
        If message is after Window:
            Send Bucket to OUTPUT TOPIC
        Else
            Add message to Bucket
    }

Dave Tauzell | Senior Software Engineer | Surescripts
O: 651.855.3042 | www.surescripts.com | dave.tauz...@surescripts.com
Connect with us: Twitter I LinkedIn I Facebook I YouTube
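Dave's coalescing pseudocode can be fleshed out as a small, Kafka-free sketch. The one-minute window is derived from the message timestamp, and emit is a hypothetical stand-in for producing the batch to OUTPUT TOPIC. Note that the pseudocode as written drops the message that triggers the flush; the sketch below starts the next bucket with it instead:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Coalesces a stream of timestamped messages into one batch per minute.
public class Coalescer {
    private static final long WINDOW_MS = 60_000L;
    private final List<String> bucket = new ArrayList<>();
    private final Consumer<List<String>> emit; // stand-in for the OUTPUT TOPIC producer
    private long currentWindow = -1;

    public Coalescer(Consumer<List<String>> emit) {
        this.emit = emit;
    }

    public void onMessage(long timestampMs, String payload) {
        long window = timestampMs / WINDOW_MS;    // calculate current window
        if (currentWindow == -1) {
            currentWindow = window;
        }
        if (window > currentWindow) {             // message is after the window:
            emit.accept(new ArrayList<>(bucket)); // send bucket to OUTPUT TOPIC
            bucket.clear();
            currentWindow = window;
        }
        bucket.add(payload);                      // add message to the (new) bucket
    }
}
```

One caveat with this design, echoed in the answer below: the flush is driven by incoming message timestamps, so a quiet topic leaves the last bucket unflushed until the next message arrives.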
RE: Kafka streams Issue
Hi Dave, Could you explain a little bit much your idea ? I can't figure out what you are suggesting. Thank you -Hamza De : Tauzell, Dave Envoyé : vendredi 29 juillet 2016 02:39:53 À : users@kafka.apache.org Objet : RE: Kafka streams Issue You could send the message immediately to an intermediary topic. Then have a consumer of that topic that pull messages off and waits until the minute is up. -Dave Dave Tauzell | Senior Software Engineer | Surescripts O: 651.855.3042 | www.surescripts.com<http://www.surescripts.com> | dave.tauz...@surescripts.com Connect with us: Twitter I LinkedIn I Facebook I YouTube -Original Message- From: Hamza HACHANI [mailto:hamza.hach...@supcom.tn] Sent: Friday, July 29, 2016 9:36 AM To: users@kafka.apache.org Subject: Kafka streams Issue > Good morning, > > I'm an ICT student in TELECOM BRRETAGNE (a french school). > I did follow your presentation in Youtube and i found them really > intresting. > I'm trying to do some stuffs with Kafka. And now it has been about 3 > days that I'm blocked. > I'm trying to control the time in which my processing application send > data to the output topic . > What i'm trying to do is to make the application process data from the > input topic all the time but send the messages only at the end of a > minute/an hour/a month (the notion of windowing). > For the moment what i managed to do is that the application instead of > sending data only at the end of the minute,it send it anytime it does > receive it from the input topic. > Have you any suggestions to help me? > I would be really gratfeul. Preliminary answer for now: > For the moment what i managed to do is that the application instead of sending data only at the end > of the minute,it send it anytime it does receive it from the input topic. This is actually the expected behavior at the moment. The main reason for this behavior is that, in stream processing, we never know whether there is still late-arriving data to be received. 
For example, imagine you have 1-minute windows based on event-time. It may happen that, after the first 1-minute window has passed, another record arrives five minutes later but, according to the record's event-time, should still have been part of that first 1-minute window. In this case, what we typically want is for the first 1-minute window to be updated/reprocessed with the late-arriving record included. In other words, just because 1 minute has passed (= the 1-minute window is "done") does not mean that all the data for that time interval has actually been processed already -- so sending only a single update after 1 minute has passed would even produce incorrect results in many cases. For this reason you currently see a downstream update any time there is a new incoming data record ("the application sends it whenever it receives it from the input topic").

So the point here is to ensure correctness of processing. That said, one known drawback of the current behavior is that users haven't been able to control (read: decrease/reduce) the rate/volume of the resulting downstream updates. For example, if you have an input topic with a rate of 1 million msg/s (which is easy for Kafka), some users want to aggregate/window results primarily to reduce the input rate to a lower number (e.g. 1 thousand msg/s) so that the data can be fed from Kafka to other systems that might not scale as well as Kafka. To help these use cases, we will have a new configuration parameter in the next major version of Kafka that lets you control the rate/volume of downstream updates. Here, the point is to help users optimize resource usage rather than correctness of processing. This new parameter should also help with your use case. But note that even this new parameter is not based on strict time behavior or time windows.
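To make the late-arrival point above concrete, here is a minimal plain-Java sketch (no Kafka dependency, so the names and structure are illustrative only, not Kafka Streams API): records are assigned to 1-minute tumbling windows by event-time, and a record that arrives late still updates the window its event-time falls into, which is exactly why a downstream update can be emitted for an "already finished" window.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: event-time tumbling windows, where a late-arriving
// record updates the window its event-time belongs to, not the current one.
public class EventTimeWindows {
    static final long WINDOW_MS = 60_000L; // 1-minute tumbling windows

    // Start of the window that an event-time timestamp belongs to.
    static long windowStart(long eventTimeMs) {
        return (eventTimeMs / WINDOW_MS) * WINDOW_MS;
    }

    // windowStart -> count of records seen so far for that window
    final Map<Long, Long> counts = new HashMap<>();

    // Process one record; returns the updated count for its window, i.e.
    // the "downstream update" that is emitted on every incoming record.
    long process(long eventTimeMs) {
        return counts.merge(windowStart(eventTimeMs), 1L, Long::sum);
    }

    public static void main(String[] args) {
        EventTimeWindows agg = new EventTimeWindows();
        agg.process(5_000L);   // window [0s, 60s): count becomes 1
        agg.process(70_000L);  // window [60s, 120s): count becomes 1
        // Late arrival: processed much later, but its event-time says it
        // belongs to the first window, so that window is updated again.
        long updated = agg.process(30_000L);
        System.out.println("first window count after late record: " + updated);
    }
}
```

Emitting only one final result per window would have missed that last update, which is the correctness issue described above.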
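Dave's intermediary-topic suggestion earlier in the thread can also be sketched in plain Java (again no Kafka client dependency; the class and method names here are hypothetical): the consumer of the intermediary topic buffers records and only releases a batch once the minute they belong to is over. Timestamps are passed in explicitly so the logic is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the intermediary-topic idea: buffer records and
// forward them to the output topic only when their minute has elapsed.
public class MinuteBuffer {
    static final long MINUTE_MS = 60_000L;

    private final List<String> buffer = new ArrayList<>();
    private long currentMinuteStart = -1L; // -1 means no records seen yet

    // Offer one record with its timestamp; returns the batch to forward
    // if the previous minute just closed, or null if still buffering.
    List<String> offer(String record, long timestampMs) {
        long minuteStart = (timestampMs / MINUTE_MS) * MINUTE_MS;
        List<String> flushed = null;
        if (currentMinuteStart >= 0 && minuteStart > currentMinuteStart) {
            flushed = new ArrayList<>(buffer); // minute is up: release batch
            buffer.clear();
        }
        currentMinuteStart = minuteStart;
        buffer.add(record);
        return flushed;
    }

    public static void main(String[] args) {
        MinuteBuffer mb = new MinuteBuffer();
        mb.offer("a", 1_000L);  // buffered, minute [0s, 60s)
        mb.offer("b", 30_000L); // still the same minute
        List<String> batch = mb.offer("c", 61_000L); // new minute: flush a, b
        System.out.println("flushed: " + batch);
    }
}
```

Note the trade-off relative to the discussion above: this approach emits each minute's batch exactly once, so a record arriving after its minute has been flushed would land in a later batch rather than updating the earlier one.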