Re: Is this a decent use case for Kafka Streams?
Unfortunately this notion isn't applicable: "...At the end of a time window..." If you comb through the archives of this group you'll see many questions about notifications for the 'end of an aggregation window', and a similar number of replies from the Kafka team stating that no such notion really exists. Each window is kept open so that late-arriving records can be incorporated. You can specify the lifetime of a given window, but you don't get any sort of signal when it expires. A record that arrives after said expiration will trigger a new window to be created.

On Wed, Jul 12, 2017 at 5:06 PM, Stephen Powis wrote:
> Hey! I was hoping I could get some input from people more experienced with
> Kafka Streams to determine if they'd be a good use case/solution for me.
>
> I have multi-tenant clients submitting data to a Kafka topic that they want
> ETL'd to a third-party service. I'd like to batch and group these by
> tenant over a time window, somewhere between 1 and 5 minutes. At the end
> of a time window I'd then issue an API request to the third-party service for
> each tenant, sending the batch of data over.
>
> Other points of note:
> - Ideally we'd have exactly-once semantics; sending data multiple times
> would typically be bad. But we'd need to gracefully handle things like API
> request errors / service outages.
>
> - We currently use Storm for doing stream processing, but the long-running
> time windows and potentially large amount of data stored in memory make me
> a bit nervous about using it for this.
>
> Thoughts? Thanks in advance!
> Stephen
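For what it's worth, the window lifecycle can be made concrete with a little arithmetic. The sketch below is plain Java, not the Streams API (the helper name is made up): it mirrors how a hopping spec like TimeWindows.of(size).advanceBy(advance) assigns a record to every window containing its timestamp, which is why there is no single 'window closed' event to hook - any later record may still land in an already-started window.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {
    // Hypothetical helper: the window start times that contain a record
    // timestamp, mirroring TimeWindows.of(size).advanceBy(advance) semantics.
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long last = (ts / advance) * advance; // latest window starting at or before ts
        for (long s = last; s > ts - size; s -= advance) {
            if (s >= 0) starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 5-minute windows advancing by 1 minute: each record lands in 5 windows.
        long MIN = 60_000L;
        List<Long> starts = windowStartsFor(7 * MIN + 30_000L, 5 * MIN, 1 * MIN);
        System.out.println(starts); // [420000, 360000, 300000, 240000, 180000]
    }
}
```

A late record simply re-opens whichever of those windows is still within its `.until()` retention, or creates it anew if it has been dropped.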
network errors?
What does this message mean: WARN kafka.network.Processor - Attempting to send response via channel for which there is no open connection, connection id 2
"... since it is no longer fetchable"
Attempting to run a KStreams app and seeing lots of this sort of error message:

> Resuming partition -#
> Pausing partition -#
> Not returning fetched records for assigned partition -# since it is no longer fetchable

This cycles through all the partitions. The app seems to get _some_ data from the topic but it's clearly struggling. I've tried restarting each broker in sequence and the logs aren't showing anything abnormal. Using **kafkacat** I can see that there is lots of data available.
joining two windowed aggregations
I want to collect data in two windowed groups: a 4-hour window with a one-hour overlap, and a 5-minute window with a 1-minute overlap. I want to compare the values in the _oldest_ window of each group. This seems like a standard join operation, but I'm not clear on how to limit which window the join operates on. I could keep a timestamp in each aggregate and ignore the join if it isn't what I want (i.e., less than 4 hours old), but this seems very inefficient. Likely I'm missing the big picture here again with regard to KStreams. I keep running into situations where Kafka Streams seems like a great tool but it just doesn't quite fit. Kind of like having a drawer with mixed metric/standard wrenches.
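One workaround (a guess, not a built-in feature): materialize each windowed aggregate keyed by window start, then select the oldest still-retained window on each side before comparing, rather than filtering timestamps inside the join. A plain-Java sketch of that selection step, with a TreeMap standing in for a windowed store and the helper name hypothetical:

```java
import java.util.TreeMap;

public class OldestWindowJoin {
    // Hypothetical stand-in: windowStart -> aggregate value for one key,
    // as a windowed KTable would hold. TreeMap keeps window starts ordered.
    static Long oldestRetained(TreeMap<Long, Double> windows, long streamTime, long retentionMs) {
        // Windows older than the retention horizon would have been dropped;
        // take the oldest survivor (smallest start >= horizon).
        return windows.ceilingKey(streamTime - retentionMs);
    }

    public static void main(String[] args) {
        long HOUR = 3_600_000L;
        TreeMap<Long, Double> fourHourGroup = new TreeMap<>();
        fourHourGroup.put(0L, 10.0);
        fourHourGroup.put(HOUR, 12.0);
        // Stream time is 5h; with 4h retention the window starting at 0 is gone.
        Long oldest = oldestRetained(fourHourGroup, 5 * HOUR, 4 * HOUR);
        System.out.println(oldest); // 3600000
    }
}
```

The same selection on the 5-minute/1-minute group yields the other side of the comparison, so the "join" becomes two lookups rather than a windowed join over every overlapping pair.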
kafka consumers as kubernetes pods - best practice
I'm looking for suggestions on how to manage Kafka consumers when they run as Kubernetes pods, especially in an auto-scaling environment. Looking at our logging output, it seems like we're dropping data when a pod is moved between hosts, despite doing (what I believe is) an orderly shutdown. I'm seeing consumer-lag values drop precipitously when a pod is restarted, as if it were committing an offset at the end of the queue instead of where it should have left off / picked up. This is an investigation in progress; I'm not entirely sure what's happening here - it just looks wrong.
Joining on non-keyed values - how to lookup fields
I'd like to further my immersion in kafka-as-database by doing more extensive key/value joins. Specifically, there are many instances in the DB world where one is given a numeric field and needs to look up the appropriate string translation/value. Imagine a record of student/class data where all the courses are numbered and one must determine class/instructor names for a hard copy. Something akin to:

select from schedules
left join classes on schedules.classid = classes.id
left join teachers on schedules.teacherid = teachers.id
left join textbooks on schedules.textbookid = textbooks.id

... and so on. In the KTable world (AFAICT) this is only possible for the key the source record uses. So-called "internal" values can't be looked up. I could imagine running each record through a 'map' cycle to rearrange the key for each lookup column, remap and repeat, but this seems a bit onerous. Perhaps using a Processor step one could use additional streams? I don't know. Using log compaction these 'matching/lookup' topics could be kept available. I was reading this missive (https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins). It seems like the right direction but misses this point. Any thoughts on this? Am I missing an obvious solution? (I hope so - this would be a cool use case.)
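For reference, the 'map cycle' described above can be sketched in plain Java, with maps standing in for log-compacted lookup topics materialized as KTables (all names and values here are hypothetical illustration, not the Streams API):

```java
import java.util.Map;

public class LookupJoinSketch {
    // One selectKey -> join -> selectKey pass, simulated as a map lookup:
    // 'table' stands in for a log-compacted topic materialized as a KTable.
    static String lookup(Map<Integer, String> table, int foreignKey) {
        return table.getOrDefault(foreignKey, "<unknown>");
    }

    public static void main(String[] args) {
        Map<Integer, String> classes = Map.of(101, "Algebra", 102, "Biology");
        Map<Integer, String> teachers = Map.of(7, "Rivera", 9, "Chen");

        // A schedule record with "internal" foreign keys, per the example above.
        int classId = 101, teacherId = 9;

        // One pass per lookup column - the onerous part is that each pass is a
        // full re-key + join + re-key in Streams, not a cheap local get().
        String line = lookup(classes, classId) + " / " + lookup(teachers, teacherId);
        System.out.println(line); // Algebra / Chen
    }
}
```

In actual Streams terms each `lookup` would be a selectKey() on the foreign key (forcing a repartition), a join against the compacted KTable, then a selectKey() back to the original key - which is exactly why chaining several of them feels heavy.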
Re: Understanding ReadOnlyWindowStore.fetch
I remain more than mystified about the workings of the StateStore. I tried making aggregations with a 1-minute window, 10-second advance and a _12-hour_ retention (which is longer than the retention.ms of the topic). I still couldn't get more than a 15% hit rate on the StateStore. Are there configuration settings? Some properties file to set up RocksDB? I'm not getting any errors - just not getting any data. On Wed, Mar 29, 2017 at 12:52 PM, Jon Yeargers wrote: > So '.until()' is based on clock time / elapsed time (IE record age) / > something else? > > The fact that Im seeing lots of records come through that can't be found > in the Store - these are 'old' and already expired? > > Going forward - it would be useful to have different forms of '.until()' > so one could consume old records (EG if one was catching up from lag) > without having to worry about them immediately disappearing. > > On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy wrote: > >> Jon, >> >> You should be able to query anything that has not expired, i.e., based on >> TimeWindows.until(..). >> >> Thanks, >> Damian >> >> On Wed, 29 Mar 2017 at 17:24 Jon Yeargers >> wrote: >> >> > To be a bit more specific: >> > >> > If I call this: KTable kt = >> > sourceStream.groupByKey().reduce(..., "somekeystore"); >> > >> > and then call this: >> > >> > kt.forEach()-> ... >> > >> > Can I assume that everything that comes out will be available in >> > "somekeystore"? If not, what subset should I expect to find there? >> > >> > On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers > > >> > wrote: >> > >> > > But if a key shows up in a KTable->forEach should it be available in >> the >> > > StateStore (from the KTable)? 
>> > > >> > > On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll >> > > wrote: >> > > >> > >> Jon, >> > >> >> > >> there's a related example, using a window store and a key-value >> store, >> > at >> > >> https://github.com/confluentinc/examples/blob/3.2.x/kafka- >> > >> streams/src/test/java/io/confluent/examples/streams/Val >> > >> idateStateWithInteractiveQueriesLambdaIntegrationTest.java >> > >> (this is for Confluent 3.2 / Kafka 0.10.2). >> > >> >> > >> -Michael >> > >> >> > >> >> > >> >> > >> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers < >> jon.yearg...@cedexis.com >> > > >> > >> wrote: >> > >> >> > >> > Im only running one instance (locally) to keep things simple. >> > >> > >> > >> > Reduction: >> > >> > >> > >> > KTable, String> hourAggStore = >> > >> > sourceStream.groupByKey().reduce(rowReducer, >> > >> > TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * >> > >> > 1000).until(70 * 60 * 1000L), >> > >> > "HourAggStore"); >> > >> > >> > >> > then I get values to look for via: >> > >> > >> > >> > hourAggStore.foreach((k, v) -> { >> > >> > LogLine logLine = objectMapper.readValue(v, >> > >> logLine.class); >> > >> > LOGGER.debug("{}", k.key()); >> > >> > }); >> > >> > >> > >> > Ive kept it easy by requesting everything from 0 to >> > >> > 'System.currentTimeMillis()'. Retrieval is done using a snip from >> your >> > >> > sample code "windowedByKey". >> > >> > >> > >> > Requests are sent in via curl and output through the same channel. >> I >> > >> pass >> > >> > in the key and ask for any values. >> > >> > >> > >> > Ive looked at the values passed in / out of the reduction function >> and >> > >> they >> > >> > look sane. >> > >> > >> > >> > My assumption is that if a value shows up in the 'forEach' loop >> this >> > >> > implies it exists in the StateStore. Accurate? >> > >> > >> > >> > In fact, only about one in 10 requests actual
Re: Understanding ReadOnlyWindowStore.fetch
So is '.until()' based on clock time / elapsed time (i.e., record age) / something else?

The fact that I'm seeing lots of records come through that can't be found in the Store - these are 'old' and already expired?

Going forward, it would be useful to have different forms of '.until()' so one could consume old records (e.g., if one were catching up from lag) without having to worry about them immediately disappearing.

On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy wrote: > Jon, > > You should be able to query anything that has not expired, i.e., based on > TimeWindows.until(..). > > Thanks, > Damian > > On Wed, 29 Mar 2017 at 17:24 Jon Yeargers > wrote: > > > To be a bit more specific: > > > > If I call this: KTable kt = > > sourceStream.groupByKey().reduce(..., "somekeystore"); > > > > and then call this: > > > > kt.forEach()-> ... > > > > Can I assume that everything that comes out will be available in > > "somekeystore"? If not, what subset should I expect to find there? > > > > On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers > > wrote: > > > > > But if a key shows up in a KTable->forEach should it be available in > the > > > StateStore (from the KTable)? > > > > > > On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll > > > wrote: > > > > > >> Jon, > > >> > > >> there's a related example, using a window store and a key-value store, > > at > > >> https://github.com/confluentinc/examples/blob/3.2.x/kafka- > > >> streams/src/test/java/io/confluent/examples/streams/Val > > >> idateStateWithInteractiveQueriesLambdaIntegrationTest.java > > >> (this is for Confluent 3.2 / Kafka 0.10.2). > > >> > > >> -Michael > > >> > > >> > > >> > > >> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers < jon.yearg...@cedexis.com > > > > > >> wrote: > > >> > > >> > Im only running one instance (locally) to keep things simple. 
> > >> > > > >> > Reduction: > > >> > > > >> > KTable, String> hourAggStore = > > >> > sourceStream.groupByKey().reduce(rowReducer, > > >> > TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * > > >> > 1000).until(70 * 60 * 1000L), > > >> > "HourAggStore"); > > >> > > > >> > then I get values to look for via: > > >> > > > >> > hourAggStore.foreach((k, v) -> { > > >> > LogLine logLine = objectMapper.readValue(v, > > >> logLine.class); > > >> > LOGGER.debug("{}", k.key()); > > >> > }); > > >> > > > >> > Ive kept it easy by requesting everything from 0 to > > >> > 'System.currentTimeMillis()'. Retrieval is done using a snip from > your > > >> > sample code "windowedByKey". > > >> > > > >> > Requests are sent in via curl and output through the same channel. I > > >> pass > > >> > in the key and ask for any values. > > >> > > > >> > Ive looked at the values passed in / out of the reduction function > and > > >> they > > >> > look sane. > > >> > > > >> > My assumption is that if a value shows up in the 'forEach' loop this > > >> > implies it exists in the StateStore. Accurate? > > >> > > > >> > In fact, only about one in 10 requests actually return any values. > No > > >> > errors - just no data. > > >> > > > >> > > > >> > > > >> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy > > >> wrote: > > >> > > > >> > > Hi Jon, > > >> > > > > >> > > If you are able to get a handle on the store, i.e., via > > >> > > KafkaStreams.store(...) and call fetch without any exceptions, > then > > >> the > > >> > > store is available. > > >> > > The time params to fetch are the boundaries to search for windows > > for > > >> the > > >> > > given key. They relate to the start time of the window, so if you > > did > > >> > > fetch(key, t1, t2) - it will find all the windows for
Re: Understanding ReadOnlyWindowStore.fetch
To be a bit more specific: If I call this: KTable kt = sourceStream.groupByKey().reduce(..., "somekeystore"); and then call this: kt.forEach()-> ... Can I assume that everything that comes out will be available in "somekeystore"? If not, what subset should I expect to find there? On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers wrote: > But if a key shows up in a KTable->forEach should it be available in the > StateStore (from the KTable)? > > On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll > wrote: > >> Jon, >> >> there's a related example, using a window store and a key-value store, at >> https://github.com/confluentinc/examples/blob/3.2.x/kafka- >> streams/src/test/java/io/confluent/examples/streams/Val >> idateStateWithInteractiveQueriesLambdaIntegrationTest.java >> (this is for Confluent 3.2 / Kafka 0.10.2). >> >> -Michael >> >> >> >> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers >> wrote: >> >> > Im only running one instance (locally) to keep things simple. >> > >> > Reduction: >> > >> > KTable, String> hourAggStore = >> > sourceStream.groupByKey().reduce(rowReducer, >> > TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * >> > 1000).until(70 * 60 * 1000L), >> > "HourAggStore"); >> > >> > then I get values to look for via: >> > >> > hourAggStore.foreach((k, v) -> { >> > LogLine logLine = objectMapper.readValue(v, >> logLine.class); >> > LOGGER.debug("{}", k.key()); >> > }); >> > >> > Ive kept it easy by requesting everything from 0 to >> > 'System.currentTimeMillis()'. Retrieval is done using a snip from your >> > sample code "windowedByKey". >> > >> > Requests are sent in via curl and output through the same channel. I >> pass >> > in the key and ask for any values. >> > >> > Ive looked at the values passed in / out of the reduction function and >> they >> > look sane. >> > >> > My assumption is that if a value shows up in the 'forEach' loop this >> > implies it exists in the StateStore. Accurate? 
>> > >> > In fact, only about one in 10 requests actually return any values. No >> > errors - just no data. >> > >> > >> > >> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy >> wrote: >> > >> > > Hi Jon, >> > > >> > > If you are able to get a handle on the store, i.e., via >> > > KafkaStreams.store(...) and call fetch without any exceptions, then >> the >> > > store is available. >> > > The time params to fetch are the boundaries to search for windows for >> the >> > > given key. They relate to the start time of the window, so if you did >> > > fetch(key, t1, t2) - it will find all the windows for key that start >> in >> > the >> > > inclusive time range t1 - t2. >> > > >> > > Are you running more than one instance? If yes, then you want to make >> > sure >> > > that you are querying the correct instance. For that you can use: >> > > KafkaStreams.metadataForKey(...) to find the instance that has the >> key >> > you >> > > are looking for. >> > > >> > > Thanks, >> > > Damian >> > > >> > > >> > > >> > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers >> > > wrote: >> > > >> > > > Im probing about trying to find a way to solve my aggregation -> db >> > > issue. >> > > > Looking at the '.fetch()' function Im wondering about the >> 'timeFrom' >> > and >> > > > 'timeTo' params as not a lot is mentioned about 'proper' usage. >> > > > >> > > > The test in >> > > > >> > > > https://github.com/confluentinc/examples/blob/ >> > > master/kafka-streams/src/test/java/io/confluent/examples/ >> > > streams/interactivequeries/WordCountInteractiveQueriesExa >> > > mpleTest.java#L200-L212 >> > > > makes it appear that the params are boundaries and that it will >> return >> > an >> > > > inclusive list of every key/window combination. Truth? >> > > > >> > > > My tests to this end haven't returned anything. >> > > > >> > > > Im watching the values coming out of the KTable so I >> > can >> > > > send them back as request params. 
What Ive tried: >> > > > >> > > > - Window.key(), Window.key().start() and Window.key().end() >> > > > - Window.key(), (Window.key().start() - 1) and (Window.key().end() >> + 1) >> > > > - Window.key(), 0 and Window.key().end() >> > > > - Window.key(), 0 and (Window.key().end() + 1) >> > > > >> > > > None of these seem to hit anything in the StateStore. >> > > > >> > > > Is there a delay before Store values become available for >> '.fetch()'? >> > > > >> > > >> > >> > >
Re: Understanding ReadOnlyWindowStore.fetch
But if a key shows up in a KTable->forEach should it be available in the StateStore (from the KTable)? On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll wrote: > Jon, > > there's a related example, using a window store and a key-value store, at > https://github.com/confluentinc/examples/blob/3. > 2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ > ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java > (this is for Confluent 3.2 / Kafka 0.10.2). > > -Michael > > > > On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers > wrote: > > > Im only running one instance (locally) to keep things simple. > > > > Reduction: > > > > KTable, String> hourAggStore = > > sourceStream.groupByKey().reduce(rowReducer, > > TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * > > 1000).until(70 * 60 * 1000L), > > "HourAggStore"); > > > > then I get values to look for via: > > > > hourAggStore.foreach((k, v) -> { > > LogLine logLine = objectMapper.readValue(v, > logLine.class); > > LOGGER.debug("{}", k.key()); > > }); > > > > Ive kept it easy by requesting everything from 0 to > > 'System.currentTimeMillis()'. Retrieval is done using a snip from your > > sample code "windowedByKey". > > > > Requests are sent in via curl and output through the same channel. I pass > > in the key and ask for any values. > > > > Ive looked at the values passed in / out of the reduction function and > they > > look sane. > > > > My assumption is that if a value shows up in the 'forEach' loop this > > implies it exists in the StateStore. Accurate? > > > > In fact, only about one in 10 requests actually return any values. No > > errors - just no data. > > > > > > > > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy > wrote: > > > > > Hi Jon, > > > > > > If you are able to get a handle on the store, i.e., via > > > KafkaStreams.store(...) and call fetch without any exceptions, then the > > > store is available. > > > The time params to fetch are the boundaries to search for windows for > the > > > given key. 
They relate to the start time of the window, so if you did > > > fetch(key, t1, t2) - it will find all the windows for key that start in > > the > > > inclusive time range t1 - t2. > > > > > > Are you running more than one instance? If yes, then you want to make > > sure > > > that you are querying the correct instance. For that you can use: > > > KafkaStreams.metadataForKey(...) to find the instance that has the key > > you > > > are looking for. > > > > > > Thanks, > > > Damian > > > > > > > > > > > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers > > > wrote: > > > > > > > Im probing about trying to find a way to solve my aggregation -> db > > > issue. > > > > Looking at the '.fetch()' function Im wondering about the 'timeFrom' > > and > > > > 'timeTo' params as not a lot is mentioned about 'proper' usage. > > > > > > > > The test in > > > > > > > > https://github.com/confluentinc/examples/blob/ > > > master/kafka-streams/src/test/java/io/confluent/examples/ > > > streams/interactivequeries/WordCountInteractiveQueriesExa > > > mpleTest.java#L200-L212 > > > > makes it appear that the params are boundaries and that it will > return > > an > > > > inclusive list of every key/window combination. Truth? > > > > > > > > My tests to this end haven't returned anything. > > > > > > > > Im watching the values coming out of the KTable so I > > can > > > > send them back as request params. What Ive tried: > > > > > > > > - Window.key(), Window.key().start() and Window.key().end() > > > > - Window.key(), (Window.key().start() - 1) and (Window.key().end() + > 1) > > > > - Window.key(), 0 and Window.key().end() > > > > - Window.key(), 0 and (Window.key().end() + 1) > > > > > > > > None of these seem to hit anything in the StateStore. > > > > > > > > Is there a delay before Store values become available for '.fetch()'? > > > > > > > > > >
Re: Understanding ReadOnlyWindowStore.fetch
I'm only running one instance (locally) to keep things simple.

Reduction:

KTable<Windowed<String>, String> hourAggStore = sourceStream.groupByKey().reduce(rowReducer,
    TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000).until(70 * 60 * 1000L),
    "HourAggStore");

then I get values to look for via:

hourAggStore.foreach((k, v) -> {
    LogLine logLine = objectMapper.readValue(v, LogLine.class);
    LOGGER.debug("{}", k.key());
});

I've kept it easy by requesting everything from 0 to 'System.currentTimeMillis()'. Retrieval is done using a snip from your sample code "windowedByKey".

Requests are sent in via curl and output through the same channel. I pass in the key and ask for any values.

I've looked at the values passed in / out of the reduction function and they look sane.

My assumption is that if a value shows up in the 'foreach' loop this implies it exists in the StateStore. Accurate?

In fact, only about one in 10 requests actually return any values. No errors - just no data.

On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy wrote: > Hi Jon, > > If you are able to get a handle on the store, i.e., via > KafkaStreams.store(...) and call fetch without any exceptions, then the > store is available. > The time params to fetch are the boundaries to search for windows for the > given key. They relate to the start time of the window, so if you did > fetch(key, t1, t2) - it will find all the windows for key that start in the > inclusive time range t1 - t2. > > Are you running more than one instance? If yes, then you want to make sure > that you are querying the correct instance. For that you can use: > KafkaStreams.metadataForKey(...) to find the instance that has the key you > are looking for. > > Thanks, > Damian > > > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers > > wrote: > > > Im probing about trying to find a way to solve my aggregation -> db > issue. > > Looking at the '.fetch()' function Im wondering about the 'timeFrom' and > > 'timeTo' params as not a lot is mentioned about 'proper' usage. 
> > > > The test in > > > > https://github.com/confluentinc/examples/blob/ > master/kafka-streams/src/test/java/io/confluent/examples/ > streams/interactivequeries/WordCountInteractiveQueriesExa > mpleTest.java#L200-L212 > > makes it appear that the params are boundaries and that it will return an > > inclusive list of every key/window combination. Truth? > > > > My tests to this end haven't returned anything. > > > > Im watching the values coming out of the KTable so I can > > send them back as request params. What Ive tried: > > > > - Window.key(), Window.key().start() and Window.key().end() > > - Window.key(), (Window.key().start() - 1) and (Window.key().end() + 1) > > - Window.key(), 0 and Window.key().end() > > - Window.key(), 0 and (Window.key().end() + 1) > > > > None of these seem to hit anything in the StateStore. > > > > Is there a delay before Store values become available for '.fetch()'? > > >
Understanding ReadOnlyWindowStore.fetch
I'm probing around, trying to find a way to solve my aggregation -> DB issue. Looking at the '.fetch()' function, I'm wondering about the 'timeFrom' and 'timeTo' params, as not a lot is mentioned about 'proper' usage.

The test in https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212 makes it appear that the params are boundaries and that it will return an inclusive list of every key/window combination. True?

My tests to this end haven't returned anything.

I'm watching the values coming out of the KTable so I can send them back as request params. What I've tried:

- Window.key(), Window.key().start() and Window.key().end()
- Window.key(), (Window.key().start() - 1) and (Window.key().end() + 1)
- Window.key(), 0 and Window.key().end()
- Window.key(), 0 and (Window.key().end() + 1)

None of these seem to hit anything in the StateStore.

Is there a delay before Store values become available for '.fetch()'?
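Damian's reply elsewhere in this thread clarifies that the two params bound the window *start* time, inclusively. A toy plain-Java sketch of that matching rule (the helper is hypothetical, not the Streams API):

```java
import java.util.ArrayList;
import java.util.List;

public class FetchBounds {
    // Hypothetical helper mirroring ReadOnlyWindowStore.fetch(key, timeFrom, timeTo):
    // it matches windows whose *start* falls in [timeFrom, timeTo], inclusive.
    static List<Long> matchingStarts(List<Long> windowStarts, long timeFrom, long timeTo) {
        List<Long> hits = new ArrayList<>();
        for (long s : windowStarts) {
            if (s >= timeFrom && s <= timeTo) hits.add(s);
        }
        return hits;
    }

    public static void main(String[] args) {
        List<Long> starts = List.of(0L, 300_000L, 600_000L);
        // Passing the window's own start for both bounds hits exactly that window...
        System.out.println(matchingStarts(starts, 300_000L, 300_000L)); // [300000]
        // ...while a wider range picks up every window that *starts* inside it.
        System.out.println(matchingStarts(starts, 0L, 600_000L)); // [0, 300000, 600000]
    }
}
```

So of the four combinations listed above, `fetch(key, window.start(), window.end())` and wider should all match the window, provided it has not already expired via `.until()` - which points at retention, not the bounds, as the likely culprit for empty results.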
WindowStore and retention
How long does a given value persist in a WindowStore? Does it obey the '.until()' param of a windowed aggregation/reduction? Please say yes.
using a state store for deduplication
I've been (re)reading this document (http://docs.confluent.io/3.2.0/streams/developer-guide.html#state-stores) hoping to better understand StateStores. At the top of the section there is a tantalizing note implying that one could do deduplication using a store. At present we're using Redis for this, as it gives us a shared location. I've been of the mind that a given store is local to a streams instance. To truly support deduplication, I would think one would need access to _all_ the data for a topic, not just on a per-partition basis. Am I completely misunderstanding this?
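On the locality point: if the topic is keyed by the deduplication id, all duplicates of a given id land on the same partition, so a per-partition local store is enough - no global view required. A minimal plain-Java sketch of the check a Transformer would make, with a HashMap standing in for the persistent (RocksDB-backed) state store:

```java
import java.util.HashMap;
import java.util.Map;

public class DedupSketch {
    // HashMap standing in for the per-task persistent state store a
    // Transformer would use; in Streams this is local and per-partition,
    // which suffices as long as the topic is keyed by the dedup id.
    private final Map<String, Long> seen = new HashMap<>();

    // Returns true the first time an event id is observed on this partition.
    boolean firstSeen(String eventId, long timestampMs) {
        return seen.putIfAbsent(eventId, timestampMs) == null;
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        System.out.println(dedup.firstSeen("evt-1", 1000L)); // true
        System.out.println(dedup.firstSeen("evt-1", 1005L)); // false - duplicate dropped
    }
}
```

If the incoming topic is *not* keyed by the dedup id, a repartition (e.g., via selectKey on the id) would be needed first - which is roughly what the shared Redis instance is papering over today.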
Iterating stream windows
I'm hoping to support external queries into a windowed state store aggregator. Thanks to a previous question here I see where to use a ReadOnlyWindowStore, but I'm not clear on how to define the boundaries for the call.

Assume I have a one-hour window with a 5-minute 'slide' between new windows. If an arbitrary request for the 'latest' values comes in, I want to return the window that's closest to - but not outside - its 60-minute boundary. To me this implies I need to iterate over the available windows (those that haven't hit their 'retention' value). Does such a function exist?

I can envision a solution using a Guava timed cache, but I'm trying very hard not to break out of the full-Kafka world.
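There's no direct 'give me the latest complete window' call that I know of; one approach is to fetch the available windows for a key and pick the newest one whose full span is already behind 'now'. A plain-Java sketch of that selection, with a TreeMap standing in for the fetched windows (helper name hypothetical):

```java
import java.util.TreeMap;

public class LatestWindowSketch {
    // TreeMap of windowStart -> aggregate, standing in for iterating a
    // ReadOnlyWindowStore via fetch(key, 0, now) and collecting the entries.
    static Long latestCompleteWindowStart(TreeMap<Long, Double> windows, long nowMs, long sizeMs) {
        // Newest window whose full [start, start + size) range is behind 'now'.
        return windows.floorKey(nowMs - sizeMs);
    }

    public static void main(String[] args) {
        long MIN = 60_000L;
        TreeMap<Long, Double> windows = new TreeMap<>();
        // One-hour windows with a 5-minute slide, starts at 0, 5, 10, ... minutes.
        for (long s = 0; s <= 60 * MIN; s += 5 * MIN) windows.put(s, 1.0);
        Long start = latestCompleteWindowStart(windows, 70 * MIN, 60 * MIN);
        System.out.println(start); // 600000
    }
}
```

At the 70-minute mark this picks the window starting at minute 10 (which closed exactly at minute 70) - the closest one that hasn't crossed its 60-minute boundary.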
Re: YASSQ (yet another state store question)
Also - if I run this on two hosts - what does it imply if the response to 'streams.allMetadata()' from one host includes both instances but the other host only knows about itself? On Sun, Mar 26, 2017 at 5:58 AM, Jon Yeargers wrote: > If the '.state()' function returns "RUNNING" and I still get this > exception? > > On Fri, Mar 24, 2017 at 1:56 PM, Eno Thereska > wrote: > >> Hi Jon, >> >> This is expected, see this: https://groups.google.com/foru >> m/?pli=1#!searchin/confluent-platform/migrated$20to$ >> 20another$20instance%7Csort:relevance/confluent-platform/ >> LglWC_dZDKw/qsPuCRT_DQAJ <https://groups.google.com/for >> um/?pli=1#!searchin/confluent-platform/migrated$20to$ >> 20another$20instance|sort:relevance/confluent-platform/ >> LglWC_dZDKw/qsPuCRT_DQAJ>. >> >> Thanks >> Eno >> > On 24 Mar 2017, at 20:51, Jon Yeargers >> wrote: >> > >> > I've setup a KTable as follows: >> > >> > KTable, String> outTable = sourceStream.groupByKey(). >> > reduce(rowReducer, >> >TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 * >> > 1000).until(10 * 60 * 1000L), >> >"AggStore"); >> > >> > I can confirm its presence via 'streams.allMetadata()' (accessible >> through >> > a simple httpserver). >> > >> > When I call 'ReadOnlyKeyValueStore store = >> > kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());' >> > >> > I get this exception: >> > >> > org.apache.kafka.streams.errors.InvalidStateStoreException: the state >> > store, AggStore, may have migrated to another instance. >> >at >> > org.apache.kafka.streams.state.internals.QueryableStoreProvi >> der.getStore(QueryableStoreProvider.java:49) >> >at >> > org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378) >> >at >> > com.cedexis.videokafka.videohouraggregator.RequestHandler. 
>> handle(RequestHandler.java:97) >> >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79) >> >at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83) >> >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82) >> >at >> > sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(Se >> rverImpl.java:675) >> >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79) >> >at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:6 >> 47) >> >at >> > sun.net.httpserver.ServerImpl$DefaultExecutor.execute(Server >> Impl.java:158) >> >at >> > sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431) >> >at sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl.java >> :396) >> >at java.lang.Thread.run(Thread.java:745) >> > >> > >> > ... except.. there is only one instance.. running locally. >> >> >
Re: YASSQ (yet another state store question)
If the '.state()' function returns "RUNNING" and I still get this exception? On Fri, Mar 24, 2017 at 1:56 PM, Eno Thereska wrote: > Hi Jon, > > This is expected, see this: https://groups.google.com/ > forum/?pli=1#!searchin/confluent-platform/migrated$ > 20to$20another$20instance%7Csort:relevance/confluent- > platform/LglWC_dZDKw/qsPuCRT_DQAJ <https://groups.google.com/ > forum/?pli=1#!searchin/confluent-platform/migrated$ > 20to$20another$20instance|sort:relevance/confluent- > platform/LglWC_dZDKw/qsPuCRT_DQAJ>. > > Thanks > Eno > > On 24 Mar 2017, at 20:51, Jon Yeargers wrote: > > > > I've setup a KTable as follows: > > > > KTable, String> outTable = sourceStream.groupByKey(). > > reduce(rowReducer, > >TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 * > > 1000).until(10 * 60 * 1000L), > >"AggStore"); > > > > I can confirm its presence via 'streams.allMetadata()' (accessible > through > > a simple httpserver). > > > > When I call 'ReadOnlyKeyValueStore store = > > kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());' > > > > I get this exception: > > > > org.apache.kafka.streams.errors.InvalidStateStoreException: the state > > store, AggStore, may have migrated to another instance. > >at > > org.apache.kafka.streams.state.internals.QueryableStoreProvider. 
> getStore(QueryableStoreProvider.java:49) > >at > > org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378) > >at > > com.cedexis.videokafka.videohouraggregator.RequestHandler.handle( > RequestHandler.java:97) > >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79) > >at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83) > >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82) > >at > > sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle( > ServerImpl.java:675) > >at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79) > >at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java: > 647) > >at > > sun.net.httpserver.ServerImpl$DefaultExecutor.execute( > ServerImpl.java:158) > >at > > sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431) > >at sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl. > java:396) > >at java.lang.Thread.run(Thread.java:745) > > > > > > ... except.. there is only one instance.. running locally. > >
YASSQ (yet another state store question)
I've set up a KTable as follows:

KTable<Windowed<String>, String> outTable = sourceStream.groupByKey().reduce(rowReducer,
    TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 * 1000).until(10 * 60 * 1000L),
    "AggStore");

I can confirm its presence via 'streams.allMetadata()' (accessible through a simple httpserver).

When I call 'ReadOnlyKeyValueStore store = kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());' I get this exception:

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, AggStore, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:49)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
    at com.cedexis.videokafka.videohouraggregator.RequestHandler.handle(RequestHandler.java:97)
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
    at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
    at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
    at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
    at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
    at sun.net.httpserver.ServerImpl$DefaultExecutor.execute(ServerImpl.java:158)
    at sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431)
    at sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl.java:396)
    at java.lang.Thread.run(Thread.java:745)

... except.. there is only one instance.. running locally.
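A common workaround for this exception: stores can be briefly unavailable while the instance is still initializing or rebalancing - even with a single instance - so callers typically retry with a short backoff until the store is ready. A self-contained sketch of that pattern, with IllegalStateException standing in for InvalidStateStoreException so it runs without the kafka-streams jar:

```java
import java.util.function.Supplier;

public class StoreRetry {
    // Retry-with-backoff around a store lookup. In real code the supplier
    // would be () -> kafkaStreams.store("AggStore", ...) and the caught type
    // would be InvalidStateStoreException.
    static <T> T withRetries(Supplier<T> storeSupplier, int attempts, long backoffMs) {
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return storeSupplier.get();
            } catch (IllegalStateException e) {
                last = e; // store still initializing or migrating - wait and retry
                try { Thread.sleep(backoffMs); } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice (store not ready yet), then succeeds.
        String store = withRetries(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("store not ready");
            return "AggStore";
        }, 5, 10L);
        System.out.println(store + " after " + calls[0] + " attempts"); // AggStore after 3 attempts
    }
}
```

Checking `streams.state() == RUNNING` before querying narrows the window but, as the follow-up in this thread notes, doesn't eliminate it - hence the retry loop.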
Re: APPLICATION_SERVER_CONFIG ?
You make some great cases for your architecture. To be clear - I've been proselytizing for Kafka since I joined this company last year. I think my largest issue is rethinking some preexisting notions about streaming to make them work in the KStream universe.

On Fri, Mar 24, 2017 at 6:07 AM, Michael Noll wrote:
> > If I understand this correctly: assuming I have a simple aggregator
> > distributed across n docker instances, each instance will _also_ need to
> > support some sort of communications process for allowing access to its
> > statestore (last param from KStream.groupby.aggregate).
>
> Yes.
>
> See
> http://docs.confluent.io/current/streams/developer-guide.html#your-application-and-interactive-queries
>
> > - The tombstoning facilities of redis or C* would lend themselves well to
> > implementing a 'true' rolling aggregation
>
> What is a 'true' rolling aggregation, and how could Redis or C* help with
> that in a way that Kafka can't? (Honest question.)
>
> > I get that RocksDB has a small footprint but given the choice of
> > implementing my own RPC / gossip-like process for data sharing and using a
> > well tested one (a la C* or redis) I would almost always opt for the latter.
> > [...]
> > Just my $0.02. I would love to hear why I'm missing the 'big picture'. The
> > kstreams architecture seems rife with potential.
>
> One question is, for example: can the remote/central DB of your choice
> (Redis, C*) handle as many qps as Kafka/Kafka Streams/RocksDB can handle?
> Over the network? At the same low latency? Also, what happens if the
> remote DB is unavailable? Do you wait and retry? Discard? Accept the
> fact that your app's processing latency will now go through the roof? I
> wrote about some such scenarios at
> https://www.confluent.io/blog/distributed-real-time-joins-and-aggregations-on-user-activity-events-using-kafka-streams/
> One big advantage (for many use cases, though not all) with Kafka/Kafka Streams is
> that you can leverage fault-tolerant *local* state that may also be
> distributed across app instances. Local state is much more efficient and
> faster when doing stateful processing such as joins or aggregations. You
> don't need to worry about an external system, whether it's up and running,
> whether its version is still compatible with your app, whether it can scale
> as much as your app/Kafka Streams/Kafka/the volume of your input data.
>
> Also, note that some users have actually opted to run hybrid setups: some
> processing output is sent to a remote data store like Cassandra (e.g. via
> Kafka Connect), some processing output is exposed directly through
> interactive queries. It's not like you're forced to pick only one approach.
>
> > - Typical microservices would separate storing / retrieving data
>
> I'd rather argue that for microservices you'd oftentimes prefer to *not*
> use a remote DB, and rather do everything inside your microservice, whatever
> the microservice needs to do (perhaps we could relax this to "do everything
> in a way that your microservice is in full, exclusive control"; i.e. it
> doesn't necessarily need to be *inside*, but arguably it would be better if
> it actually is). See e.g. the article
> https://www.confluent.io/blog/data-dichotomy-rethinking-the-way-we-treat-data-and-services/
> which lists some of the reasoning behind this school of thinking. Again, YMMV.
>
> Personally, I think there's no simple true/false here. The decisions
> depend on what you need, what your context is, etc. Anyway, since you
> already have some opinions for the one side, I wanted to share some food
> for thought for the other side of the argument.
:-) > > Best, > Michael > > > > > > On Fri, Mar 24, 2017 at 1:25 PM, Jon Yeargers > wrote: > > > If I understand this correctly: assuming I have a simple aggregator > > distributed across n-docker instances each instance will _also_ need to > > support some sort of communications process for allowing access to its > > statestore (last param from KStream.groupby.aggregate). > > > > How would one go about substituting a separated db (EG redis) for the > > statestore? > > > > Some advantages to decoupling: > > - It would seem like having a centralized process like this would > alleviate > > the need to execute multiple requests for a given kv pair (IE "who has > this > > data?" and subsequent requests to retrieve it). > > - it would take some pressure off of each node to maintain a large disk > > store > > - Typical microservices
Re: APPLICATION_SERVER_CONFIG ?
If I understand this correctly: assuming I have a simple aggregator distributed across n docker instances, each instance will _also_ need to support some sort of communications process for allowing access to its statestore (last param from KStream.groupby.aggregate).

How would one go about substituting a separate db (e.g. Redis) for the statestore?

Some advantages to decoupling:
- It would seem like having a centralized process like this would alleviate the need to execute multiple requests for a given kv pair (i.e. "who has this data?" and subsequent requests to retrieve it).
- It would take some pressure off of each node to maintain a large disk store.
- Typical microservices would separate storing / retrieving data.
- It would raise some eyebrows if a spec called for a mysql/nosql instance to be installed with every docker container.
- The tombstoning facilities of Redis or C* would lend themselves well to implementing a 'true' rolling aggregation.

I get that RocksDB has a small footprint but, given the choice of implementing my own RPC / gossip-like process for data sharing and using a well tested one (a la C* or Redis), I would almost always opt for the latter.

(Footnote: our implementations already heavily use redis/memcached for deduplication of kafka messages so it would seem a small step to use the same to store aggregation results.)

Just my $0.02. I would love to hear why I'm missing the 'big picture'. The kstreams architecture seems rife with potential.

On Thu, Mar 23, 2017 at 3:17 PM, Matthias J. Sax wrote:
> The config does not "do" anything. It's metadata that gets broadcast
> to other Streams instances for the IQ (interactive queries) feature.
>
> See this blog post for more details:
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>
> Happy to answer any follow up questions.
>
> -Matthias
>
> On 3/23/17 11:51 AM, Jon Yeargers wrote:
> > What does this config param do?
> > I see it referenced / used in some samples and here (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> > )
APPLICATION_SERVER_CONFIG ?
What does this config param do? I see it referenced / used in some samples and here ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams )
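[Editor's note] As the replies above explain, application.server (the literal key behind StreamsConfig.APPLICATION_SERVER_CONFIG) is a host:port string each Streams instance advertises to its peers through group metadata, so interactive queries can be routed to whichever instance holds a key. Streams does not open that port itself; your app must run the RPC layer. A hedged sketch of setting it (application id, broker, host, and port below are invented examples):

```java
import java.util.Properties;

// Sketch: advertise this instance's interactive-query endpoint.
// "application.server" is the literal config key; the other values
// are placeholders, not taken from the thread.
public class StreamsProps {
    public static Properties build(String host, int port) {
        Properties props = new Properties();
        props.put("application.id", "video-hour-aggregator"); // example id
        props.put("bootstrap.servers", "broker1:9092");       // example broker
        props.put("application.server", host + ":" + port);   // endpoint broadcast to peers
        return props;
    }
}
```

Other instances then discover this endpoint via streams.allMetadata() and forward queries to it over whatever transport the app provides (the httpserver mentioned above, for instance).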
Getting current value of aggregated key
If I have an aggregation:

KTable<Windowed<String>, VideoLogLine> outTable = sourceStream.groupByKey().reduce(rowReducer,
    TimeWindows.of(60 * 60 * 1000L).until(10 * 60 * 1000L), "HourAggStore");

how would I go about getting some value from this with a separate process? I have the "HourAggStore" name but I'm not clear how to retrieve anything from it.
Re: clearing an aggregation?
I get that the windows are aligned along seconds but this doesn't really help with true clock alignment (i.e. top of the hour, midnight, etc). I can imagine a strategy using overlapping windows: one would (hypothetically) walk through the list until a window that spanned the desired time was found.

Since apparently there isn't a way to iterate through windowed KTables, I'm guessing that this sort of 'aggregate and clear' approach still requires an external datastore (like Redis). Please correct me if I'm wrong.

On Mon, Mar 20, 2017 at 9:29 AM, Michael Noll wrote:
> Jon,
>
> the windowing operation of Kafka's Streams API (in its DSL) aligns
> time-based windows to the epoch [1].
>
> Quoting from e.g. hopping windows (sometimes called sliding windows in
> other technologies):
>
> > Hopping time windows are aligned to the epoch, with the lower interval bound
> > being inclusive and the upper bound being exclusive. "Aligned to the epoch"
> > means that the first window starts at timestamp zero.
> > For example, hopping windows with a size of 5000ms and an advance interval
> > ("hop") of 3000ms have predictable window boundaries `[0;5000),[3000;8000),...`
> > -- and not `[1000;6000),[4000;9000),...` or even something "random" like
> > `[1452;6452),[4452;9452),...`.
>
> Would that help you?
>
> -Michael
>
> [1] http://docs.confluent.io/current/streams/developer-guide.html
>
> On Mon, Mar 20, 2017 at 12:51 PM, Jon Yeargers wrote:
> > Is this possible? I'm wondering about gathering data from a stream into a
> > series of windowed aggregators: minute, hour and day. A separate process
> > would start at fixed intervals, query the appropriate state store for
> > available values and then hopefully clear / zero / reset everything for the
> > next interval.
> >
> > I could use the retention period setting but I would (somehow) need to
> > guarantee that the windows would reset on clock boundaries and not based on
> > start time for the app.
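[Editor's note] The epoch alignment Michael quotes actually does give clock-aligned boundaries for free whenever the window size divides the clock unit: a 1-hour tumbling window always starts at the top of the hour, a 1-minute window at the top of the minute. A sketch of the arithmetic (class and method names are mine):

```java
// Epoch-aligned window boundaries, per the quoted rule: the first window
// starts at timestamp zero and subsequent windows advance by advanceMs.
public class WindowAlign {
    // Start of the window containing timestamp ts
    public static long windowStart(long ts, long advanceMs) {
        return ts - (ts % advanceMs);
    }

    // Exclusive upper bound of that window
    public static long windowEnd(long ts, long advanceMs, long sizeMs) {
        return windowStart(ts, advanceMs) + sizeMs;
    }
}
```

So a 3,600,000 ms tumbling window containing any timestamp inside an hour starts exactly on the hour, which is the "top of the hour" behavior asked about above; no external datastore is needed just for the alignment.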
clearing an aggregation?
Is this possible? I'm wondering about gathering data from a stream into a series of windowed aggregators: minute, hour and day. A separate process would start at fixed intervals, query the appropriate state store for available values and then hopefully clear / zero / reset everything for the next interval.

I could use the retention period setting but I would (somehow) need to guarantee that the windows would reset on clock boundaries and not based on the start time of the app.
understanding consumer rebalance trigger(s)
I'm wondering what the parameters are that trigger a consumer rebalance. I have a topic that turns roughly 50K messages / minute across 6 partitions. Each is serviced by a separate dockerized consumer.

Roughly every 8-12 min this goes into a rebalance that may take up to a minute. When it returns it often puts some or all partitions on to a single consumer (leaving others idle). This may persist for a minute while it tries another arrangement. Eventually, after 2-3 tries, it will evenly distribute the partitions... for a few minutes, until it makes another misguided attempt.

As a result we have lag increasing from 0 to ~450K and back to 0 on a cycle. The data rate is assumed to be roughly consistent through these cycles. The resultant graph of lag is a sawtooth shape.

Using 0.10.0.1, 3 brokers.

Also - is there some way to set / control consumer 'assignment'? Or to 'suggest' a setting?
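[Editor's note] Two knobs bear on the questions above; this is a sketch with placeholder values, not advice from the thread. The assignor is pluggable via partition.assignment.strategy (RangeAssignor is the 0.10 default; RoundRobinAssignor spreads partitions more evenly across members), and a rebalance fires whenever a member is deemed dead, e.g. when processing a poll() batch takes longer than the session timeout, so session.timeout.ms and max.poll.records interact with slow consumers. Full manual control means leaving the group protocol and calling consumer.assign() with explicit TopicPartitions.

```java
import java.util.Properties;

// Sketch: consumer settings relevant to assignment and rebalance churn.
// RoundRobinAssignor ships with the 0.10 clients; broker address and the
// numeric values are placeholders.
public class ConsumerProps {
    public static Properties build(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // example
        props.put("group.id", groupId);
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        // A poll() gap longer than the session timeout triggers a rebalance,
        // so cap the work per poll and give the session some headroom:
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", "500");
        return props;
    }
}
```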
Re: hitting the throughput limit on a cluster?
Thanks for looking at this issue. I checked the max IOPS for this disk and we're only at about 10%. I can add more disks to spread out the work. What IOWait values should I be aiming for?

Also - what do you set open files to? I have it at 65535 but I just read a doc that suggested > 100K is better.

On Tue, Feb 21, 2017 at 10:45 AM, Todd Palino wrote:
> So I think the important thing to look at here is the IO wait on your
> system. You’re hitting disk throughput issues, and that’s what you most
> likely need to resolve. So just from what you’ve described, I think the
> only thing that is going to get you more performance is more spindles (or
> faster spindles). This is either more disks or more brokers, but at the end
> of it you need to eliminate the disk IO bottleneck.
>
> -Todd
>
> On Tue, Feb 21, 2017 at 7:29 AM, Jon Yeargers wrote:
> > Running 3x 8core on google compute.
> >
> > Topic has 16 partitions (replication factor 2) and is consumed by 16 docker
> > containers on individual hosts.
> >
> > System seems to max out at around 4 messages / minute. Each message is
> > ~12K - compressed (snappy) JSON.
> >
> > Recently moved from 12 to the above 16 partitions with no change in
> > throughput.
> >
> > Also tried increasing the consumption capacity on each container by 50%. No
> > effect.
> >
> > Network is running at ~6Gb/sec (measured using iperf3). Broker load is
> > ~1.5. IOWait % is 5-10 (via sar).
> >
> > What are my options for adding throughput?
> >
> > - more brokers?
> > - avro/protobuf messaging?
> > - more disks / broker? (1 / host presently)
> > - jumbo frames?
> >
> > (transparent huge pages is disabled)
> >
> > Looking at this article (
> > https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines)
> > it would appear that for our message size we are at the max. This would
> > argue that we need to shrink the message size - so perhaps switching to
> > avro is the next step?
> > > > > > -- > *Todd Palino* > Staff Site Reliability Engineer > Data Infrastructure Streaming > > > > linkedin.com/in/toddpalino >
hitting the throughput limit on a cluster?
Running 3x 8core on google compute. Topic has 16 partitions (replication factor 2) and is consumed by 16 docker containers on individual hosts.

System seems to max out at around 4 messages / minute. Each message is ~12K - compressed (snappy) JSON.

Recently moved from 12 to the above 16 partitions with no change in throughput. Also tried increasing the consumption capacity on each container by 50%. No effect.

Network is running at ~6Gb/sec (measured using iperf3). Broker load is ~1.5. IOWait % is 5-10 (via sar).

What are my options for adding throughput?
- more brokers?
- avro/protobuf messaging?
- more disks / broker? (1 / host presently)
- jumbo frames?

(transparent huge pages is disabled)

Looking at this article (https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines) it would appear that for our message size we are at the max. This would argue that we need to shrink the message size - so perhaps switching to avro is the next step?
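[Editor's note] Beyond the hardware angle Todd raises above, producer batching is often the cheapest throughput lever for large JSON messages like these. A hedged sketch of the relevant knobs; the values are illustrative starting points, not recommendations from the thread:

```java
import java.util.Properties;

// Sketch: producer-side settings that trade latency and durability for
// throughput. Broker address and the numeric values are placeholders.
public class ProducerTuning {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // example
        props.put("compression.type", "snappy");        // already in use here
        props.put("batch.size", "262144");              // 256 KB batches for ~12 KB messages
        props.put("linger.ms", "50");                   // wait briefly so batches fill
        props.put("acks", "1");                         // leader-only acks; "all" is safer but slower
        return props;
    }
}
```

Larger batches amortize per-request overhead and compress better, which matters most exactly when individual messages are big.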
conflicts between consumer groups? seeing many duplicate records
Is it possible that using the same group name for two topics could cause a conflict? I have a situation where I'm seeing vast numbers of records (more than 2x) get duplicated in a topic.

I was looking at consumer lag using 'kafka-consumer-groups ... --new-consumer' and noticed that I had another app using the same group name for a different (much smaller) topic.

Would increasing partitions have an effect like this? Added 33% more partitions to a topic yesterday and all sorts of havoc occurred.

Using 0.10.1.0.
Re: KTable and cleanup.policy=compact
If I'm doing a KStream.leftJoin(KTable), how would I set this configuration for just the KTable portion? I.e. I have:

KStream = KStreamBuilder.stream()
KTable = KStreamBuilder.table()
... (join occurs.. data flows.. ppl are brought closer together.. there is peace in the valley.. for me...) ...
KafkaStreams = new KafkaStreams(KStreamBuilder, config_with_cleanup_policy_or_not?)
KafkaStreams.start()

On Wed, Feb 8, 2017 at 12:30 PM, Eno Thereska wrote:
> Yeah makes sense. I was looking at it from the point of view of keeping
> all data forever.
>
> Eno
>
> > On 8 Feb 2017, at 20:27, Matthias J. Sax wrote:
> >
> > Yes, that could happen if a key was not updated for a longer period than
> > topic retention time.
> >
> > If you want to force a changelog creation, you can do a dummy aggregate
> > instead of using KStreamBuilder#table()
> >
> >> KTable table = KStreamBuilder.stream("topic").groupByKey().reduce(new Reducer() {
> >>     @Override
> >>     public Object apply(Object oldValue, Object newValue) {
> >>         return newValue;
> >>     }
> >> }, "someStoreName");
> >
> > -Matthias
> >
> > On 2/8/17 11:39 AM, Mathieu Fenniak wrote:
> >> I think there could be correctness implications... the default
> >> cleanup.policy of delete would mean that topic entries past the retention
> >> policy might have been removed. If you scale up the application, new
> >> application instances won't be able to restore a complete table into its
> >> local state store. An operation like a join against that KTable would find
> >> no records where there should be records.
> >>
> >> Mathieu
> >>
> >> On Wed, Feb 8, 2017 at 12:15 PM, Eno Thereska wrote:
> >>> If you fail to set the policy to compact, there shouldn't be any
> >>> correctness implications, however your topics will grow larger than
> >>> necessary.
> >>>
> >>> Eno
> >>>
> >>>> On 8 Feb 2017, at 18:56, Jon Yeargers wrote:
> >>>>
> >>>> What are the ramifications of failing to do this?
> >>>> On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>> Yes, that is correct.
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 2/7/17 6:39 PM, Mathieu Fenniak wrote:
> >>>>>> Hey kafka users,
> >>>>>>
> >>>>>> Is it correct that a Kafka topic that is used for a KTable should be set to
> >>>>>> cleanup.policy=compact?
> >>>>>>
> >>>>>> I've never noticed until today that the KStreamBuilder#table()
> >>>>>> documentation says: "However, no internal changelog topic is created since
> >>>>>> the original input topic can be used for recovery"... [1], which seems like
> >>>>>> it is only true if the topic is configured for compaction. Otherwise the
> >>>>>> original input topic won't necessarily contain the data necessary for
> >>>>>> recovery of the state store.
> >>>>>>
> >>>>>> [1] https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf079517523c83060/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java#L355
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Mathieu
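[Editor's note] Streams will not alter the source topic's config for you; compaction is a topic-level setting applied with the stock tooling. In 0.10.x that is kafka-configs.sh; a sketch with placeholder topic and ZooKeeper names (nothing below is taken from the thread):

```shell
# Set compaction on the KTable's source topic (names are placeholders).
bin/kafka-configs.sh --zookeeper zk1:2181 \
  --entity-type topics --entity-name my-ktable-topic \
  --alter --add-config cleanup.policy=compact

# Verify the change:
bin/kafka-configs.sh --zookeeper zk1:2181 \
  --entity-type topics --entity-name my-ktable-topic --describe
```

For the join in the question above, the policy applies only to the topic backing the KTable; the KStream's input topic can keep the default delete policy.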
Re: KTable and cleanup.policy=compact
What are the ramifications of failing to do this? On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax wrote: > Yes, that is correct. > > > -Matthias > > > On 2/7/17 6:39 PM, Mathieu Fenniak wrote: > > Hey kafka users, > > > > Is it correct that a Kafka topic that is used for a KTable should be set > to > > cleanup.policy=compact? > > > > I've never noticed until today that the KStreamBuilder#table() > > documentation says: "However, no internal changelog topic is created > since > > the original input topic can be used for recovery"... [1], which seems > like > > it is only true if the topic is configured for compaction. Otherwise the > > original input topic won't necessarily contain the data necessary for > > recovery of the state store. > > > > [1] > > https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf07951 > 7523c83060/streams/src/main/java/org/apache/kafka/streams/ > kstream/KStreamBuilder.java#L355 > > > > Thanks, > > > > Mathieu > > > >
"auto offset commit failed"
This message seems to come and go for various consumers:

WARN o.a.k.c.c.i.ConsumerCoordinator - Auto offset commit failed for group : Commit offsets failed with retriable exception. You should retry committing offsets.

Since I'm not tracking offsets myself - how would I go about retrying them?
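[Editor's note, not a reply from the thread] With enable.auto.commit=true the consumer itself retries on the next auto-commit interval, so this WARN is usually transient, often coinciding with a rebalance; nothing manual is required. If the offsets matter enough to handle failures explicitly, the usual move is to disable auto-commit and call commitSync() after processing each batch. A sketch of the config (broker address is a placeholder):

```java
import java.util.Properties;

// Sketch: opt out of auto-commit and commit explicitly after processing.
public class ManualCommitProps {
    public static Properties build(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // example
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        // Then, in the poll loop, after records are processed:
        //   consumer.commitSync();  // blocks and retries retriable errors
        return props;
    }
}
```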
Re: Understanding output of KTable->KTable join
If I KStream.leftJoin(KTable), this article (https://www.confluent.io/blog/distributed-real-time-joins-and-aggregations-on-user-activity-events-using-kafka-streams/) seems to suggest that I could have one:many (ktable:kstream). Accurate?

On Mon, Jan 30, 2017 at 4:35 PM, Matthias J. Sax wrote:
> If you join two KTables, one-to-many join is currently not supported
> (only one-to-one, ie, primary key join).
>
> In upcoming 0.10.2 there will be global-KTables that allow something
> similar to one-to-many joins -- however, only for KStream-GlobalKTable
> joins, so not sure if this can help you.
>
> About the null values: yes, it indicates that there was no join computed,
> because no matching key was found. Cf.
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics
>
> Not sure what your keys are; the output you shared is hard to read...
> (e.g., 20bebc12136be4226b29c5d1b6183d8ed2b117c5)
>
> We might add one-to-many KTable-GlobalKTable joins in 0.10.3 though. For
> now, you would need to build a custom Processor and implement the join
> by yourself.
>
> There is another JIRA for foreign-key join feature (unrelated to
> GlobalKTable): https://issues.apache.org/jira/browse/KAFKA-3705
>
> Maybe the discussion helps you implement your own join.
>
> -Matthias
>
> On 1/30/17 11:05 AM, Jon Yeargers wrote:
> > I want to do a one:many join between two streams. There should be ~1:100
> > with < 1% having no match.
> >
> > My topology is relatively simple:
> >
> > KTable1.join(KTable2)->to("other topic")
> >    \
> >     \---> toStream().print()
> >
> > In the join it takes both Value1 and Value2 as JSON, converts them back to
> > Java Objects and combines them. This is returned as the JSON representation
> > of a new Object.
> >
> > If either value was NULL or unable to convert back to its source Object an
> > exception would be thrown.
> > The output sent to the debugger looks like this for many thousands of rows:
> >
> > [KTABLE-TOSTREAM-09]: 20bebc12136be4226b29c5d1b6183d8ed2b117c5 , null
> > [KTABLE-TOSTREAM-09]: c6f038b5182b8a2409a52be71f171d54e3b4 , null
> > [KTABLE-TOSTREAM-09]: f4b0aa0516c37c2725ce409cc5766df9a942950f , null
> > [KTABLE-TOSTREAM-09]: e7d8912ac1b660d21d1dd94955386fb9561abbab , null
> >
> > Then I will get many more that are matched.
> >
> > Questions:
> >
> > 1. I'm assuming the ", null" indicates no match was found. This is a problem.
> > The source of the data is well understood and is < 1% unmatched. If either
> > object is null it throws an exception - which it doesn't.
> > 2. Is this the appropriate way to do a one:many join?
Understanding output of KTable->KTable join
I want to do a one:many join between two streams. There should be ~1:100 with < 1% having no match.

My topology is relatively simple:

KTable1.join(KTable2)->to("other topic")
   \
    \---> toStream().print()

In the join it takes both Value1 and Value2 as JSON, converts them back to Java Objects and combines them. This is returned as the JSON representation of a new Object. If either value was NULL or unable to convert back to its source Object an exception would be thrown.

The output sent to the debugger looks like this for many thousands of rows:

[KTABLE-TOSTREAM-09]: 20bebc12136be4226b29c5d1b6183d8ed2b117c5 , null
[KTABLE-TOSTREAM-09]: c6f038b5182b8a2409a52be71f171d54e3b4 , null
[KTABLE-TOSTREAM-09]: f4b0aa0516c37c2725ce409cc5766df9a942950f , null
[KTABLE-TOSTREAM-09]: e7d8912ac1b660d21d1dd94955386fb9561abbab , null

Then I will get many more that are matched.

Questions:

1. I'm assuming the ", null" indicates no match was found. This is a problem. The source of the data is well understood and is < 1% unmatched. If either object is null it throws an exception - which it doesn't.
2. Is this the appropriate way to do a one:many join?
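[Editor's note] As Matthias's reply above explains, KTable-KTable joins match on the primary key only, so an intended one-to-many relationship surfaces as mostly-unmatched keys. A plain-Java analogy of the one-to-one matching rule; the names and data are mine, and this is not the Kafka API:

```java
import java.util.HashMap;
import java.util.Map;

// Analogy for KTable.join's matching rule: a pair is produced only when
// BOTH sides currently hold the key. Keys present on one side only are
// the non-matches that show up as ", null" in the printed output above.
public class PkJoinDemo {
    public static Map<String, String> join(Map<String, String> left,
                                           Map<String, String> right) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : left.entrySet()) {
            String other = right.get(e.getKey());
            if (other != null) {
                out.put(e.getKey(), e.getValue() + "+" + other);
            }
            // no else-branch: unmatched keys simply produce nothing here
        }
        return out;
    }
}
```

If the two tables are keyed differently (say, a hash on one side and an id on the other), nearly every lookup misses, which matches the flood of nulls reported above.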
Re: Strategy for true random producer keying
(cont'd) Meant to say: System.currentTimeMillis() mod the partition count.

Having said that - is there any disadvantage to true random distribution of traffic for a topic?

On Tue, Jan 24, 2017 at 11:17 AM, Jon Yeargers wrote:
> It may be picking a random partition but it sticks with it indefinitely
> despite there being a significant disparity in traffic. I need to break it
> up in some different fashion. Maybe just a hash of System.currentTimeMillis()?
>
> On Tue, Jan 24, 2017 at 10:52 AM, Avi Flax wrote:
>> > On Jan 24, 2017, at 11:18, Jon Yeargers wrote:
>> >
>> > If I don't specify a key when I send a value to kafka (something akin
>> > to 'kafkaProducer.send(new ProducerRecord<>(TOPIC_PRODUCE, jsonView))') how
>> > is it keyed?
>>
>> IIRC, in this case the key is null; i.e. there is no key.
>>
>> > I am producing to a topic from an external feed. It appears to be heavily
>> > biased towards certain values and as a result I have 2-3 partitions that
>> > are lagging heavily where the rest are staying current.
>>
>> Hmm, according to the docs this shouldn't matter:
>>
>> > If the key is null, then a random broker partition is picked.
>>
>> https://kafka.apache.org/documentation/#impl_producer
>>
>> You might want to double-check your code and confirm that it is indeed
>> sending no keys… i.e. maybe it's actually using an empty string as a key,
>> or something like that.
>>
>> > Since I don't use
>> > the keys in my consumers I'm wondering if I could randomize these values
>> > somehow to better distribute the load.
>>
>> As per the above docs, this _should_ already be the case, based on what
>> you've described.
>>
>> That said, if you continue to have trouble, then you can introduce your
>> own implementation of kafka.producer.Partitioner, and again as per the docs:
>>
>> > A custom partitioning strategy can also be plugged in using the
>> > partitioner.class config parameter.
>> >> Also, it so happens that I have implemented a custom random partitioning >> strategy through an alternate approach by using the overloaded >> ProducerRecord constructor that accepts a partition ID. You can easily get >> the set of partition IDs from the Producer with the partitionsFor method. >> >> HTH! >> Avi >> >> >> Software Architect @ Park Assist » http://tech.parkassist.com/ > > >
Re: Strategy for true random producer keying
It may be picking a random partition but it sticks with it indefinitely despite there being a significant disparity in traffic. I need to break it up in some different fashion. Maybe just a hash of System.currentTimeMillis()? On Tue, Jan 24, 2017 at 10:52 AM, Avi Flax wrote: > > > On Jan 24, 2017, at 11:18, Jon Yeargers > wrote: > > > > If I don't specify a key when I call send a value to kafka (something > akin > > to 'kafkaProducer.send(new ProducerRecord<>(TOPIC_PRODUCE, jsonView))') > how > > is it keyed? > > IIRC, in this case the key is null; i.e. there is no key. > > > I am producing to a topic from an external feed. It appears to be heavily > > biased towards certain values and as a result I have 2-3 partitions that > > are lagging heavily where the rest are staying current. > > Hmm, according to the docs this shouldn’t matter: > > > If the key is null, then a random broker partition is picked. > > https://kafka.apache.org/documentation/#impl_producer > > You might want to double-check your code and confirm that it is indeed > sending no keys… i.e. maybe it’s actually using an empty string as a key, > or something like that. > > > Since I don't use > > the keys in my consumers Im wondering if I could randomize these values > > somehow to better distribute the load. > > As per the above docs, this _should_ already be the case, based on what > you’ve described. > > That said, if you continue to have trouble, then you can introduce your > own implementation of kafka.producer.Partitioner, and again as per the docs: > > > A custom partitioning strategy can also be plugged in using the > partitioner.class config parameter. > > Also, it so happens that I have implemented a custom random partitioning > strategy through an alternate approach by using the overloaded > ProducerRecord constructor that accepts a partition ID. You can easily get > the set of partition IDs from the Producer with the partitionsFor method. > > HTH! 
> Avi > > > Software Architect @ Park Assist » http://tech.parkassist.com/
Strategy for true random producer keying
If I don't specify a key when I send a value to kafka (something akin to 'kafkaProducer.send(new ProducerRecord<>(TOPIC_PRODUCE, jsonView))'), how is it keyed?

I am producing to a topic from an external feed. It appears to be heavily biased towards certain values and as a result I have 2-3 partitions that are lagging heavily where the rest are staying current. Since I don't use the keys in my consumers, I'm wondering if I could randomize these values somehow to better distribute the load.
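[Editor's note] A possible workaround for the stickiness described elsewhere in this thread, along the lines Avi suggests above: the ProducerRecord constructor that takes an explicit partition bypasses the partitioner entirely, and the partition count is available from producer.partitionsFor(topic). Note that hashing System.currentTimeMillis() would pin every send within the same millisecond to one partition; a uniform random draw avoids that. A sketch (names are mine):

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch: pick a partition uniformly at random. In real code partitionCount
// would come from producer.partitionsFor(topic).size(), and the result feeds
// the explicit-partition constructor:
//   new ProducerRecord<>(topic, pick(n), null, jsonView)
public class RandomPartition {
    public static int pick(int partitionCount) {
        return ThreadLocalRandom.current().nextInt(partitionCount);
    }
}
```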
Re: Messages are lost
Make sure you don't have an orphaned process holding onto the various kafka/zk folders. If it won't respond and you can't kill it then this might have happened.

On Tue, Jan 24, 2017 at 6:46 AM, Ghosh, Achintya (Contractor) <achintya_gh...@comcast.com> wrote:
> Can anyone please answer this?
>
> Thanks
> Achintya
>
> -----Original Message-----
> From: Ghosh, Achintya (Contractor) [mailto:achintya_gh...@comcast.com]
> Sent: Monday, January 23, 2017 1:51 PM
> To: users@kafka.apache.org
> Subject: RE: Messages are lost
>
> Version 0.10 and I don't have the thread dump but have the KafkaServer log
> where the error is there.
>
> Thanks
> Achintya
>
> -----Original Message-----
> From: Apurva Mehta [mailto:apu...@confluent.io]
> Sent: Monday, January 23, 2017 12:49 PM
> To: users@kafka.apache.org
> Subject: Re: Messages are lost
>
> What version of kafka have you deployed? Can you post a thread dump of the
> hung broker?
>
> On Fri, Jan 20, 2017 at 12:14 PM, Ghosh, Achintya (Contractor) <achintya_gh...@comcast.com> wrote:
> > Hi there,
> >
> > I see the below exception in one of my node's logs (cluster with 3 nodes)
> > and then the node stops responding (it's in a hung state; if I do
> > ps -ef | grep kafka, I see the Kafka process but it is not responding), and we
> > lost around 100 messages:
> >
> > 1. What could be the reason for this exception? My broker ID is
> > unique so what is the solution for this issue?
> >
> > [2017-01-19 15:56:23,644] ERROR Error handling event ZkEvent[New
> > session event sent to
> > kafka.server.KafkaHealthcheck$SessionExpireListener@2d74e7af]
> > (org.I0Itec.zkclient.ZkEventThread)
> > java.lang.RuntimeException: A broker is already registered on the path
> > /brokers/ids/2. This probably indicates that you either have
> > configured a brokerid that is already in use, or else you have
> > shutdown this broker and restarted it faster than the zookeeper
> > timeout so it appears to be re-registering.
> > at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> > at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> > at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> > at kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> > at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> >
> > 2. As we lost 100 messages for each topic and I don't see any
> > exception in our application log, how can we track the exception
> > and make sure that we'll not lose any data (consumer end)?
> >
> > Thanks
> > Achintya
Re: Consumer not associating? Perpetual rebalance
FWIW - (for some distant observer): I think my topic / consumer was too slow for the default commit interval. I added these lines to the above config and it seems to be working OK:

// These are likely the default but I'm adding them ... anyway...
consumerProperties.put("enable.auto.commit", "true");
consumerProperties.put("auto.commit.interval.ms", "1000");

// this is the critical value (I think)
consumerProperties.put("max.poll.records", "10");

On Tue, Jan 10, 2017 at 8:55 AM, Jon Yeargers wrote:
> Single app with single consumer. Pulling ~30 records / min.
>
> When I enter 'kafka-topics ... --new-consumer --group --describe' it always
> tells me "Consumer group is rebalancing".
>
> If I enter "kafka-consumer-offset-checker ... --topic --group " it responds
> with appropriate consumer position(s) but tells me "owner" is "none".
>
> I know my app is consuming records and if I stop/start it it picks up
> where it left off.
>
> Why is it marked as 'rebalancing'?
>
> Consumer setup:
>
> props.put("bootstrap.servers", Main.KAFKA_IP);
> props.put("group.id", groupId);
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("session.timeout.ms", "6");
> props.put("request.timeout.ms", "60001");
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>
> Using 0.10.1.0 brokers
> 0.10.2.0-SNAPSHOT (current trunk) as client library
Consumer not associating? Perpetual rebalance
Single app with single consumer. Pulling ~30 records / min. When I enter 'kafka-topics ... --new-consumer --group --describe' it always tells me "Consumer group is rebalancing". If I enter "kafka-consumer-offset-checker ...--topic --group "it responds with appropriate consumer position(s) but tells me "owner" is "none". I know my app is consuming records and if I stop/start it it picks up where it left off. Why is it marked as 'rebalancing'? Consumer setup: props.put("bootstrap.servers", Main.KAFKA_IP); props.put("group.id", groupId); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("session.timeout.ms", "6"); props.put("request.timeout.ms", "60001"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); Using 0.10.1.0 brokers 0.10.2.0-SNAPSHOT (current trunk) as client library
Re: What to do when it won't rebalance "properly"
Along these same lines - I have a topic with a single consumer. When I try to look at lag ("kafka-topics .. --group ... --describe") I get the message "Consumer group is rebalancing". This continues in perpetuity despite stopping/(re)starting the consumer and cycling all (3) brokers. On Sat, Jan 7, 2017 at 7:48 AM, Jon Yeargers wrote: > Have been messing about with Kubernetes on google-cloud. Launched a pod > with 6 consumer nodes and watched the lag using 'kafka-topics .. > --new-consumer --describe'. Topic has assigned all (12 in this case) nodes > to the same consumer while the other 5 are sitting idle. > > This has been the case for ~20 minutes now. No apparent change forthcoming. > > I even tried killing the one consumer to see if it would trigger a > "proper" rebalance. Nope. It went back and assigned everyone to another > (single) consumer. > > Using 0.10.2.0-SNAPSHOT - latest from trunk > > (also tried with 0.10.1.0) > >
What to do when it won't rebalance "properly"
Have been messing about with Kubernetes on google-cloud. Launched a pod with 6 consumer nodes and watched the lag using 'kafka-topics .. --new-consumer --describe'. Topic has assigned all (12 in this case) nodes to the same consumer while the other 5 are sitting idle. This has been the case for ~20 minutes now. No apparent change forthcoming. I even tried killing the one consumer to see if it would trigger a "proper" rebalance. Nope. It went back and assigned everyone to another (single) consumer. Using 0.10.2.0-SNAPSHOT - latest from trunk (also tried with 0.10.1.0)
0.10.2.0-SNAPSHOT - rocksdb exception(s)
2017-01-01 18:19:13,206 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group RtDetailBreakoutProcessor failed on partition assignment
org.apache.kafka.streams.errors.ProcessorStateException: Error opening store table_stream-201701011700 at location /mnt/RtDetailBreakoutProcessor/RtDetailBreakoutProcessor/0_1/table_stream/table_stream-201701011700
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:187)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:156)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:388)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:319)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$000(RocksDBWindowStore.java:51)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore$1.restore(RocksDBWindowStore.java:206)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:238)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)
at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)
at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)
at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
Caused by: org.rocksdb.RocksDBException: IO error: lock /mnt/RtDetailBreakoutProcessor/RtDetailBreakoutProcessor/0_1/table_stream/table_stream-201701011700/LOCK: No locks available
at org.rocksdb.RocksDB.open(Native Method)
at org.rocksdb.RocksDB.open(RocksDB.java:184)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:180)
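[Editorial note, not part of the thread: the root cause line "RocksDBException: IO error: lock ... No locks available" usually means the state directory sits on a filesystem without working POSIX file locks (commonly an NFS or network mount, which a path under /mnt may well be). A hedged workaround, assuming kafka-streams 0.10.x on the classpath, is to point state.dir at local disk; the broker address and local path below are placeholders:]

```java
// Sketch only -- RocksDB needs working file locks, so keep the Streams
// state stores on a local filesystem rather than a network mount.
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "RtDetailBreakoutProcessor");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder
config.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams"); // local disk, not /mnt if NFS
```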
0.10.2.0-SNAPSHOT - "log end offset should not change while restoring"
java.lang.IllegalStateException: task [0_6] Log end offset of RtDetailBreakoutProcessor-table_stream-changelog-6 should not change while restoring: old end offset 26883455, current offset 26883467
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)
at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)
at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)
at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
EOF exceptions - 0.10.2.0-SNAPSHOT
(I've been testing against the latest from GitHub as 0.10.1.1 is too buggy) Seeing quite a few of these this morning:
2017-01-01 16:56:53,299 [StreamThread-1] DEBUG o.a.kafka.common.network.Selector - Connection with / disconnected
java.io.EOFException: null
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:341)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:235)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:181)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:309)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
Re: Interesting error message du jour
K - so no big deal. TY On Fri, Dec 30, 2016 at 12:13 PM, Ewen Cheslack-Postava wrote: > Jon, > > This looks the same as https://issues.apache.org/jira/browse/KAFKA-4563, > although for a different invalid transition. The temporary fix suggested > there is to simply convert the exception to log a warning, which should be > a pretty trivial patch against trunk. It seems there are some transitions > that haven't fully been thought through even if they may actually be valid, > so patching this for now may be the easiest way to unblock your > development. > > -Ewen > > On Fri, Dec 30, 2016 at 9:45 AM, Jon Yeargers > wrote: > > > Attaching the debug log > > > > On Fri, Dec 30, 2016 at 6:39 AM, Jon Yeargers > > wrote: > > > >> Using 0.10.2.0-snapshot: > >> > >> java.lang.IllegalStateException: Incorrect state transition from > >> ASSIGNING_PARTITIONS to ASSIGNING_PARTITIONS > >> > >> at org.apache.kafka.streams.processor.internals.StreamThread. > >> setState(StreamThread.java:163) > >> > >> at org.apache.kafka.streams.processor.internals.StreamThread.se > >> tStateWhenNotInPendingShutdown(StreamThread.java:175) > >> > >> at org.apache.kafka.streams.processor.internals.StreamThread. > >> access$200(StreamThread.java:71) > >> > >> at org.apache.kafka.streams.processor.internals.StreamThread$1. 
> >> onPartitionsAssigned(StreamThread.java:234) > >> > >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordina > >> tor.onJoinComplete(ConsumerCoordinator.java:230) > >> > >> at org.apache.kafka.clients.consumer.internals.AbstractCoordina > >> tor.joinGroupIfNeeded(AbstractCoordinator.java:314) > >> > >> at org.apache.kafka.clients.consumer.internals.AbstractCoordina > >> tor.ensureActiveGroup(AbstractCoordinator.java:278) > >> > >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordina > >> tor.poll(ConsumerCoordinator.java:261) > >> > >> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce( > >> KafkaConsumer.java:1039) > >> > >> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaCo > >> nsumer.java:1004) > >> > >> at org.apache.kafka.streams.processor.internals.StreamThread. > >> runLoop(StreamThread.java:569) > >> > >> at org.apache.kafka.streams.processor.internals.StreamThread. > >> run(StreamThread.java:358) > >> > > > > >
Re: Interesting error message du jour
Attaching the debug log On Fri, Dec 30, 2016 at 6:39 AM, Jon Yeargers wrote: > Using 0.10.2.0-snapshot: > > java.lang.IllegalStateException: Incorrect state transition from > ASSIGNING_PARTITIONS to ASSIGNING_PARTITIONS > > at org.apache.kafka.streams.processor.internals. > StreamThread.setState(StreamThread.java:163) > > at org.apache.kafka.streams.processor.internals.StreamThread. > setStateWhenNotInPendingShutdown(StreamThread.java:175) > > at org.apache.kafka.streams.processor.internals. > StreamThread.access$200(StreamThread.java:71) > > at org.apache.kafka.streams.processor.internals.StreamThread$1. > onPartitionsAssigned(StreamThread.java:234) > > at org.apache.kafka.clients.consumer.internals. > ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230) > > at org.apache.kafka.clients.consumer.internals. > AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314) > > at org.apache.kafka.clients.consumer.internals. > AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278) > > at org.apache.kafka.clients.consumer.internals. > ConsumerCoordinator.poll(ConsumerCoordinator.java:261) > > at org.apache.kafka.clients.consumer.KafkaConsumer. > pollOnce(KafkaConsumer.java:1039) > > at org.apache.kafka.clients.consumer.KafkaConsumer.poll( > KafkaConsumer.java:1004) > > at org.apache.kafka.streams.processor.internals. > StreamThread.runLoop(StreamThread.java:569) > > at org.apache.kafka.streams.processor.internals. > StreamThread.run(StreamThread.java:358) > errors.log.gz Description: GNU Zip compressed data
Interesting error message du jour
Using 0.10.2.0-snapshot:
java.lang.IllegalStateException: Incorrect state transition from ASSIGNING_PARTITIONS to ASSIGNING_PARTITIONS
at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:163)
at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:175)
at org.apache.kafka.streams.processor.internals.StreamThread.access$200(StreamThread.java:71)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:569)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:358)
Re: Memory / resource leak in 0.10.1.1 release
FWIW: I went through and removed all the 'custom' serdes from my code and replaced them with 'string serdes'. The memory leak problem went away. The code is a bit more cumbersome now as it's constantly flipping back and forth between Objects and JSON.. but that seems to be what it takes to keep it running. On Thu, Dec 29, 2016 at 9:42 PM, Guozhang Wang wrote: > Hello Jon, > > It is hard to tell, since I cannot see how is your Aggregate() function is > implemented as well. > > Note that the deserializer of transactionSerde is used in both `aggregate` > and `KstreamBuilder.stream`, while the serializer of transactionSerde is > only used in `aggregate`, so if you suspect the transactionSerde is the > root cause, to narrow it down you can leave the topology as > > > KStream transactionKStream = kStreamBuilder.stream( > stringSerde,transactionSerde,TOPIC); > > transactionKStream.to(TOPIC-2); > > where TOPIC-2 should be pre-created. > > The above topology will also trigger both the serializer and deserializer > of the transactionSerde, and if this topology also leads to memory leak, > then it means it is not relevant to your aggregate function. > > > Guozhang > > > On Sun, Dec 25, 2016 at 4:15 AM, Jon Yeargers > wrote: > > > I narrowed this problem down to this part of the topology (and yes, it's > > 100% repro - for me): > > > > KStream transactionKStream = > > kStreamBuilder.stream(stringSerde,transactionSerde,TOPIC); > > > > KTable, SumRecordCollector> ktAgg = > > transactionKStream.groupByKey().aggregate( > > SumRecordCollector::new, > > new Aggregate(), > > TimeWindows.of(20 * 60 * 1000L), > > collectorSerde, "table_stream"); > > > > Given that this is a pretty trivial, well-traveled piece of Kafka I can't > > imagine it has a memory leak. > > > > So Im guessing that the serde I'm using is causing a problem somehow. The > > 'transactionSerde' is just to get/set JSON into the 'SumRecord' object. 
> > That Object is just a bunch of String and int fields so nothing > interesting > > there either. > > > > I'm attaching the two parts of the transactionSerde to see if anyone has > > suggestions on how to find / fix this. > > > > > > > > On Thu, Dec 22, 2016 at 9:26 AM, Jon Yeargers > > wrote: > > > >> Yes - that's the one. It's 100% reproducible (for me). > >> > >> > >> On Thu, Dec 22, 2016 at 8:03 AM, Damian Guy > wrote: > >> > >>> Hi Jon, > >>> > >>> Is this for the topology where you are doing something like: > >>> > >>> topology: kStream -> groupByKey.aggregate(minute) -> foreach > >>> \-> groupByKey.aggregate(hour) -> foreach > >>> > >>> I'm trying to understand how i could reproduce your problem. I've not > >>> seen > >>> any such issues with 0.10.1.1, but then i'm not sure what you are > doing. > >>> > >>> Thanks, > >>> Damian > >>> > >>> On Thu, 22 Dec 2016 at 15:26 Jon Yeargers > >>> wrote: > >>> > >>> > Im still hitting this leak with the released version of 0.10.1.1. > >>> > > >>> > Process mem % grows over the course of 10-20 minutes and eventually > >>> the OS > >>> > kills it. > >>> > > >>> > Messages like this appear in /var/log/messages: > >>> > > >>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.793692] java > invoked > >>> > oom-killer: gfp_mask=0x24201ca, order=0, oom_score_adj=0 > >>> > > >>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.798383] java > >>> cpuset=/ > >>> > mems_allowed=0 > >>> > > >>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.801079] CPU: 0 > PID: > >>> 9550 > >>> > Comm: java Tainted: GE 4.4.19-29.55.amzn1.x86_64 #1 > >>> > > >>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Hardware > >>> name: > >>> > Xen HVM domU, BIOS 4.2.amazon 11/11/2016 > >>> > > >>> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > >>> > 88071c517a70 812c958f 88071c517c58 > >>> > > >>> > Dec 22 1
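[Editorial note, not part of the thread: the workaround described above - dropping the custom Serde, keeping the topology on String serdes, and converting between objects and JSON by hand - can be sketched roughly as below. This assumes kafka-streams 0.10.x plus Jackson on the classpath; SumRecord and TOPIC are the names from the thread, everything else is a placeholder:]

```java
// Sketch only -- the topology stays on the built-in String serde, and the
// JSON <-> object conversion happens explicitly at the edges instead of
// inside a custom Serde.
final Serde<String> stringSerde = Serdes.String();
final ObjectMapper mapper = new ObjectMapper();

KStream<String, String> raw =
        kStreamBuilder.stream(stringSerde, stringSerde, TOPIC);

KStream<String, SumRecord> records = raw.mapValues(json -> {
    try {
        return mapper.readValue(json, SumRecord.class);
    } catch (IOException e) {
        return null; // or log and filter the nulls out downstream
    }
});
```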
Re: Memory / resource leak in 0.10.1.1 release
I narrowed this problem down to this part of the topology (and yes, it's 100% repro - for me): KStream transactionKStream = kStreamBuilder.stream(stringSerde,transactionSerde,TOPIC); KTable, SumRecordCollector> ktAgg = transactionKStream.groupByKey().aggregate( SumRecordCollector::new, new Aggregate(), TimeWindows.of(20 * 60 * 1000L), collectorSerde, "table_stream"); Given that this is a pretty trivial, well-traveled piece of Kafka I can't imagine it has a memory leak. So Im guessing that the serde I'm using is causing a problem somehow. The 'transactionSerde' is just to get/set JSON into the 'SumRecord' object. That Object is just a bunch of String and int fields so nothing interesting there either. I'm attaching the two parts of the transactionSerde to see if anyone has suggestions on how to find / fix this. On Thu, Dec 22, 2016 at 9:26 AM, Jon Yeargers wrote: > Yes - that's the one. It's 100% reproducible (for me). > > > On Thu, Dec 22, 2016 at 8:03 AM, Damian Guy wrote: > >> Hi Jon, >> >> Is this for the topology where you are doing something like: >> >> topology: kStream -> groupByKey.aggregate(minute) -> foreach >> \-> groupByKey.aggregate(hour) -> foreach >> >> I'm trying to understand how i could reproduce your problem. I've not seen >> any such issues with 0.10.1.1, but then i'm not sure what you are doing. >> >> Thanks, >> Damian >> >> On Thu, 22 Dec 2016 at 15:26 Jon Yeargers >> wrote: >> >> > Im still hitting this leak with the released version of 0.10.1.1. >> > >> > Process mem % grows over the course of 10-20 minutes and eventually the >> OS >> > kills it. 
>> > >> > Messages like this appear in /var/log/messages: >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.793692] java invoked >> > oom-killer: gfp_mask=0x24201ca, order=0, oom_score_adj=0 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.798383] java cpuset=/ >> > mems_allowed=0 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.801079] CPU: 0 PID: >> 9550 >> > Comm: java Tainted: GE 4.4.19-29.55.amzn1.x86_64 #1 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Hardware >> name: >> > Xen HVM domU, BIOS 4.2.amazon 11/11/2016 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > 88071c517a70 812c958f 88071c517c58 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > 88071c517b00 811ce76d 8109db14 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > 810b2d91 0010 817d0fe9 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Call Trace: >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] dump_stack+0x63/0x84 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] dump_header+0x5e/0x1d8 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] ? set_next_entity+0xa4/0x710 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] ? 
__raw_callee_save___pv_queued_ >> spin_unlock+0x11/0x20 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] oom_kill_process+0x205/0x3d0 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] out_of_memory+0x431/0x480 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] __alloc_pages_nodemask+0x91e/0xa60 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] alloc_pages_current+0x88/0x120 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] __page_cache_alloc+0xb4/0xc0 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] filemap_fault+0x188/0x3e0 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] ext4_filemap_fault+0x36/0x50 [ext4] >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] __do_fault+0x3d/0x70 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] handle_mm_fault+0xf27/0x1870 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] ? __raw_callee_save___pv_queued_ >> spin_unlock+0x11/0x20 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] __do_page_fault+0x183/0x3f0 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] do_page_fault+0x22/0x30 >> > >> > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] >> > [] page_fault+0x28/0x30 >> > >> > >
Re: Memory / resource leak in 0.10.1.1 release
Yes - that's the one. It's 100% reproducible (for me). On Thu, Dec 22, 2016 at 8:03 AM, Damian Guy wrote: > Hi Jon, > > Is this for the topology where you are doing something like: > > topology: kStream -> groupByKey.aggregate(minute) -> foreach > \-> groupByKey.aggregate(hour) -> foreach > > I'm trying to understand how i could reproduce your problem. I've not seen > any such issues with 0.10.1.1, but then i'm not sure what you are doing. > > Thanks, > Damian > > On Thu, 22 Dec 2016 at 15:26 Jon Yeargers > wrote: > > > Im still hitting this leak with the released version of 0.10.1.1. > > > > Process mem % grows over the course of 10-20 minutes and eventually the > OS > > kills it. > > > > Messages like this appear in /var/log/messages: > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.793692] java invoked > > oom-killer: gfp_mask=0x24201ca, order=0, oom_score_adj=0 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.798383] java cpuset=/ > > mems_allowed=0 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.801079] CPU: 0 PID: > 9550 > > Comm: java Tainted: GE 4.4.19-29.55.amzn1.x86_64 #1 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Hardware name: > > Xen HVM domU, BIOS 4.2.amazon 11/11/2016 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > 88071c517a70 812c958f 88071c517c58 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > 88071c517b00 811ce76d 8109db14 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > 810b2d91 0010 817d0fe9 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Call Trace: > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] dump_stack+0x63/0x84 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] dump_header+0x5e/0x1d8 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] ? 
set_next_entity+0xa4/0x710 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] ? __raw_callee_save___pv_queued_ > spin_unlock+0x11/0x20 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] oom_kill_process+0x205/0x3d0 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] out_of_memory+0x431/0x480 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] __alloc_pages_nodemask+0x91e/0xa60 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] alloc_pages_current+0x88/0x120 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] __page_cache_alloc+0xb4/0xc0 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] filemap_fault+0x188/0x3e0 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] ext4_filemap_fault+0x36/0x50 [ext4] > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] __do_fault+0x3d/0x70 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] handle_mm_fault+0xf27/0x1870 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] ? __raw_callee_save___pv_queued_ > spin_unlock+0x11/0x20 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] __do_page_fault+0x183/0x3f0 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] do_page_fault+0x22/0x30 > > > > Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] > > [] page_fault+0x28/0x30 > > >
Memory / resource leak in 0.10.1.1 release
Im still hitting this leak with the released version of 0.10.1.1. Process mem % grows over the course of 10-20 minutes and eventually the OS kills it. Messages like this appear in /var/log/messages: Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.793692] java invoked oom-killer: gfp_mask=0x24201ca, order=0, oom_score_adj=0 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.798383] java cpuset=/ mems_allowed=0 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.801079] CPU: 0 PID: 9550 Comm: java Tainted: GE 4.4.19-29.55.amzn1.x86_64 #1 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Hardware name: Xen HVM domU, BIOS 4.2.amazon 11/11/2016 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] 88071c517a70 812c958f 88071c517c58 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] 88071c517b00 811ce76d 8109db14 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] 810b2d91 0010 817d0fe9 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] Call Trace: Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] dump_stack+0x63/0x84 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] dump_header+0x5e/0x1d8 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] ? set_next_entity+0xa4/0x710 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] ? 
__raw_callee_save___pv_queued_spin_unlock+0x11/0x20 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] oom_kill_process+0x205/0x3d0 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] out_of_memory+0x431/0x480 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] __alloc_pages_nodemask+0x91e/0xa60 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] alloc_pages_current+0x88/0x120 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] __page_cache_alloc+0xb4/0xc0 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] filemap_fault+0x188/0x3e0 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] ext4_filemap_fault+0x36/0x50 [ext4] Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] __do_fault+0x3d/0x70 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] handle_mm_fault+0xf27/0x1870 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] ? __raw_callee_save___pv_queued_spin_unlock+0x11/0x20 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] __do_page_fault+0x183/0x3f0 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] do_page_fault+0x22/0x30 Dec 22 13:31:22 ip-172-16-101-108 kernel: [2989844.805072] [] page_fault+0x28/0x30
Re: spontaneous / unwanted exits from KStream apps
Did I say memory usage was stable? Lies. After ~15min it's up to 50% - and climbing. 20 min: 63% Eventually the OS killed it. Didn't generate a log this time though. Found this snip in /var/log/messages: Dec 21 12:56:19 ip-172-16-101-108 kernel: [2901342.207241] java invoked oom-killer: gfp_mask=0x24201ca, order=0, oom_score_adj=0 Some stack dump bits as well. This bug (or one v much like it) was fixed in the 0.10.2.0-SNAPSHOT I was previously using. The unhappy build comes from 0.10.1.1-RC1 and trunk. On Wed, Dec 21, 2016 at 4:32 AM, Jon Yeargers wrote: > Found this treasure lurking in my app folder: > > Shows that the process was OOM-killed by the OS. Have restarted to see if > it will reproduce - so far memory usage seems stable. > > > > On Wed, Dec 21, 2016 at 3:05 AM, Jon Yeargers > wrote: > >> I upgraded my app(s) to 0.10.1.1-rc1 but I'm still seeing these >> error-free shutdowns. Something is taking my app down after varying lengths >> of time (10 minutes to several hours). Doesn't matter if Im running one or >> many instances. >> >> Suggestions on where to look? I've sent several debug logs. >> >> >> >
Re: spontaneous / unwanted exits from KStream apps
Found this treasure lurking in my app folder: Shows that the process was OOM-killed by the OS. Have restarted to see if it will reproduce - so far memory usage seems stable. On Wed, Dec 21, 2016 at 3:05 AM, Jon Yeargers wrote: > I upgraded my app(s) to 0.10.1.1-rc1 but I'm still seeing these error-free > shutdowns. Something is taking my app down after varying lengths of time > (10 minutes to several hours). Doesn't matter if Im running one or many > instances. > > Suggestions on where to look? I've sent several debug logs. > > >
spontaneous / unwanted exits from KStream apps
I upgraded my app(s) to 0.10.1.1-rc1 but I'm still seeing these error-free shutdowns. Something is taking my app down after varying lengths of time (10 minutes to several hours). It doesn't matter if I'm running one or many instances. Suggestions on where to look? I've sent several debug logs.
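One low-cost way to narrow down silent exits like these is to log JVM heap usage on a timer and line those timestamps up against /var/log/messages. A minimal sketch - the class and method names here are illustrative, not part of any Kafka API:

```java
// Periodically log JVM heap usage so app-log timestamps can be correlated
// with kernel oom-killer entries. Illustrative names, not a Kafka API.
public class HeapLogger {

    // Heap currently in use, in megabytes.
    static long usedHeapMb() {
        Runtime rt = Runtime.getRuntime();
        return (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024);
    }

    // Upper bound the JVM will grow to (-Xmx), in megabytes.
    static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    public static void main(String[] args) {
        System.out.println("heap used MB: " + usedHeapMb()
                + " / max MB: " + maxHeapMb());
    }
}
```

Note that the OS OOM killer acts on total process RSS, not just heap - RocksDB state stores allocate off-heap - so a stable heap with a climbing RSS points at native memory rather than a Java leak.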
Re: effect of high IOWait on KStream app?
Could this delay be contributing to the various issues I'm seeing?
- extended / repeating rebalances
- broker connection drops

On Sat, Dec 17, 2016 at 10:27 AM, Eno Thereska wrote: > Yeah, the numbers for the streams tests seem to be low. For reference, > here is what I get when I run it on my laptop, with Kafka co-located > (Macbook pro, 16GB, SSD). These are rounded up with no decimal places: > > > Producer Performance [MB/sec write]: 40 > > Consumer Performance [MB/sec read]: 126 > > > Streams Performance [MB/sec read]: 81 > > Streams Performance [MB/sec read+write]: 45 > > Streams Performance [MB/sec read+store]: 22 > > Streams KStreamKTable LeftJoin Performance [MB/s joined]: 51 > > Streams KStreamKStream LeftJoin Performance [MB/s joined]: 11 > > Streams KTableKTable LeftJoin Performance [MB/s joined]: 12 > > I haven't tried this on AWS unfortunately so I don't know what to expect > there. > > Eno > > > > On 17 Dec 2016, at 15:39, Jon Yeargers wrote: > > > > stateDir=/tmp/kafka-streams-simple-benchmark > > > > numRecords=1000 > > > > SLF4J: Class path contains multiple SLF4J bindings. > > > > SLF4J: Found binding in > > [jar:file:/home/ec2-user/kafka/streams/build/dependant- > libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/ > StaticLoggerBinder.class] > > > > SLF4J: Found binding in > > [jar:file:/home/ec2-user/kafka/tools/build/dependant- > libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/ > StaticLoggerBinder.class] > > > > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > > explanation. 
> > > > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > > > > [2016-12-17 15:26:15,011] WARN Error while fetching metadata with > > correlation id 1 : {simpleBenchmarkSourceTopic=LEADER_NOT_AVAILABLE} > > (org.apache.kafka.clients.NetworkClient:709) > > > > Producer Performance [MB/sec write]: 87.52977203130317 > > > > Consumer Performance [MB/sec read]: 88.05408180729077 > > > > Streams Performance [MB/sec read]: 23.306380376435413 > > > > [2016-12-17 15:27:22,722] WARN Error while fetching metadata with > > correlation id 1 : {simpleBenchmarkSinkTopic=LEADER_NOT_AVAILABLE} > > (org.apache.kafka.clients.NetworkClient:709) > > > > Streams Performance [MB/sec read+write]: 20.37099360560648 > > > > Streams Performance [MB/sec read+store]: 11.918550778354337 > > > > Initializing kStreamTopic joinSourceTopic1kStreamKTable > > > > [2016-12-17 15:29:39,597] WARN Error while fetching metadata with > > correlation id 1 : {joinSourceTopic1kStreamKTable=LEADER_NOT_AVAILABLE} > > (org.apache.kafka.clients.NetworkClient:709) > > > > Initializing kTableTopic joinSourceTopic2kStreamKTable > > > > [2016-12-17 15:29:50,589] WARN Error while fetching metadata with > > correlation id 1 : {joinSourceTopic2kStreamKTable=LEADER_NOT_AVAILABLE} > > (org.apache.kafka.clients.NetworkClient:709) > > > > Streams KStreamKTable LeftJoin Performance [MB/s joined]: > 14.690136622553428 > > > > Initializing kStreamTopic joinSourceTopic1kStreamKStream > > > > [2016-12-17 15:31:12,583] WARN Error while fetching metadata with > > correlation id 1 : {joinSourceTopic1kStreamKStream=LEADER_NOT_AVAILABLE} > > (org.apache.kafka.clients.NetworkClient:709) > > > > Initializing kStreamTopic joinSourceTopic2kStreamKStream > > > > [2016-12-17 15:31:23,534] WARN Error while fetching metadata with > > correlation id 1 : {joinSourceTopic2kStreamKStream=LEADER_NOT_AVAILABLE} > > (org.apache.kafka.clients.NetworkClient:709) > > > > Streams KStreamKStream LeftJoin Performance [MB/s joined]: > 
8.647640177490924 > > > > Initializing kTableTopic joinSourceTopic1kTableKTable > > > > [2016-12-17 15:33:34,586] WARN Error while fetching metadata with > > correlation id 1 : {joinSourceTopic1kTableKTable=LEADER_NOT_AVAILABLE} > > (org.apache.kafka.clients.NetworkClient:709) > > > > Initializing kTableTopic joinSourceTopic2kTableKTable > > > > [2016-12-17 15:33:45,520] WARN Error while fetching metadata with > > correlation id 1 : {joinSourceTopic2kTableKTable=LEADER_NOT_AVAILABLE} > > (org.apache.kafka.clients.NetworkClient:709) > > > > Streams KTableKTable LeftJoin Performance [MB/s joined]: > 6.530348031376133 > > > > On Sat, Dec 17, 2016 at 6:39 AM, Jon Yeargers > > wrote: > > > >> I'd be happy to but the AWS AMI (default) i'm using is fighting this at > >> every turn. Will keep t
Re: checking consumer lag on KStreams app?
3 PM, Damian Guy > > >>> wrote: > > >>>> > > >>>>> Hi Sachin, > > >>>>> > > >>>>> You should use the kafka-consumer-groups.sh command. The > > >>>>> ConsumerOffsetChecker is deprecated and is only for the old > consumer. > > >>>>> > > >>>>> Thanks, > > >>>>> Damian > > >>>>> > > >>>>> On Mon, 12 Dec 2016 at 14:32 Sachin Mittal > > >> wrote: > > >>>>> > > >>>>>> Hi, > > >>>>>> I have a streams application running with application id test. > > >>>>>> When I try to check consumer lag like you suggested I get the > > >>> following > > >>>>>> issue: > > >>>>>> > > >>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker > > >> --zookeeper > > >>>>>> localhost:2181 --group test > > >>>>>> [2016-12-12 10:26:01,348] WARN WARNING: ConsumerOffsetChecker is > > >>>>>> deprecated and will be dropped in releases following 0.9.0. Use > > >>>>>> ConsumerGroupCommand instead. (kafka.tools. > ConsumerOffsetChecker$) > > >>>>>> SLF4J: Class path contains multiple SLF4J bindings. > > >>>>>> SLF4J: Found binding in > > >>>>>> > > >>>>>> [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/ > > >>>>> libs/logback-classic-1.0.3.jar!/org/slf4j/impl/ > > >>> StaticLoggerBinder.class] > > >>>>>> SLF4J: Found binding in > > >>>>>> > > >>>>>> [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/ > > >>>>> libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/ > > >>> StaticLoggerBinder.class] > > >>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for > > >> an > > >>>>>> explanation. > > >>>>>> SLF4J: Actual binding is of type > > >>>>>> [ch.qos.logback.classic.selector.DefaultContextSelector] > > >>>>>> Exiting due to: org.apache.zookeeper.KeeperException$ > > >>> NoNodeException: > > >>>>>> KeeperErrorCode = NoNode for /consumers/test/owners. > > >>>>>> > > >>>>>> Please let me know where I may be going wrong. 
> > >>>>>> I have the kafka logs set in folder > > >>>>>> /data01/testuser/kafka-logs > > >>>>>> > > >>>>>> Under kafka-logs I see many folders with name something like > > >>>>>> consumer_offsets_* > > >>>>>> > > >>>>>> I have the stream dir set in folder > > >>>>>> /data01/testuser/kafka-streams/test > > >>>>>> > > >>>>>> Thanks > > >>>>>> Sachin > > >>>>>> > > >>>>>> > > >>>>>> On Sun, Dec 11, 2016 at 2:19 AM, Matthias J. Sax < > > >>>> matth...@confluent.io> > > >>>>>> wrote: > > >>>>>> > > >>>>>>> It's basically just a consumer as any other. The application.id > > >> is > > >>>>> used > > >>>>>>> as consumer group.id. > > >>>>>>> > > >>>>>>> So just use the available tools you do use to check consumer lag. > > >>>>>>> > > >>>>>>> > > >>>>>>> -Matthias > > >>>>>>> > > >>>>>>> On 12/9/16 5:49 PM, Jon Yeargers wrote: > > >>>>>>>> How would this be done? > > >>>>>>>> > > >>>>>>> > > >>>>>>> > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > > > > > > >
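Putting the advice in this thread together: Streams uses the new consumer, so the ZooKeeper-based ConsumerOffsetChecker finds no /consumers node for it - hence the NoNodeException above. With the 0.10.x tooling, a lag check for an app with application.id "test" would look roughly like this (broker address and group name are placeholders):

```shell
# Describe the consumer group backing the Streams app; the LAG column
# shows per-partition lag. Requires a reachable broker.
bin/kafka-consumer-groups.sh --new-consumer \
  --bootstrap-server localhost:9092 \
  --describe --group test
```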
Re: effect of high IOWait on KStream app?
stateDir=/tmp/kafka-streams-simple-benchmark numRecords=1000 SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/ec2-user/kafka/streams/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/ec2-user/kafka/tools/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] [2016-12-17 15:26:15,011] WARN Error while fetching metadata with correlation id 1 : {simpleBenchmarkSourceTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:709) Producer Performance [MB/sec write]: 87.52977203130317 Consumer Performance [MB/sec read]: 88.05408180729077 Streams Performance [MB/sec read]: 23.306380376435413 [2016-12-17 15:27:22,722] WARN Error while fetching metadata with correlation id 1 : {simpleBenchmarkSinkTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:709) Streams Performance [MB/sec read+write]: 20.37099360560648 Streams Performance [MB/sec read+store]: 11.918550778354337 Initializing kStreamTopic joinSourceTopic1kStreamKTable [2016-12-17 15:29:39,597] WARN Error while fetching metadata with correlation id 1 : {joinSourceTopic1kStreamKTable=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:709) Initializing kTableTopic joinSourceTopic2kStreamKTable [2016-12-17 15:29:50,589] WARN Error while fetching metadata with correlation id 1 : {joinSourceTopic2kStreamKTable=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:709) Streams KStreamKTable LeftJoin Performance [MB/s joined]: 14.690136622553428 Initializing kStreamTopic joinSourceTopic1kStreamKStream [2016-12-17 15:31:12,583] WARN Error while fetching metadata with correlation id 1 : {joinSourceTopic1kStreamKStream=LEADER_NOT_AVAILABLE} 
(org.apache.kafka.clients.NetworkClient:709) Initializing kStreamTopic joinSourceTopic2kStreamKStream [2016-12-17 15:31:23,534] WARN Error while fetching metadata with correlation id 1 : {joinSourceTopic2kStreamKStream=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:709) Streams KStreamKStream LeftJoin Performance [MB/s joined]: 8.647640177490924 Initializing kTableTopic joinSourceTopic1kTableKTable [2016-12-17 15:33:34,586] WARN Error while fetching metadata with correlation id 1 : {joinSourceTopic1kTableKTable=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:709) Initializing kTableTopic joinSourceTopic2kTableKTable [2016-12-17 15:33:45,520] WARN Error while fetching metadata with correlation id 1 : {joinSourceTopic2kTableKTable=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:709) Streams KTableKTable LeftJoin Performance [MB/s joined]: 6.530348031376133 On Sat, Dec 17, 2016 at 6:39 AM, Jon Yeargers wrote: > I'd be happy to but the AWS AMI (default) i'm using is fighting this at > every turn. Will keep trying. > > On Sat, Dec 17, 2016 at 2:46 AM, Eno Thereska > wrote: > >> Jon, >> >> It's hard to tell. Would you be willing to run a simple benchmark and >> report back the numbers? The benchmark is called SimpleBenchmark.java, it's >> included with the source, and it will start a couple of streams apps. It >> requires a ZK and a broker to be up. Then you run it: >> org.apache.kafka.streams.perf.SimpleBenchmark >> . >> >> Thanks >> Eno >> > On 16 Dec 2016, at 20:00, Jon Yeargers >> wrote: >> > >> > Looking for reasons why my installations seem to be generating so many >> > issues: >> > >> > Starting an app which is >> > >> > stream->aggregate->filter->foreach >> > >> > While it's running the system in question (AWS) averages >10% IOWait >> with >> > spikes to 60-70%. The CPU load is in the range of 3/2/1 (8 core >> machine w/ >> > 16G RAM) >> > >> > Could this IO delay be causing problems? 
The 'kafka-streams' folder is >> on a >> > separate EBS optimized volume with 2500 dedicated IOPs. >> >> >
Re: effect of high IOWait on KStream app?
I'd be happy to but the AWS AMI (default) i'm using is fighting this at every turn. Will keep trying. On Sat, Dec 17, 2016 at 2:46 AM, Eno Thereska wrote: > Jon, > > It's hard to tell. Would you be willing to run a simple benchmark and > report back the numbers? The benchmark is called SimpleBenchmark.java, it's > included with the source, and it will start a couple of streams apps. It > requires a ZK and a broker to be up. Then you run it: > org.apache.kafka.streams.perf.SimpleBenchmark > . > > Thanks > Eno > > On 16 Dec 2016, at 20:00, Jon Yeargers wrote: > > > > Looking for reasons why my installations seem to be generating so many > > issues: > > > > Starting an app which is > > > > stream->aggregate->filter->foreach > > > > While it's running the system in question (AWS) averages >10% IOWait with > > spikes to 60-70%. The CPU load is in the range of 3/2/1 (8 core machine > w/ > > 16G RAM) > > > > Could this IO delay be causing problems? The 'kafka-streams' folder is > on a > > separate EBS optimized volume with 2500 dedicated IOPs. > >
effect of high IOWait on KStream app?
Looking for reasons why my installations seem to be generating so many issues: Starting an app which is stream->aggregate->filter->foreach While it's running the system in question (AWS) averages >10% IOWait with spikes to 60-70%. The CPU load is in the range of 3/2/1 (8 core machine w/ 16G RAM) Could this IO delay be causing problems? The 'kafka-streams' folder is on a separate EBS optimized volume with 2500 dedicated IOPs.
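To put numbers on the IOWait question, per-device statistics show whether it's the dedicated EBS state-store volume or the root volume that is saturating. A sketch, assuming the sysstat package is installed:

```shell
# Extended per-device I/O stats: three samples at 5-second intervals.
# Watch %util and await on the device holding the 'kafka-streams' dir;
# sustained %util near 100 means the 2500 provisioned IOPs are the ceiling.
iostat -x 5 3
```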
Re: What makes a KStream app exit?
And these bugs would cause the behaviors I'm seeing? On Fri, Dec 16, 2016 at 10:45 AM, Matthias J. Sax wrote: > We just discovered a couple of bugs with regard to standby tasks... Not > all bug fix PRs got merged yet. > > You can try running on trunk to get those fixes. Should only be a few > days until the fixes get merged. > > > -Matthias > > On 12/16/16 9:10 AM, Jon Yeargers wrote: > > Have started having this issue with another KStream based app. Digging > > through logs I ran across this message: > > > > When I've seen it before it certainly does kill the application. At the > end > > of the SNIP you can see the exit process starting. > > > > > > 2016-12-16 17:04:51,507 [StreamThread-1] DEBUG > > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] > creating > > new standby task 0_0 > > > > 2016-12-16 17:04:51,507 [StreamThread-1] INFO > > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] > Creating > > new standby task 0_0 with assigned partitions [[rtdetail_breakout-0]] > > > > 2016-12-16 17:04:51,508 [StreamThread-1] INFO > > o.a.k.s.p.internals.StandbyTask - standby-task [0_0] Initializing state > > stores > > > > 2016-12-16 17:04:51,508 [StreamThread-1] DEBUG > > o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor > fetching > > committed offsets for partitions: [rtdetail_breakout-0] > > > > 2016-12-16 17:04:51,819 [StreamThread-1] ERROR > > o.a.k.c.c.i.ConsumerCoordinator - User provided listener > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group > > RtDetailBreakoutProcessor fa > > > > iled on partition assignment > > > > java.lang.UnsupportedOperationException: null > > > > at > > org.apache.kafka.streams.processor.internals.StandbyContextImpl. 
> recordCollector(StandbyContextImpl.java:81) > > > > at > > org.apache.kafka.streams.state.internals.StoreChangeLogger.( > StoreChangeLogger.java:54) > > > > at > > org.apache.kafka.streams.state.internals.StoreChangeLogger.( > StoreChangeLogger.java:46) > > > > at > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init( > RocksDBWindowStore.java:197) > > > > at > > org.apache.kafka.streams.state.internals.MeteredWindowStore.init( > MeteredWindowStore.java:66) > > > > at > > org.apache.kafka.streams.state.internals.CachingWindowStore.init( > CachingWindowStore.java:64) > > > > at > > org.apache.kafka.streams.processor.internals.AbstractTask. > initializeStateStores(AbstractTask.java:86) > > > > at > > org.apache.kafka.streams.processor.internals.StandbyTask.( > StandbyTask.java:68) > > > > at > > org.apache.kafka.streams.processor.internals.StreamThread. > createStandbyTask(StreamThread.java:733) > > > > at > > org.apache.kafka.streams.processor.internals. > StreamThread.addStandbyTasks(StreamThread.java:757) > > > > at > > org.apache.kafka.streams.processor.internals.StreamThread.access$200( > StreamThread.java:69) > > > > at > > org.apache.kafka.streams.processor.internals.StreamThread$1. > onPartitionsAssigned(StreamThread.java:125) > > > > at > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator. > onJoinComplete(ConsumerCoordinator.java:229) > > > > at > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > joinGroupIfNeeded(AbstractCoordinator.java:313) > > > > at > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > ensureActiveGroup(AbstractCoordinator.java:277) > > > > at > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll( > ConsumerCoordinator.java:260) > > > > at > > org.apache.kafka.clients.consumer.KafkaConsumer. 
> pollOnce(KafkaConsumer.java:1013) > > > > at > > org.apache.kafka.clients.consumer.KafkaConsumer.poll( > KafkaConsumer.java:979) > > > > at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > StreamThread.java:442) > > > > at > > org.apache.kafka.streams.processor.internals. > StreamThread.run(StreamThread.java:242) > > > > 2016-12-16 17:04:51,820 [StreamThread-1] DEBUG > > o.a.k.c.c.i.ConsumerCoordinator - Group RtDetailBreakoutProcessor > fetching > > committed offsets for partitions: [rtdetail_breakout-2, > > rtdet
Re: What makes a KStream app exit?
2016-12-16 17:04:51,821 [StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Shutting down
2016-12-16 17:04:51,821 [StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] shutdownTasksAndState: shutting down all active tasks [[0_1, 0_2, 1_1, 0_5, 0_6]] and standby tasks [[]]

On Fri, Dec 16, 2016 at 4:53 AM, Jon Yeargers wrote: > FWIW I put a .warn in my shutdown hook - to make sure it wasn't being > called unknowingly. > > Runtime.getRuntime().addShutdownHook(new Thread(() -> { > try { > LOGGER.warn("ShutdownHook"); > kafkaStreams.close(); > } catch (Exception e) { > // ignored > } > })); > > > Ran another test and the app closed after ~40min. The above message > appears 3rd from the end (several seconds after the shutdown process has > commenced). > > (attaching log section) > > This has *got* to be something that I've setup improperly... I just can't > seem to see it. > > On Fri, Dec 16, 2016 at 2:10 AM, Jon Yeargers > wrote: > >> Im seeing instances where my apps are exiting (gracefully, mind you) >> without any obvious errors or cause. I have debug logs from many instances >> of this and have yet to find a reason to explain what's happening. >> >> - nothing in the app log >> - nothing in /var/log/messages (IE not OOM killed) >> - not being closed via /etc/init.d >> - nothing in the broker logs >> >> Running 0.10.1.0 >> >> example log: >> >> https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0 >> /view?usp=sharing >> > >
Re: What makes a KStream app exit?
FWIW I put a .warn in my shutdown hook - to make sure it wasn't being called unknowingly.

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        LOGGER.warn("ShutdownHook");
        kafkaStreams.close();
    } catch (Exception e) {
        // ignored
    }
}));

Ran another test and the app closed after ~40 min. The above message appears third from the end (several seconds after the shutdown process commenced). (attaching log section) This has *got* to be something that I've set up improperly... I just can't seem to see it. On Fri, Dec 16, 2016 at 2:10 AM, Jon Yeargers wrote: > Im seeing instances where my apps are exiting (gracefully, mind you) > without any obvious errors or cause. I have debug logs from many instances > of this and have yet to find a reason to explain what's happening. > > - nothing in the app log > - nothing in /var/log/messages (IE not OOM killed) > - not being closed via /etc/init.d > - nothing in the broker logs > > Running 0.10.1.0 > > example log: > > https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0/ > view?usp=sharing >
What makes a KStream app exit?
I'm seeing instances where my apps are exiting (gracefully, mind you) without any obvious errors or cause. I have debug logs from many instances of this and have yet to find a reason that explains what's happening.
- nothing in the app log
- nothing in /var/log/messages (i.e. not OOM-killed)
- not being closed via /etc/init.d
- nothing in the broker logs
Running 0.10.1.0. Example log: https://drive.google.com/file/d/0B0KWMAAevNSteTh4aWs3SkRURU0/view?usp=sharing
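Besides a shutdown hook, a JVM-wide uncaught-exception handler can reveal whether a thread died quietly before the "graceful" shutdown began - by default a dying stream thread does not take the process down or necessarily log anywhere visible. KafkaStreams also exposes its own setUncaughtExceptionHandler for the stream threads; the generic version below is a self-contained sketch with illustrative names:

```java
// Install JVM-wide tracing for thread deaths and shutdowns, to catch
// exits that leave no trace in the application log. Illustrative sketch.
public class ExitTracer {

    public static void install() {
        // Fires for any thread that dies with an uncaught exception.
        Thread.setDefaultUncaughtExceptionHandler((t, e) ->
                System.err.println("Thread " + t.getName() + " died: " + e));
        // Fires on normal JVM exit and on SIGTERM (not on SIGKILL/OOM-kill).
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> System.err.println("JVM shutting down")));
    }

    public static void main(String[] args) throws Exception {
        install();
        // Simulate a stream thread dying silently.
        Thread worker = new Thread(
                () -> { throw new RuntimeException("boom"); }, "StreamThread-sim");
        worker.start();
        worker.join();
    }
}
```

If the handler never fires and the shutdown hook does, something is calling close() or System.exit() deliberately, which narrows the search considerably.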
"support" topics for KStreams
What are the retention settings for these (-changelog and -replication)? I'm wondering about the relentless rebalancing issues I'm facing, and whether they have anything to do with consumers that lag too far behind. If I delete all the topics associated with a KStream project and restart it, there are no rebalance issues. Everything is fast and responsive. Over the course of 6-10 hours of execution the rebalances take longer and longer, until eventually the app(s) stop responding at all. Just curious.
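For what it's worth, the -changelog topics Streams creates are log-compacted rather than time-retained, so retention behaves differently there than on regular topics. The per-topic settings can be inspected with the standard tools; the topic name below is a placeholder following the <application.id>-<store name>-changelog pattern:

```shell
# Show partition layout plus any per-topic config overrides.
bin/kafka-topics.sh --zookeeper localhost:2181 \
  --describe --topic MyApp-mystore-changelog

# Show only the per-topic config overrides (e.g. cleanup.policy=compact).
bin/kafka-configs.sh --zookeeper localhost:2181 \
  --describe --entity-type topics --entity-name MyApp-mystore-changelog
```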
RocksDB - no locks available exception
Attached is a debug log showing this exception. Question: is it typical to have so many disconnections from brokers? This log also includes "Log end offset should not change while restoring" exceptions. (Attachment: errors.log.gz)
Re: Another odd error
Update: the app ran well for several hours... until I tried to update it. I copied a new build up to one machine (of five) and then we went back to near-endless rebalancing. After about an hour I ended up killing the other four instances and watching the first (new) one. It took 90 minutes before it started consuming anything. This morning I copied / started two more. 60 minutes later and I'm still waiting for the rebalance to conclude. Obviously it's impractical to delete the topic(s) before updating the consumer software. What am I doing wrong that's causing all this waiting? On Wed, Dec 14, 2016 at 9:28 AM, Jon Yeargers wrote: > In a turn of events - this morning I was about to throw in the proverbial > towel on Kafka. In a last ditch effort I killed all but one instance of my > app, put it back to a single thread (why offer the option if it's not > advised?) and deleted every last topic that had any relation to this app. > > I restarted it on a single machine and it magically worked. It's been > running for more than an hour now and hasn't been stuck in 'rebalance-land' > at all. > > I'll keep watching it and see how it goes. > > On Wed, Dec 14, 2016 at 6:13 AM, Damian Guy wrote: > >> We do recommend one thread per instance of the app. However, it should >> also >> work with multiple threads. >> I can't debug the problem any further without the logs from the other >> apps. >> We'd need to try and see if another instance still has task 1_3 open ( i >> suspect it does ) >> >> Thanks, >> Damian >> >> On Wed, 14 Dec 2016 at 13:20 Jon Yeargers >> wrote: >> >> > What should I do about this? One thread per app? >> > >> > On Wed, Dec 14, 2016 at 4:11 AM, Damian Guy >> wrote: >> > >> > > That is correct >> > > >> > > On Wed, 14 Dec 2016 at 12:09 Jon Yeargers >> > > wrote: >> > > >> > > > I have the app running on 5 machines. Is that what you mean? 
>> > > > >> > > > On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy >> > > wrote: >> > > > >> > > > > Hi Jon, >> > > > > >> > > > > Do you have more than one instance of the app running? The reason >> i >> > ask >> > > > is >> > > > > because the task (task 1_3) that fails with the >> > > > > "java.lang.IllegalStateException" in this log is previously >> running >> > > as a >> > > > > Standby Task. This would mean the active task for this store would >> > have >> > > > > been running elsewhere, but i don't see that in the logs. The >> > exception >> > > > > occurs as StreamThread-1 starts to run task 1_3 as an active task. >> > The >> > > > > exception might indicate that another thread/instance is still >> > writing >> > > to >> > > > > the changelog topic for the State Store. >> > > > > >> > > > > Thanks, >> > > > > Damian >> > > > > >> > > > > On Tue, 13 Dec 2016 at 17:23 Jon Yeargers < >> jon.yearg...@cedexis.com> >> > > > > wrote: >> > > > > >> > > > > > As near as I can see it's rebalancing constantly. >> > > > > > >> > > > > > I'll up that value and see what happens. >> > > > > > >> > > > > > On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy < >> damian@gmail.com> >> > > > > wrote: >> > > > > > >> > > > > > > Hi Jon, >> > > > > > > >> > > > > > > I haven't had much of a chance to look at the logs in detail >> too >> > > much >> > > > > > yet, >> > > > > > > but i have noticed that your app seems to be rebalancing >> > > frequently. >> > > > > It >> > > > > > > seems that it is usually around the 300 second mark, which >> > usually >> > > > > would >> > > > > > > mean that poll hasn't been called for at least that long. You >> > might >> > > > > want >> > > > > > to >> > > > > > > try setting the config ConsumerConfig.MAX_POLL_INTERVAL_CONFIG >> to >> > > > > > > something >> > > > > > > h
Re: Another odd error
In a turn of events - this morning I was about to throw in the proverbial towel on Kafka. In a last ditch effort I killed all but one instance of my app, put it back to a single thread (why offer the option if it's not advised?) and deleted every last topic that had any relation to this app. I restarted it on a single machine and it magically worked. It's been running for more than an hour now and hasn't been stuck in 'rebalance-land' at all. I'll keep watching it and see how it goes. On Wed, Dec 14, 2016 at 6:13 AM, Damian Guy wrote: > We do recommend one thread per instance of the app. However, it should also > work with multiple threads. > I can't debug the problem any further without the logs from the other apps. > We'd need to try and see if another instance still has task 1_3 open ( i > suspect it does ) > > Thanks, > Damian > > On Wed, 14 Dec 2016 at 13:20 Jon Yeargers > wrote: > > > What should I do about this? One thread per app? > > > > On Wed, Dec 14, 2016 at 4:11 AM, Damian Guy > wrote: > > > > > That is correct > > > > > > On Wed, 14 Dec 2016 at 12:09 Jon Yeargers > > > wrote: > > > > > > > I have the app running on 5 machines. Is that what you mean? > > > > > > > > On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy > > > wrote: > > > > > > > > > Hi Jon, > > > > > > > > > > Do you have more than one instance of the app running? The reason i > > ask > > > > is > > > > > because the task (task 1_3) that fails with the > > > > > "java.lang.IllegalStateException" in this log is previously > running > > > as a > > > > > Standby Task. This would mean the active task for this store would > > have > > > > > been running elsewhere, but i don't see that in the logs. The > > exception > > > > > occurs as StreamThread-1 starts to run task 1_3 as an active task. > > The > > > > > exception might indicate that another thread/instance is still > > writing > > > to > > > > > the changelog topic for the State Store. 
> > > > > > > > > > Thanks, > > > > > Damian > > > > > > > > > > On Tue, 13 Dec 2016 at 17:23 Jon Yeargers < > jon.yearg...@cedexis.com> > > > > > wrote: > > > > > > > > > > > As near as I can see it's rebalancing constantly. > > > > > > > > > > > > I'll up that value and see what happens. > > > > > > > > > > > > On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy < > damian@gmail.com> > > > > > wrote: > > > > > > > > > > > > > Hi Jon, > > > > > > > > > > > > > > I haven't had much of a chance to look at the logs in detail > too > > > much > > > > > > yet, > > > > > > > but i have noticed that your app seems to be rebalancing > > > frequently. > > > > > It > > > > > > > seems that it is usually around the 300 second mark, which > > usually > > > > > would > > > > > > > mean that poll hasn't been called for at least that long. You > > might > > > > > want > > > > > > to > > > > > > > try setting the config ConsumerConfig.MAX_POLL_INTERVAL_CONFIG > to > > > > > > > something > > > > > > > higher than 30 (which is the default). > > > > > > > > > > > > > > I'll continue to look at your logs and get back to you. > > > > > > > Thanks, > > > > > > > Damian > > > > > > > > > > > > > > On Tue, 13 Dec 2016 at 15:02 Jon Yeargers < > > > jon.yearg...@cedexis.com> > > > > > > > wrote: > > > > > > > > > > > > > > > attached is a log with lots of disconnections and a small > > amount > > > of > > > > > > > > actual, useful activity. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers < > > > > > > jon.yearg...@cedexis.com> > > > > > > > > wrote: > > > &g
Re: Another odd error
What should I do about this? One thread per app? On Wed, Dec 14, 2016 at 4:11 AM, Damian Guy wrote: > That is correct > > On Wed, 14 Dec 2016 at 12:09 Jon Yeargers > wrote: > > > I have the app running on 5 machines. Is that what you mean? > > > > On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy > wrote: > > > > > Hi Jon, > > > > > > Do you have more than one instance of the app running? The reason i ask > > is > > > because the task (task 1_3) that fails with the > > > "java.lang.IllegalStateException" in this log is previously running > as a > > > Standby Task. This would mean the active task for this store would have > > > been running elsewhere, but i don't see that in the logs. The exception > > > occurs as StreamThread-1 starts to run task 1_3 as an active task. The > > > exception might indicate that another thread/instance is still writing > to > > > the changelog topic for the State Store. > > > > > > Thanks, > > > Damian > > > > > > On Tue, 13 Dec 2016 at 17:23 Jon Yeargers > > > wrote: > > > > > > > As near as I can see it's rebalancing constantly. > > > > > > > > I'll up that value and see what happens. > > > > > > > > On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy > > > wrote: > > > > > > > > > Hi Jon, > > > > > > > > > > I haven't had much of a chance to look at the logs in detail too > much > > > > yet, > > > > > but i have noticed that your app seems to be rebalancing > frequently. > > > It > > > > > seems that it is usually around the 300 second mark, which usually > > > would > > > > > mean that poll hasn't been called for at least that long. You might > > > want > > > > to > > > > > try setting the config ConsumerConfig.MAX_POLL_INTERVAL_CONFIG to > > > > > something > > > > > higher than 30 (which is the default). > > > > > > > > > > I'll continue to look at your logs and get back to you. 
> > > > > Thanks, > > > > > Damian > > > > > > > > > > On Tue, 13 Dec 2016 at 15:02 Jon Yeargers < > jon.yearg...@cedexis.com> > > > > > wrote: > > > > > > > > > > > attached is a log with lots of disconnections and a small amount > of > > > > > > actual, useful activity. > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers < > > > > jon.yearg...@cedexis.com> > > > > > > wrote: > > > > > > > > > > > > n/m - I understand the logging issue now. Am generating a new > one. > > > Will > > > > > > send shortly. > > > > > > > > > > > > On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers < > > > > jon.yearg...@cedexis.com> > > > > > > wrote: > > > > > > > > > > > > Yes - saw that one. There were plenty of smaller records > available > > > > > though. > > > > > > > > > > > > I sent another log this morning with the level set to DEBUG. > > > Hopefully > > > > > you > > > > > > rec'd it. > > > > > > > > > > > > On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy < > damian@gmail.com> > > > > > wrote: > > > > > > > > > > > > HI Jon, > > > > > > > > > > > > It looks like you have the logging level for KafkaStreams set to > at > > > > least > > > > > > WARN. I can only see ERROR level logs being produced from > Streams. > > > > > > > > > > > > However, i did notice an issue in the logs (not related to your > > > > specific > > > > > > error but you will need to fix anyway): > > > > > > > > > > > > There are lots of messages like: > > > > > > task [2_9] Error sending record to topic > > > > > > PRTMinuteAgg-prt_hour_agg_stream-changelog > > > > > > org.apache.kafka.common.errors.RecordTooLargeException: The > message > > > is > > > > > > 2381750 bytes when ser
Re: Another odd error
I have the app running on 5 machines. Is that what you mean? On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy wrote: > Hi Jon, > > Do you have more than one instance of the app running? The reason i ask is > because the task (task 1_3) that fails with the > "java.lang.IllegalStateException" in this log is previously running as a > Standby Task. This would mean the active task for this store would have > been running elsewhere, but i don't see that in the logs. The exception > occurs as StreamThread-1 starts to run task 1_3 as an active task. The > exception might indicate that another thread/instance is still writing to > the changelog topic for the State Store. > > Thanks, > Damian > > On Tue, 13 Dec 2016 at 17:23 Jon Yeargers > wrote: > > > As near as I can see it's rebalancing constantly. > > > > I'll up that value and see what happens. > > > > On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy > wrote: > > > > > Hi Jon, > > > > > > I haven't had much of a chance to look at the logs in detail too much > > yet, > > > but i have noticed that your app seems to be rebalancing frequently. > It > > > seems that it is usually around the 300 second mark, which usually > would > > > mean that poll hasn't been called for at least that long. You might > want > > to > > > try setting the config ConsumerConfig.MAX_POLL_INTERVAL_CONFIG to > > > something > > > higher than 30 (which is the default). > > > > > > I'll continue to look at your logs and get back to you. > > > Thanks, > > > Damian > > > > > > On Tue, 13 Dec 2016 at 15:02 Jon Yeargers > > > wrote: > > > > > > > attached is a log with lots of disconnections and a small amount of > > > > actual, useful activity. > > > > > > > > > > > > > > > > On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers < > > jon.yearg...@cedexis.com> > > > > wrote: > > > > > > > > n/m - I understand the logging issue now. Am generating a new one. > Will > > > > send shortly. 
> > > > > > > > On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers < > > jon.yearg...@cedexis.com> > > > > wrote: > > > > > > > > Yes - saw that one. There were plenty of smaller records available > > > though. > > > > > > > > I sent another log this morning with the level set to DEBUG. > Hopefully > > > you > > > > rec'd it. > > > > > > > > On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy > > > wrote: > > > > > > > > HI Jon, > > > > > > > > It looks like you have the logging level for KafkaStreams set to at > > least > > > > WARN. I can only see ERROR level logs being produced from Streams. > > > > > > > > However, i did notice an issue in the logs (not related to your > > specific > > > > error but you will need to fix anyway): > > > > > > > > There are lots of messages like: > > > > task [2_9] Error sending record to topic > > > > PRTMinuteAgg-prt_hour_agg_stream-changelog > > > > org.apache.kafka.common.errors.RecordTooLargeException: The message > is > > > > 2381750 bytes when serialized which is larger than the maximum > > > > > > > > This means you need to add some extra config to your StreamsConfig: > > > > config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, > > > > expectedMaximumMessageSizeBytes) > > > > > > > > You will also possible need to adjust the broker properties and > > > > increase message.max.bytes > > > > - it will need to be at least as large as the setting above. > > > > > > > > At the moment all of the change-logs for your state-stores are being > > > > dropped due to this issue. > > > > > > > > Thanks, > > > > Damian > > > > > > > > > > > > > > > > On Tue, 13 Dec 2016 at 11:32 Jon Yeargers > > > > wrote: > > > > > > > > > (am attaching a debug log - note that app terminated with no > further > > > > > messages) > > > > > > > > > > topology: kStream -> groupByKey.aggregate(minute) -> foreach > > > > > \-> groupByKey.aggregate(hour) -> > > foreach > > > >
Re: How does 'TimeWindows.of().until()' work?
are keeping default until which is 1 day. > >>> > >>> Our record's timestamp extractor has a field which increases with > time. > >>> However for short time we cannot guarantee the timestamp always > >>> increases. So at the boundary i.e. after 24 hrs we can get records which > >> are > >>> beyond that window's retention period. > >>> > >>> Then it happens like it is mentioned above and our aggregation fails. > >>> > >>> So just to sum up when we get record > >>> 24h + 1 sec (it deletes older window and since the new record belongs > to > >>> the new window it gets created) > >>> Now when we get next record of 24 hrs - 1 sec since older window is > >> dropped > >>> it does not get aggregated in that bucket. > >>> > >>> I suggest we have another setting next to until, called retain, which > retains > >>> the older windows into next window. > >>> > >>> I think at stream window boundary level it should use a concept of > >> sliding > >>> window. So we can define window like > >>> > >>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 * > >> 1000l).until(7 > >>> * 24 * 3600 * 1000l).retain(900 * 1000l) > >>> > >>> So after 7 days it retains the data covered by windows in the last 15 > minutes > >>> which rolls over the data in them to the next window. This way streams work > >>> continuously. > >>> > >>> Please let us know your thoughts on this. > >>> > >>> On another side question on this there is a setting: > >>> > >>> windowstore.changelog.additional.retention.ms > >>> It is not clear what it does. Is this the default for until? > >>> > >>> Thanks > >>> Sachin > >>> > >>> > >>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax < matth...@confluent.io > >>> > >>> wrote: > >>> > >>>> Windows are created on demand, i.e., each time a new record arrives and > >>>> there is no window yet for it, a new window will get created. > >>>> > >>>> Windows are accepting data until their retention time (that you can > >>>> configure via .until()) has passed. 
Thus, you will have many windows being > >>>> open in parallel. > >>>> > >>>> If you read older data, they will just be put into the corresponding > >>>> windows (as long as window retention time did not pass). If a window > was > >>>> discarded already, a new window with this single (later arriving) > record > >>>> will get created, the computation will be triggered, you get a result, > >>>> and afterwards the window is deleted again (as its retention time > >>>> has passed already). > >>>> > >>>> The retention time is driven by "stream-time", an internally tracked > time > >>>> that only progresses in the forward direction. It gets its value from the > >>>> timestamps provided by TimestampExtractor -- thus, by default it will > >>>> be event-time. > >>>> > >>>> -Matthias > >>>> > >>>> On 12/11/16 3:47 PM, Jon Yeargers wrote: > >>>>> I've read this and still have more questions than answers. If my data > >>>> skips > >>>>> about (timewise) what determines when a given window will start / > stop > >>>>> accepting new data? What if I'm reading data from some time ago? > >>>>> > >>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax < > >> matth...@confluent.io> > >>>>> wrote: > >>>>> > >>>>>> Please have a look here: > >>>>>> > >>>>>> http://docs.confluent.io/current/streams/developer- > >>>>>> guide.html#windowing-a-stream > >>>>>> > >>>>>> If you have further questions, just follow up :) > >>>>>> > >>>>>> > >>>>>> -Matthias > >>>>>> > >>>>>> > >>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote: > >>>>>>> I've added the 'until()' clause to some aggregation steps and it's > >>>> working > >>>>>>> wonders for keeping the size of the state store in useful > >> boundaries... > >>>>>> But > >>>>>>> I'm not 100% clear on how it works. > >>>>>>> > >>>>>>> What is implied by the '.until()' clause? What determines when to > >> stop > >>>>>>> receiving further data - is it clock time (since the window was > >>>> created)? 
> >>>>>>> It seems problematic for it to refer to EventTime as this may > bounce > >>>> all > >>>>>>> over the place. For non-overlapping windows a given record can only > >>>> fall > >>>>>>> into a single aggregation period - so when would a value get > >> discarded? > >>>>>>> > >>>>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 * > >>>>>> 1000L).until(10 * > >>>>>>> 1000L))' - but what is this accomplishing? > >>>>>>> > >>>>>> > >>>>>> > >>>>> > >>>> > >>>> > >>> > >> > >> > > > >
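The retention behavior Matthias describes above can be modeled with a few lines of plain Java. This is a hedged sketch, not Kafka's implementation: tumbling-window assignment plus a retention check driven by "stream-time" (the maximum timestamp seen so far). The class and method names are invented for illustration; the arithmetic mirrors the semantics discussed in the thread, including the boundary case where a late record falls into an already-dropped window.

```java
// Simplified model of tumbling-window assignment and .until() retention.
// Not the actual Kafka Streams code; names and values are illustrative.
public class WindowRetentionSketch {

    // Start of the tumbling window a record timestamp falls into.
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // A window is dropped once stream-time has advanced past its start
    // by more than the retention period configured via until().
    static boolean isExpired(long windowStartMs, long retentionMs, long streamTimeMs) {
        return streamTimeMs > windowStartMs + retentionMs;
    }

    public static void main(String[] args) {
        long size = 60_000L;            // 1-minute windows
        long retention = 86_400_000L;   // until(1 day)
        long streamTime = 0L;

        long[] timestamps = { 10_000L, 70_000L, 86_460_000L, 30_000L };
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);  // stream-time only moves forward
            long start = windowStart(ts, size);
            boolean dropped = isExpired(start, retention, streamTime);
            System.out.println("ts=" + ts + " window=[" + start + "," + (start + size) + ") "
                    + (dropped ? "past retention -> new single-record window" : "accepted"));
        }
    }
}
```

The last record (ts=30,000, arriving after stream-time has passed 24h) lands in a window that is already past retention, which is exactly the "aggregation fails at the 24h boundary" situation Sachin reports.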
reasonable KStream app config settings?
My app seems to be continuously rebalancing. If I said it processed data maybe 3 minutes / hour I wouldn't be exaggerating. Surely this isn't normal behavior. My config is: config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName()); config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2"); config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg"); config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2"); config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5242880"); config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60"); Does this seem reasonable / rational? Some default that I shouldn't rely on?
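One likely culprit in the config above: `max.poll.interval.ms` is specified in milliseconds, so `"60"` means 60 ms — the consumer would be evicted from the group almost immediately after every poll, which matches the constant-rebalancing symptom. A minimal sketch of the corrected setting, using plain `java.util.Properties` with the literal config key (no Kafka classes on the classpath); the broker address and chosen value are placeholders:

```java
import java.util.Properties;

// Sketch only: literal Kafka config key strings so this runs standalone.
// The consumer default for max.poll.interval.ms is 300000 (5 minutes);
// the value here is an illustrative choice above that default.
public class StreamsConfigSketch {
    public static Properties build() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "broker1:9092");   // placeholder address
        p.put("application.id", "PRTMinuteAgg");
        // Milliseconds, not seconds: "60" would mean 60 ms.
        p.put("max.poll.interval.ms", "600000");      // 10 minutes
        return p;
    }

    public static void main(String[] args) {
        long pollIntervalMs = Long.parseLong(build().getProperty("max.poll.interval.ms"));
        System.out.println("max.poll.interval.ms = " + pollIntervalMs);
    }
}
```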
Re: Another odd error
As near as I can see it's rebalancing constantly. I'll up that value and see what happens. On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy wrote: > Hi Jon, > > I haven't had much of a chance to look at the logs in detail too much yet, > but i have noticed that your app seems to be rebalancing frequently. It > seems that it is usually around the 300 second mark, which usually would > mean that poll hasn't been called for at least that long. You might want to > try setting the config ConsumerConfig.MAX_POLL_INTERVAL_CONFIG to > something > higher than 30 (which is the default). > > I'll continue to look at your logs and get back to you. > Thanks, > Damian > > On Tue, 13 Dec 2016 at 15:02 Jon Yeargers > wrote: > > > attached is a log with lots of disconnections and a small amount of > > actual, useful activity. > > > > > > > > On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers > > wrote: > > > > n/m - I understand the logging issue now. Am generating a new one. Will > > send shortly. > > > > On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers > > wrote: > > > > Yes - saw that one. There were plenty of smaller records available > though. > > > > I sent another log this morning with the level set to DEBUG. Hopefully > you > > rec'd it. > > > > On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy > wrote: > > > > HI Jon, > > > > It looks like you have the logging level for KafkaStreams set to at least > > WARN. I can only see ERROR level logs being produced from Streams. 
> > > > However, i did notice an issue in the logs (not related to your specific > > error but you will need to fix anyway): > > > > There are lots of messages like: > > task [2_9] Error sending record to topic > > PRTMinuteAgg-prt_hour_agg_stream-changelog > > org.apache.kafka.common.errors.RecordTooLargeException: The message is > > 2381750 bytes when serialized which is larger than the maximum > > > > This means you need to add some extra config to your StreamsConfig: > > config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, > > expectedMaximumMessageSizeBytes) > > > > You will also possible need to adjust the broker properties and > > increase message.max.bytes > > - it will need to be at least as large as the setting above. > > > > At the moment all of the change-logs for your state-stores are being > > dropped due to this issue. > > > > Thanks, > > Damian > > > > > > > > On Tue, 13 Dec 2016 at 11:32 Jon Yeargers > > wrote: > > > > > (am attaching a debug log - note that app terminated with no further > > > messages) > > > > > > topology: kStream -> groupByKey.aggregate(minute) -> foreach > > > \-> groupByKey.aggregate(hour) -> foreach > > > > > > > > > config: > > > > > > Properties config = new Properties(); > > > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); > > > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, > ZOOKEEPER_IP); > > > config.put(StreamsConfig.APPLICATION_ID_CONFIG, > "PRTMinuteAgg" ); > > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > > > AggKey.class.getName()); > > > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > > > Serdes.String().getClass().getName()); > > > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); > > > config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2"); > > > config.put(StreamsConfig.STATE_DIR_CONFIG, > "/mnt/PRTMinuteAgg"); > > > > > > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2"); > > > > > > > > > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang > > wrote: > > > > > 
> Jon, > > > > > > To help investigating this issue, could you let me know 1) your > topology > > > sketch and 2) your app configs? For example did you enable caching in > > your > > > apps with the cache.max.bytes.buffering config? > > > > > > > > > Guozhang > > > > > > > > > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers < > jon.yearg...@cedexis.com> > > > wrote: > > > > > > > I get this one quite a bit. It kills my app after a short time of > > > running. > > > > Driving me nuts. > >
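The RecordTooLargeException fix Damian describes is a sizing relationship: the serialized record must fit under both the producer's `max.request.size` and the broker's `message.max.bytes`. A hedged sketch modeling that check with plain longs (the ~1 MB figures are the approximate defaults on both sides; the 5 MB figure matches the `MAX_REQUEST_SIZE_CONFIG` value that appears later in the thread):

```java
// Sketch: models the sizing constraint behind RecordTooLargeException.
// A record passes only if it fits under BOTH the producer and broker limits.
public class MessageSizeCheck {
    static boolean fits(long recordBytes, long producerMaxRequestSize, long brokerMessageMaxBytes) {
        return recordBytes <= producerMaxRequestSize && recordBytes <= brokerMessageMaxBytes;
    }

    public static void main(String[] args) {
        long record = 2_381_750L;        // size reported in the error message
        long defaults = 1_048_576L;      // ~1 MB defaults on producer and broker
        long raised = 5_242_880L;        // 5 MB, as configured later in the thread
        System.out.println("with defaults: " + fits(record, defaults, defaults));
        System.out.println("with raised limits: " + fits(record, raised, raised));
    }
}
```

With the defaults the 2,381,750-byte changelog record is rejected; raising only the producer limit is not enough, since the broker's `message.max.bytes` must be at least as large.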
KStreams app - frequent broker dis/re connects
Watching the debug output on an app - wondering why it spends nearly all of its time rebalancing. Noticed that it seems to drop / recreate connections to brokers pretty frequently. No error messages to speak of though. Connect / timeout / related settings in the consumer are all default. How much connection-thrashing is considered typical?
Re: Another odd error
n/m - I understand the logging issue now. Am generating a new one. Will send shortly. On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers wrote: > Yes - saw that one. There were plenty of smaller records available though. > > I sent another log this morning with the level set to DEBUG. Hopefully you > rec'd it. > > On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy wrote: > >> HI Jon, >> >> It looks like you have the logging level for KafkaStreams set to at least >> WARN. I can only see ERROR level logs being produced from Streams. >> >> However, i did notice an issue in the logs (not related to your specific >> error but you will need to fix anyway): >> >> There are lots of messages like: >> task [2_9] Error sending record to topic >> PRTMinuteAgg-prt_hour_agg_stream-changelog >> org.apache.kafka.common.errors.RecordTooLargeException: The message is >> 2381750 bytes when serialized which is larger than the maximum >> >> This means you need to add some extra config to your StreamsConfig: >> config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, >> expectedMaximumMessageSizeBytes) >> >> You will also possible need to adjust the broker properties and >> increase message.max.bytes >> - it will need to be at least as large as the setting above. >> >> At the moment all of the change-logs for your state-stores are being >> dropped due to this issue. 
>> >> Thanks, >> Damian >> >> >> On Tue, 13 Dec 2016 at 11:32 Jon Yeargers >> wrote: >> >> > (am attaching a debug log - note that app terminated with no further >> > messages) >> > >> > topology: kStream -> groupByKey.aggregate(minute) -> foreach >> > \-> groupByKey.aggregate(hour) -> foreach >> > >> > >> > config: >> > >> > Properties config = new Properties(); >> > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); >> > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, >> ZOOKEEPER_IP); >> > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" >> ); >> > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, >> > AggKey.class.getName()); >> > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, >> > Serdes.String().getClass().getName()); >> > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); >> > config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2"); >> > config.put(StreamsConfig.STATE_DIR_CONFIG, >> "/mnt/PRTMinuteAgg"); >> > >> > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2"); >> > >> > >> > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang >> wrote: >> > >> > Jon, >> > >> > To help investigating this issue, could you let me know 1) your topology >> > sketch and 2) your app configs? For example did you enable caching in >> your >> > apps with the cache.max.bytes.buffering config? >> > >> > >> > Guozhang >> > >> > >> > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers > > >> > wrote: >> > >> > > I get this one quite a bit. It kills my app after a short time of >> > running. >> > > Driving me nuts. >> > > >> > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax < >> matth...@confluent.io> >> > > wrote: >> > > >> > > > Not sure about this one. >> > > > >> > > > Can you describe what you do exactly? Can you reproduce the issue? >> We >> > > > definitely want to investigate this. 
>> > > > >> > > > -Matthias >> > > > >> > > > On 12/10/16 4:17 PM, Jon Yeargers wrote: >> > > > > (Am reporting these as have moved to 0.10.1.0-cp2) >> > > > > >> > > > > ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener >> > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for >> group >> > > > > MinuteAgg failed on partition assignment >> > > > > >> > > > > java.lang.IllegalStateException: task [1_9] Log end offset >> should not >> > > > > change while restoring >> > > > > >> > > > > at >> > > > > org.apache.kafka.streams.processor.internals.Pro
Re: Another odd error
Yes - saw that one. There were plenty of smaller records available though. I sent another log this morning with the level set to DEBUG. Hopefully you rec'd it. On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy wrote: > HI Jon, > > It looks like you have the logging level for KafkaStreams set to at least > WARN. I can only see ERROR level logs being produced from Streams. > > However, i did notice an issue in the logs (not related to your specific > error but you will need to fix anyway): > > There are lots of messages like: > task [2_9] Error sending record to topic > PRTMinuteAgg-prt_hour_agg_stream-changelog > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 2381750 bytes when serialized which is larger than the maximum > > This means you need to add some extra config to your StreamsConfig: > config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, > expectedMaximumMessageSizeBytes) > > You will also possible need to adjust the broker properties and > increase message.max.bytes > - it will need to be at least as large as the setting above. > > At the moment all of the change-logs for your state-stores are being > dropped due to this issue. 
> > Thanks, > Damian > > > On Tue, 13 Dec 2016 at 11:32 Jon Yeargers > wrote: > > > (am attaching a debug log - note that app terminated with no further > > messages) > > > > topology: kStream -> groupByKey.aggregate(minute) -> foreach > > \-> groupByKey.aggregate(hour) -> foreach > > > > > > config: > > > > Properties config = new Properties(); > > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); > > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, > ZOOKEEPER_IP); > > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" > ); > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > > AggKey.class.getName()); > > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > > Serdes.String().getClass().getName()); > > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); > > config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2"); > > config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg"); > > > > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2"); > > > > > > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang > wrote: > > > > Jon, > > > > To help investigating this issue, could you let me know 1) your topology > > sketch and 2) your app configs? For example did you enable caching in > your > > apps with the cache.max.bytes.buffering config? > > > > > > Guozhang > > > > > > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers > > wrote: > > > > > I get this one quite a bit. It kills my app after a short time of > > running. > > > Driving me nuts. > > > > > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax < > matth...@confluent.io> > > > wrote: > > > > > > > Not sure about this one. > > > > > > > > Can you describe what you do exactly? Can you reproduce the issue? We > > > > definitely want to investigate this. 
> > > > > > > > -Matthias > > > > > > > > On 12/10/16 4:17 PM, Jon Yeargers wrote: > > > > > (Am reporting these as have moved to 0.10.1.0-cp2) > > > > > > > > > > ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener > > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for > group > > > > > MinuteAgg failed on partition assignment > > > > > > > > > > java.lang.IllegalStateException: task [1_9] Log end offset should > not > > > > > change while restoring > > > > > > > > > > at > > > > > org.apache.kafka.streams.processor.internals. > ProcessorStateManager. > > > > restoreActiveState(ProcessorStateManager.java:245) > > > > > > > > > > at > > > > > org.apache.kafka.streams.processor.internals. > ProcessorStateManager. > > > > register(ProcessorStateManager.java:198) > > > > > > > > > > at > > > > > org.apache.kafka.streams.processor.internals. > > > > ProcessorContextImpl.register(ProcessorContextImpl.java:123) > > > > > > > > > > at >
Re: Another odd error
(am attaching a debug log - note that app terminated with no further messages) topology: kStream -> groupByKey.aggregate(minute) -> foreach \-> groupByKey.aggregate(hour) -> foreach config: Properties config = new Properties(); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName()); config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2"); config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg"); config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2"); On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang wrote: > Jon, > > To help investigating this issue, could you let me know 1) your topology > sketch and 2) your app configs? For example did you enable caching in your > apps with the cache.max.bytes.buffering config? > > > Guozhang > > > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers > wrote: > > > I get this one quite a bit. It kills my app after a short time of > running. > > Driving me nuts. > > > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax > > wrote: > > > > > Not sure about this one. > > > > > > Can you describe what you do exactly? Can you reproduce the issue? We > > > definitely want to investigate this. 
> > > > > > -Matthias > > > > > > On 12/10/16 4:17 PM, Jon Yeargers wrote: > > > > (Am reporting these as have moved to 0.10.1.0-cp2) > > > > > > > > ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for > group > > > > MinuteAgg failed on partition assignment > > > > > > > > java.lang.IllegalStateException: task [1_9] Log end offset should > not > > > > change while restoring > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorStateManager. > > > restoreActiveState(ProcessorStateManager.java:245) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorStateManager. > > > register(ProcessorStateManager.java:198) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > ProcessorContextImpl.register(ProcessorContextImpl.java:123) > > > > > > > > at > > > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init( > > > RocksDBWindowStore.java:206) > > > > > > > > at > > > > org.apache.kafka.streams.state.internals.MeteredWindowStore.init( > > > MeteredWindowStore.java:66) > > > > > > > > at > > > > org.apache.kafka.streams.state.internals.CachingWindowStore.init( > > > CachingWindowStore.java:64) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals.AbstractTask. > > > initializeStateStores(AbstractTask.java:81) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > StreamTask.(StreamTask.java:120) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > StreamThread.createStreamTask(StreamThread.java:633) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > StreamThread.addStreamTasks(StreamThread.java:660) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals. 
> StreamThread.access$100( > > > StreamThread.java:69) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals.StreamThread$1. > > > onPartitionsAssigned(StreamThread.java:124) > > > > > > > > at > > > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator. > > > onJoinComplete(ConsumerCoordinator.java:228) > > > > > > > > at > > > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > joinGroupIfNeeded(AbstractCoordinator.java:313) > > > > > > > > at > > > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > ensureActiveGroup(AbstractCoordinator.java:277) > > > > > > > > at > > > > org.apache.kafka.clients.consumer.internals. > ConsumerCoordinator.poll( > > > ConsumerCoordinator.java:259) > > > > > > > > at > > > > org.apache.kafka.clients.consumer.KafkaConsumer. > > > pollOnce(KafkaConsumer.java:1013) > > > > > > > > at > > > > org.apache.kafka.clients.consumer.KafkaConsumer.poll( > > > KafkaConsumer.java:979) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > > > StreamThread.java:407) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > StreamThread.run(StreamThread.java:242) > > > > > > > > > > > > > > > > -- > -- Guozhang
Re: "Log end offset should not change while restoring"
What is the specific cache config setting? On Mon, Dec 12, 2016 at 1:49 PM, Matthias J. Sax wrote: > We discovered a few more bugs and a bug fix release 0.10.1.1 is planned > already. > > The voting started for it, and it should get released in the next few weeks. > > If your issue is related to this caching problem, disabling the cache > via StreamsConfig should fix the problem for now. Just set the cache > size to zero. > > > -Matthias > > > On 12/12/16 2:31 AM, Jon Yeargers wrote: > > I'm seeing this error occur more frequently of late. I ran across this > > thread: > > https://groups.google.com/forum/#!topic/confluent-platform/AH5QClSNZBw > > > > > > The implication from the thread is that a fix is available. Where can I > get > > it? > > > >
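To answer the question asked here: the record-cache setting Matthias refers to is `cache.max.bytes.buffering` (exposed as `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG`), and the workaround is to set it to zero. A minimal sketch using the literal key string so it runs without Kafka on the classpath:

```java
import java.util.Properties;

// Sketch: disabling the Kafka Streams record cache as a workaround,
// per the thread above. Key is the literal config string.
public class DisableCacheSketch {
    static Properties withCacheDisabled(Properties p) {
        p.put("cache.max.bytes.buffering", "0");  // 0 disables record caching
        return p;
    }

    public static void main(String[] args) {
        Properties p = withCacheDisabled(new Properties());
        System.out.println("cache.max.bytes.buffering = "
                + p.getProperty("cache.max.bytes.buffering"));
    }
}
```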
Re: rocksdb error(s)
The attached error log shows several of the problems I've run into. After hitting this list (and typically much less) the app dies. On Mon, Dec 12, 2016 at 4:48 AM, Damian Guy wrote: > Just set the log level to debug and then run your app until you start > seeing the problem. > Thanks > > On Mon, 12 Dec 2016 at 12:47 Jon Yeargers > wrote: > > > I can log whatever you need. Tell me what is useful. > > > > On Mon, Dec 12, 2016 at 4:43 AM, Damian Guy > wrote: > > > > > If you provide the logs from your streams application then we might > have > > > some chance of working out what is going on. Without logs then we > really > > > don't have much hope of diagnosing the problem. > > > > > > On Mon, 12 Dec 2016 at 12:18 Jon Yeargers > > > wrote: > > > > > > > Im running as many threads as I have partitions on this topic. Just > > > curious > > > > if it would make any difference to the seemingly endless rebalancing > > > woes. > > > > > > > > So far no change. In fact, I'll often see all 10 partitions (plus the > > 2 x > > > > 10 for the two aggregations) assigned to a single thread. > > > > > > > > On Mon, Dec 12, 2016 at 4:15 AM, Jon Yeargers < > > jon.yearg...@cedexis.com> > > > > wrote: > > > > > > > > > At this moment I have 5 instances each running 2 threads. > > > > > Single instance / machine. > > > > > > > > > > Define 'full logs' ? > > > > > > > > > > On Mon, Dec 12, 2016 at 3:54 AM, Damian Guy > > > > wrote: > > > > > > > > > >> Jon, > > > > >> > > > > >> How many StreamThreads do you have running? > > > > >> How many application instances? > > > > >> Do you have more than one instance per machine? If yes, are they > > > sharing > > > > >> the same State Directory? > > > > >> Do you have full logs that can be provided so we can try and see > > > > how/what > > > > >> is happening? 
> > > > >> > > > > >> Thanks, > > > > >> Damian > > > > >> > > > > >> On Mon, 12 Dec 2016 at 10:17 Jon Yeargers < > jon.yearg...@cedexis.com > > > > > > > >> wrote: > > > > >> > > > > >> > No luck here. Moved all state storage to a non-tmp folder and > > > > restarted. > > > > >> > Still hitting the 'No locks available' error quite frequently. > > > > >> > > > > > >> > On Sun, Dec 11, 2016 at 3:45 PM, Jon Yeargers < > > > > jon.yearg...@cedexis.com > > > > >> > > > > > >> > wrote: > > > > >> > > > > > >> > > I moved the state folder to a separate drive and linked out to > > it. > > > > >> > > > > > > >> > > I'll try your suggestion and point directly. > > > > >> > > > > > > >> > > On Sun, Dec 11, 2016 at 2:20 PM, Matthias J. Sax < > > > > >> matth...@confluent.io> > > > > >> > > wrote: > > > > >> > > > > > > >> > >> I am not sure, but this might be related with your state > > > directory. > > > > >> > >> > > > > >> > >> You use default directory that is located in /tmp -- could it > > be, > > > > >> that > > > > >> > >> /tmp gets clean up and thus you loose files/directories? > > > > >> > >> > > > > >> > >> Try to reconfigure your state directory via StreamsConfig: > > > > >> > >> http://docs.confluent.io/current/streams/developer-guide. > > > > >> > >> html#optional-configuration-parameters > > > > >> > >> > > > > >> > >> > > > > >> > >> -Matthias > > > > >> > >> > > > > >> > >> On 12/11/16 1:28 AM, Jon Yeargers wrote: > > > > >> > >> > Seeing this appearing somewhat frequently - > > > > >> > >> > > > > > >> > >> > org.apache.kafka.strea
Re: rocksdb error(s)
I can log whatever you need. Tell me what is useful.

On Mon, Dec 12, 2016 at 4:43 AM, Damian Guy wrote:

> If you provide the logs from your streams application then we might have
> some chance of working out what is going on. Without logs then we really
> don't have much hope of diagnosing the problem.
Re: rocksdb error(s)
I'm running as many threads as I have partitions on this topic. Just curious
if it would make any difference to the seemingly endless rebalancing woes.

So far no change. In fact, I'll often see all 10 partitions (plus the 2 x 10
for the two aggregations) assigned to a single thread.

On Mon, Dec 12, 2016 at 4:15 AM, Jon Yeargers wrote:

> At this moment I have 5 instances each running 2 threads.
> Single instance / machine.
>
> Define 'full logs'?
Re: rocksdb error(s)
At this moment I have 5 instances, each running 2 threads. Single instance
per machine.

Define 'full logs'?

On Mon, Dec 12, 2016 at 3:54 AM, Damian Guy wrote:

> Jon,
>
> How many StreamThreads do you have running?
> How many application instances?
> Do you have more than one instance per machine? If yes, are they sharing
> the same State Directory?
> Do you have full logs that can be provided so we can try and see how/what
> is happening?
>
> Thanks,
> Damian
partition count multiples - adverse effects on rebalancing?
Just curious - how is rebalancing handled when the number of partitions isn't an even multiple of the number of potential consumer threads? i.e. if I have 9 partitions and 6 threads - will the cluster be forever trying to balance this?
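For what it's worth, an uneven ratio doesn't by itself cause continuous rebalancing: a rebalance only fires when group membership or subscriptions change, and the group simply settles on an uneven assignment. A plain-Java sketch of the division arithmetic (this models the idea only, not Streams' actual partition assignor):

```java
import java.util.*;

public class AssignmentSketch {
    // Round-robin style: partition p goes to thread (p % threads).
    static Map<Integer, List<Integer>> assign(int partitions, int threads) {
        Map<Integer, List<Integer>> byThread = new TreeMap<>();
        for (int p = 0; p < partitions; p++) {
            byThread.computeIfAbsent(p % threads, t -> new ArrayList<>()).add(p);
        }
        return byThread;
    }

    public static void main(String[] args) {
        // 9 partitions over 6 threads: threads 0-2 own two partitions each,
        // threads 3-5 own one each. Uneven, but stable.
        assign(9, 6).forEach((t, ps) -> System.out.println("thread " + t + " -> " + ps));
    }
}
```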
doing > 1 'parallel' operation on a stream
If I want to aggregate a stream twice using different windows do I need to split / copy / duplicate the source stream somehow? Or will this be handled without my interference?
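As far as I know the DSL handles this without copying: the same KStream instance can feed two separate groupByKey().aggregate(...) calls, and each record is forwarded to both downstream aggregations, each with its own state store. A stdlib sketch of the idea - the same records bucketed independently by two window sizes (timestamps here are hypothetical; this models the windowing math, not the Streams API):

```java
import java.util.*;

public class TwoWindowSketch {
    // Tumbling-window start: align the timestamp down to the window size.
    static long windowStart(long tsMs, long sizeMs) {
        return tsMs - (tsMs % sizeMs);
    }

    public static void main(String[] args) {
        long[] timestamps = {5_000, 61_000, 250_000}; // hypothetical record times
        Map<Long, Integer> oneMin = new TreeMap<>();
        Map<Long, Integer> fiveMin = new TreeMap<>();
        for (long ts : timestamps) {
            // Each record lands in both aggregations independently.
            oneMin.merge(windowStart(ts, 60_000), 1, Integer::sum);
            fiveMin.merge(windowStart(ts, 300_000), 1, Integer::sum);
        }
        System.out.println("1-min window counts: " + oneMin);
        System.out.println("5-min window counts: " + fiveMin);
    }
}
```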
"Log end offset should not change while restoring"
I'm seeing this error occur more frequently of late. I ran across this thread: https://groups.google.com/forum/#!topic/confluent-platform/AH5QClSNZBw The implication from the thread is that a fix is available. Where can I get it?
Re: rocksdb error(s)
No luck here. Moved all state storage to a non-tmp folder and restarted.
Still hitting the 'No locks available' error quite frequently.

On Sun, Dec 11, 2016 at 3:45 PM, Jon Yeargers wrote:

> I moved the state folder to a separate drive and linked out to it.
>
> I'll try your suggestion and point directly.
Re: rebalancing - how to speed it up?
After an hour: it briefly popped up with one instance 'applied' to all 10
partitions... then it went back to rebalancing for 10-15 minutes... followed
by a different instance on all partitions... and then more rebalancing.

At no point (yet) have I seen the work get truly 'balanced' between all 5
instances.

On Sun, Dec 11, 2016 at 6:04 PM, Jon Yeargers wrote:

> I changed 'num.standby.replicas' to '2'.
>
> I started one instance and it immediately showed up in the
> 'kafka-consumer-groups .. --describe' listing.
>
> So I started a second... and it quickly displaced the first... which never
> came back.
Re: rebalancing - how to speed it up?
I changed 'num.standby.replicas' to '2'.

I started one instance and it immediately showed up in the
'kafka-consumer-groups .. --describe' listing.

So I started a second... and it quickly displaced the first... which never
came back.

Started a third.. same effect. Second goes away, never to return.. but now
it tries to rebalance for a while before I see the third by itself.

Fourth and fifth - now it's gone off to rebalance (and is seemingly stuck
there) and hasn't pulled any data for more than an hour.

On Sun, Dec 11, 2016 at 2:27 PM, Matthias J. Sax wrote:

> Not sure.
>
> How big is your state? On rebalance, state stores might move from one
> machine to another. To recreate the store on the new machine the
> underlying changelog topic must be read. This can take some time -- an
> hour seems quite long though...
>
> To avoid long state recreation periods Kafka Streams supports standby
> tasks. Try to enable those via StreamsConfig: "num.standby.replicas"
>
> http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters
>
> Also check out this section of the docs:
>
> http://docs.confluent.io/3.1.1/streams/architecture.html#fault-tolerance
>
> -Matthias
>
> On 12/11/16 3:14 AM, Gerrit Jansen van Vuuren wrote:
> > I don't know about speeding up rebalancing, and an hour seems to suggest
> > something is wrong with zookeeper or your whole setup maybe. If it
> > becomes an unsolvable issue for you, you could try
> > https://github.com/gerritjvv/kafka-fast which uses a different model and
> > doesn't need balancing or rebalancing.
> >
> > disclosure: "I'm the library author".
> >
> > On 11 Dec 2016 11:56 a.m., "Jon Yeargers" wrote:
> >
> > Is there some way to 'help it along'? It's taking an hour or more from
> > when I start my app to actually seeing anything consumed.
> >
> > Plenty of CPU (and IOWait) during this time so I know it's doing
> > _something_...
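The standby-replica setting Matthias suggests is an ordinary StreamsConfig property; a minimal sketch using plain java.util.Properties (the string key "num.standby.replicas" is what the StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG constant resolves to; the app id and broker address below are placeholders):

```java
import java.util.Properties;

public class StandbyConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "MinuteAgg");         // placeholder app id
        props.put("bootstrap.servers", "localhost:9092"); // placeholder brokers
        // Keep a warm replica of each state store on another instance, so a
        // rebalance can promote it instead of replaying the whole changelog.
        props.put("num.standby.replicas", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps());
    }
}
```

Note that standby tasks only shorten recovery when the task moves to an instance that already hosts the standby copy; they don't make the rebalance itself faster.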
Re: How does 'TimeWindows.of().until()' work?
I've read this and still have more questions than answers.

If my data skips about (timewise), what determines when a given window will
start / stop accepting new data? What if I'm reading data from some time ago?

On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax wrote:

> Please have a look here:
>
> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
>
> If you have further question, just follow up :)
>
> -Matthias
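The short version, as I understand it: window membership is driven by each record's timestamp, not by wall-clock arrival. A record with timestamp t falls into the tumbling window starting at t - (t % size), and until() sets how long old windows are retained (relative to the most recent timestamp seen) so late records can still be added. Under that reading, an until() smaller than the window size - as in the quoted 'of(60 * 1000L).until(10 * 1000L)' - asks for less retention than a single window. A stdlib sketch of that arithmetic, assuming tumbling (non-overlapping) windows:

```java
public class WindowRetentionSketch {
    // A record with timestamp ts belongs to the tumbling window [start, start + size).
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // A window still accepts late records while stream time (the max
    // timestamp seen so far) has not advanced retentionMs past its start.
    static boolean retained(long windowStartMs, long retentionMs, long streamTimeMs) {
        return streamTimeMs < windowStartMs + retentionMs;
    }

    public static void main(String[] args) {
        long size = 60_000L;          // 1-minute windows
        long retention = 300_000L;    // keep windows around for 5 minutes
        long streamTime = 400_000L;   // latest timestamp seen so far

        long lateRecordTs = 120_000L; // a record from 'some time ago'
        long w = windowStart(lateRecordTs, size);
        System.out.println("window " + w + " still retained: "
                + retained(w, retention, streamTime));
    }
}
```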
Re: rocksdb error(s)
I moved the state folder to a separate drive and linked out to it.

I'll try your suggestion and point directly.

On Sun, Dec 11, 2016 at 2:20 PM, Matthias J. Sax wrote:

> I am not sure, but this might be related with your state directory.
>
> You use the default directory that is located in /tmp -- could it be that
> /tmp gets cleaned up and thus you lose files/directories?
>
> Try to reconfigure your state directory via StreamsConfig:
> http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters
>
> -Matthias
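Matthias's suggestion comes down to one property; a minimal sketch with plain java.util.Properties (the string key "state.dir" is what the StreamsConfig.STATE_DIR_CONFIG constant resolves to; the path shown is a placeholder for any directory that survives reboots and tmp-cleaners):

```java
import java.util.Properties;

public class StateDirSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "MinuteAgg");         // placeholder app id
        props.put("bootstrap.servers", "localhost:9092"); // placeholder brokers
        // Move RocksDB state off /tmp so tmpwatch/systemd-tmpfiles cannot
        // delete SST and LOCK files out from under a running instance.
        props.put("state.dir", "/var/lib/kafka-streams");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("state.dir"));
    }
}
```

Pointing the config directly at the real directory (rather than symlinking through it, as above) also removes one variable when chasing the lock errors.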
Re: Another odd error
I get this one quite a bit. It kills my app after a short time of running.
Driving me nuts.

On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax wrote:

> Not sure about this one.
>
> Can you describe what you do exactly? Can you reproduce the issue? We
> definitely want to investigate this.
>
> -Matthias
Re: odd error message
Yes - but not 100% reproducible. I seem to have several issues with start /
rebalance.

On Sun, Dec 11, 2016 at 2:16 PM, Matthias J. Sax wrote:

> Hi,
>
> this might be a recently discovered bug. Does it happen when you
> stop/restart your application?
>
> -Matthias
>
> On 12/10/16 1:42 PM, Jon Yeargers wrote:
> > This came up a few times today:
> >
> > 2016-12-10 18:45:52,637 [StreamThread-1] ERROR
> > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1]
> > Failed to create an active task %s:
> >
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_0]
> > Error while creating the state manager
> >   at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
> >   at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:90)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
> >   at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
> >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
> >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
> >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
> >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >
> > Caused by: java.io.IOException: task [0_0] Failed to lock the state
> > directory: /mnt/extra/space/MinuteAgg/0_0
> >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
> >   at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
> >
> > ... 13 common frames omitted
rebalancing - how to speed it up?
Is there some way to 'help it along'? It's taking an hour or more from when I start my app to actually seeing anything consumed. Plenty of CPU (and IOWait) during this time so I know it's doing _something_...
rocksdb error(s)
Seeing this appearing somewhat frequently -

org.apache.kafka.streams.errors.ProcessorStateException: Error opening
store minute_agg_stream-201612100812 at location
/tmp/kafka-streams/MinuteAgg/1_9/minute_agg_stream/minute_agg_stream-201612100812
  at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:196)
  at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:158)
  at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)
  at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:402)
  at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:333)
  at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:51)
  at org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:212)
  at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:235)
  at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
  at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
  at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
  at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
  at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
  at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
  at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
  at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
  at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
  at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Caused by: org.rocksdb.RocksDBException: IO error: lock
/tmp/kafka-streams/MinuteAgg/1_9/minute_agg_stream/minute_agg_stream-201612100812/LOCK:
No locks available
  at org.rocksdb.RocksDB.open(Native Method)
  at org.rocksdb.RocksDB.open(RocksDB.java:184)
  at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:189)
  ... 26 common frames omitted
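One possibly relevant detail: "No locks available" is the strerror text for ENOLCK, which filesystems without working POSIX lock support (some NFS mounts, for example) return when RocksDB tries to take its LOCK file. Since the state folder here was moved to a separate drive and symlinked, it may be worth probing whether that filesystem hands out locks at all. A stdlib sketch (the probe path is a placeholder):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.*;

public class LockProbe {
    // Returns true if the filesystem backing 'dir' grants file locks.
    static boolean supportsLocks(Path dir) {
        Path probe = dir.resolve("lock-probe");
        try (FileChannel ch = FileChannel.open(probe,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            try (FileLock lock = ch.tryLock()) {
                return lock != null;
            }
        } catch (IOException e) {
            return false; // e.g. ENOLCK -> "No locks available"
        } finally {
            try { Files.deleteIfExists(probe); } catch (IOException ignored) {}
        }
    }

    public static void main(String[] args) {
        // Placeholder: point this at the configured state.dir.
        Path dir = Paths.get(args.length > 0 ? args[0]
                : System.getProperty("java.io.tmpdir"));
        System.out.println(dir + " supports locks: " + supportsLocks(dir));
    }
}
```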
How does 'TimeWindows.of().until()' work?
I've added the 'until()' clause to some aggregation steps and it's working wonders for keeping the size of the state store within useful boundaries... but I'm not 100% clear on how it works. What does the '.until()' clause imply? What determines when to stop receiving further data - is it clock time (since the window was created)? It seems problematic for it to refer to event time, as that may bounce all over the place. For non-overlapping windows a given record can only fall into a single aggregation period - so when would a value get discarded? I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 * 1000L).until(10 * 1000L))' - but what is this accomplishing?
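My understanding (hedged - this is inferred from the docs, not from reading the store internals) is that `until()` sets a retention period for window state, measured against *stream time* (the largest record timestamp the task has observed), not wall-clock time. A record at event time `t` falls into the non-overlapping window starting at `t - (t % windowSize)`, and that window's state becomes droppable once stream time has advanced past its start by more than the retention. The sketch below only illustrates that arithmetic - it is plain Java, not the Kafka Streams implementation:

```java
// Illustrative arithmetic only, NOT the Kafka Streams API.
// Assumption: retention is compared against stream time (max record
// timestamp seen), so late records inside retention still land in
// their original window; records later than that open no usable window.
public class WindowRetentionSketch {
    // Which tumbling window a record timestamp falls into.
    static long windowStart(long tsMs, long windowSizeMs) {
        return tsMs - (tsMs % windowSizeMs);
    }

    // Whether a window's state may be discarded under the retention.
    static boolean expired(long windowStartMs, long streamTimeMs, long retentionMs) {
        return streamTimeMs - windowStartMs > retentionMs;
    }

    public static void main(String[] args) {
        long size = 60_000L;        // 1-minute windows
        long retention = 600_000L;  // keep each window ~10 minutes of stream time

        System.out.println(windowStart(125_000L, size));             // 120000
        System.out.println(expired(120_000L, 900_000L, retention));  // true
        System.out.println(expired(120_000L, 500_000L, retention));  // false
    }
}
```

Note that on this reading, `until(10 * 1000L)` with 60-second windows asks for a retention *shorter* than the window itself; I believe the store keeps state for at least the window size, so that clause may not be doing what was intended.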
Another odd error
(I'm reporting these as I have moved to 0.10.1.0-cp2.)

```
ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group MinuteAgg failed on partition assignment
java.lang.IllegalStateException: task [1_9] Log end offset should not change while restoring
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
	at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
	at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
	at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
```
odd error message
This came up a few times today:

```
2016-12-10 18:45:52,637 [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Failed to create an active task %s:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Error while creating the state manager
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:90)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_0] Failed to lock the state directory: /mnt/extra/space/MinuteAgg/0_0
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
	... 13 common frames omitted
```
'swap' space for KStream app - limitations?
Are there any? My app ran for a few hours and filled a 100G partition (on 5 machines). Are there any settings to keep this growth in check? Or some way to estimate how much space it's going to need?
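As far as I know there is no hard cap on state-store disk usage in this version; growth is bounded mainly by the per-store window retention (`until()`). What can be controlled directly is where the state lives. A hedged fragment, assuming the 0.10.1 `StreamsConfig` API (the path is just the one from the earlier log, used as an example):

```java
// Config fragment only - assumes the rest of the Streams setup exists.
Properties props = new Properties();
// "state.dir" is the real Streams config key; default is /tmp/kafka-streams.
// Pointing it at the large partition is illustrative, not a recommendation.
props.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/extra/space");
```

Disk growth per store is then governed by `TimeWindows.of(...).until(...)`, since expired window segments are dropped.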
checking consumer lag on KStreams app?
How would this be done?
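A Kafka Streams app is just a consumer group whose name is its `application.id`, so my understanding is the stock consumer-groups tool can report its lag. A sketch, assuming the group name `MinuteAgg` from the other threads and a broker on localhost (on 0.10.x the `--new-consumer` flag may also be needed):

```shell
# Per-partition current offset, log-end offset, and lag for the
# Streams app's consumer group. Group name and broker are assumptions.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group MinuteAgg
```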
Re: controlling memory growth when aggregating
I updated my consumer to that build. The memory issue seems to have abated. TY! Have started seeing this exception semi-regularly though:

```
ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group MinuteAgg failed on partition assignment
java.lang.IllegalStateException: task [1_4] Log end offset should not change while restoring
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
	at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
	at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
	at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
```

On Fri, Dec 9, 2016 at 4:29 PM, Jon Yeargers wrote:

> Perhaps that's the problem. Yes - I'm still using 0.10.1.0.
>
> Does this involve a broker update?
>
> On Fri, Dec 9, 2016 at 7:12 AM, Damian Guy wrote:
>
>> Hi Jon,
>>
>> Are you using 0.10.1? There is a resource leak to do with the Window
>> Iterator. The bug has been fixed on the 0.10.1 branch (which will soon be
>> released as 0.10.1.1) and it is also fixed in the confluent fork.
>>
>> You can get the confluent version by using the following:
>>
>> <repository>
>>   <id>confluent</id>
>>   <url>http://packages.confluent.io/maven/</url>
>> </repository>
>>
>> <dependency>
>>   <groupId>org.apache.kafka</groupId>
>>   <artifactId>kafka-streams</artifactId>
>>   <version>0.10.1.0-cp2</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.kafka</groupId>
>>   <artifactId>kafka-clients</artifactId>
>>   <version>0.10.1.0-cp2</version>
>> </dependency>
>>
>> On Thu, 8 Dec 2016 at 23:37 Jon Yeargers wrote:
>>
>> I'm working with JSON data that has an array member. I'm aggregating values
>> into this using minute-long windows.
>>
>> I ran the app for ~10 minutes and watched it consume 40% of the memory on a
>> box with 32G. It was still growing when I stopped it. At this point it had
>> created ~800 values, each of which was < 1Mb in size (owing to the
>> limitations on message size set at the broker). (I wrote all the values
>> into Redis so I could count them and check the aggregation.)
>>
>> 1. Why is it consuming so much memory?
>> 2. Is there a strategy for controlling this growth?
>>
>> I get that it's keeping every window open in case a new value shows up.
>> Maybe some way to relax this using event time vs clock time?
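Besides the iterator-leak fix, 0.10.1 added an explicit cap on the record-cache memory Streams buffers before forwarding downstream (KIP-63). A hedged fragment, assuming the 0.10.1 config key:

```java
// Config fragment only - assumes the rest of the Streams setup exists.
Properties props = new Properties();
// "cache.max.bytes.buffering" caps total record-cache memory across all
// threads of this instance; 10 MB here is purely illustrative.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
```

Setting it to 0 disables caching entirely, trading memory for more downstream traffic.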
Re: controlling memory growth when aggregating
Perhaps that's the problem. Yes - I'm still using 0.10.1.0.

Does this involve a broker update?

On Fri, Dec 9, 2016 at 7:12 AM, Damian Guy wrote:

> Hi Jon,
>
> Are you using 0.10.1? There is a resource leak to do with the Window
> Iterator. The bug has been fixed on the 0.10.1 branch (which will soon be
> released as 0.10.1.1) and it is also fixed in the confluent fork.
>
> You can get the confluent version by using the following:
>
> <repository>
>   <id>confluent</id>
>   <url>http://packages.confluent.io/maven/</url>
> </repository>
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-streams</artifactId>
>   <version>0.10.1.0-cp2</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.10.1.0-cp2</version>
> </dependency>
>
> On Thu, 8 Dec 2016 at 23:37 Jon Yeargers wrote:
>
> I'm working with JSON data that has an array member. I'm aggregating values
> into this using minute-long windows.
>
> I ran the app for ~10 minutes and watched it consume 40% of the memory on a
> box with 32G. It was still growing when I stopped it. At this point it had
> created ~800 values, each of which was < 1Mb in size (owing to the
> limitations on message size set at the broker). (I wrote all the values
> into Redis so I could count them and check the aggregation.)
>
> 1. Why is it consuming so much memory?
> 2. Is there a strategy for controlling this growth?
>
> I get that it's keeping every window open in case a new value shows up.
> Maybe some way to relax this using event time vs clock time?