Re: Data (re)processing with Kafka (new wiki page)
This page is really helpful. Thanks for putting this together.

Some nice-to-have features (not sure whether they belong on this wiki page):

1) Pause and resume without having to stop and start the application. A pause should drain all in-flight computations before actually pausing, and a notification that the application really is paused would be helpful. Ideally this would be much more lightweight than stopping all tasks and stores and starting them again, which can take a lot of time.

2) A metadata API response that includes the topology graph and all its sub-graphs, with the nodes and edges of each sub-graph and the corresponding state store names. That would make it easier to build a UI on top of it. The pipes between the sub-graphs, which are Kafka topics, should be called out as well, and the time semantics could be laid out on top of that. This would be a logical view on top of the physical view that the current API provides.

Regards
Sai

On Fri, Nov 25, 2016 at 12:53 AM, Michael Noll wrote:
> Thanks a lot, Matthias!
>
> I have already begun to provide feedback.
>
> -Michael
>
> On Wed, Nov 23, 2016 at 11:41 PM, Matthias J. Sax wrote:
>
> > Hi,
> >
> > we added a new wiki page that is supposed to collect data (re)processing
> > scenarios with Kafka:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+(Re)Processing+Scenarios
> >
> > We have already added a couple of scenarios we think might be common and
> > want to invite all of you to add more. This helps to get a better overview
> > of requirements to enable new use cases.
> >
> > We are looking forward to your feedback!
> >
> > -Matthias
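The second wish, a logical view of the topology, is essentially a small data model. Below is a hypothetical sketch of what such a response could carry: per sub-graph, its nodes, edges and state store names, plus the topics that pipe one sub-graph into another. The class and method names are invented for illustration and are not part of any Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical "logical view" of one sub-graph of a topology. None of these
// classes exist in Kafka Streams; they only illustrate the requested shape.
final class SubTopologyView {
    final int id;
    final Set<String> nodes = new LinkedHashSet<>();
    final List<String> edges = new ArrayList<>();          // "from -> to" processor names
    final Set<String> stateStores = new LinkedHashSet<>();
    final Set<String> sourceTopics = new LinkedHashSet<>();
    final Set<String> sinkTopics = new LinkedHashSet<>();

    SubTopologyView(int id) {
        this.id = id;
    }

    // Records a directed edge between two processor nodes of this sub-graph.
    SubTopologyView edge(String from, String to) {
        nodes.add(from);
        nodes.add(to);
        edges.add(from + " -> " + to);
        return this;
    }
}

final class TopologyViews {
    // Calls out the "pipes": every topic that one sub-graph writes and another reads.
    static List<String> topicPipes(List<SubTopologyView> subs) {
        List<String> pipes = new ArrayList<>();
        for (SubTopologyView producer : subs) {
            for (SubTopologyView consumer : subs) {
                if (producer == consumer) {
                    continue;
                }
                for (String topic : producer.sinkTopics) {
                    if (consumer.sourceTopics.contains(topic)) {
                        pipes.add(producer.id + " -[" + topic + "]-> " + consumer.id);
                    }
                }
            }
        }
        return pipes;
    }
}
```

With sub-graph 0 writing to a repartition topic that sub-graph 1 reads, `topicPipes` yields a single entry, `0 -[repartition]-> 1`; a UI could draw sub-graphs as boxes and these entries as the arrows between them.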
Re: Kafka streaming changelog topic max.message.bytes exception
Hi Sachin

Here is a general approach that might work for you in the absence of broadcast variables, with everything kept in local state, provided you can adjust your processing so that all of this happens before the aggregation.

The idea is to use a custom topology with a custom processor backed by a persistent state store. The processor transforms the data and sends it to a result topic, so that the aggregation becomes easier. For a given key you can change the incoming data based on the old data in this state store, or process it however you want, and then forward it to a final topic.

Below is the link to a sample custom processor:
https://apache.googlesource.com/kafka/+/0.10.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java?autodive=0%2F%2F

Note that this class looks up its state store by name in init(); with the pseudo code below, that name would be "oldvalues".

If you use the TopologyBuilder API, it could look something like this (you may have already seen similar code in the docs):

    Serde<String> stringSerde = Serdes.String();
    builder.addSource("source", "sourcetopic");
    // supplier for your processor, e.g. an adapted copy of MyProcessorSupplier from the demo above
    builder.addProcessor("myprocessor", new MyProcessorSupplier(), "source");
    StateStoreSupplier oldValueStore = Stores.create("oldvalues")
            .withKeys(stringSerde)
            .withValues(stringSerde)
            .persistent()
            .build();
    builder.addStateStore(oldValueStore, "myprocessor");
    builder.addSink("sink", toAggregationTopic, "myprocessor");

Now create the final KTable from toAggregationTopic, which contains the cleaned data.

Hope this helps; ignore this if I have misunderstood your use case.

Regards
Sai

On Thu, Nov 10, 2016 at 8:40 PM, Sachin Mittal wrote:
> Hi,
> On the subject of deleting values from the list, I have the following topology:
> aggregate(old list) = new list -> left join (another list) -> output sink.
>
> While processing the output list I know which values in the list are old
> and can be removed.
> Is there any way to pass that information from downstream back to upstream?
> Any thoughts on how I can pass this information?
>
> One thing I can think of is that I can set some global variable in the
> output sink.
>
> So the next time the aggregate function runs, it can look up the global
> variable and remove items from the list.
> So new list = old list + new value added - old values removed.
>
> In Spark we have something like broadcast variables to do the same.
>
> Is there any similar concept in Kafka Streams?
>
> This way we can keep the changelog topic message from growing and prevent
> the max message bytes exception.
>
> Thanks
> Sachin
>
> On Fri, Nov 11, 2016 at 1:02 AM, Matthias J. Sax wrote:
>
> > Sachin,
> >
> > my comment about deleting was about deleting values from the list, not
> > about deleting the whole key/value record.
> >
> > If you want to delete a whole key/value record if there is no update
> > for it for some time, you can combine compaction with retention. You
> > need to alter the configuration of the changelog topics and set
> >
> > cleanup.policy=compact,delete
> >
> > Then, retention.ms will be applied to the changelog, too.
> >
> > -Matthias
> >
> > On 11/10/16 3:17 AM, Sachin Mittal wrote:
> > > Hi, as per Eno's suggestion I have pre-created the internal changelog
> > > topics with an increased max.message.bytes config to handle big
> > > messages that grow over time.
> > >
> > > As Matthias has pointed out, we cannot use the retention.ms setting to
> > > delete older message data after a given time. Is there a way to
> > > purge older messages from my changelog topic?
> > >
> > > Remember, my changelog topic is key=list of objects, and this grows
> > > with time.
> > >
> > > So I would like these to be deleted from time to time, because I
> > > would have already consumed the objects, so that key/value can be
> > > deleted. Later, if I get a new object for the same key, then that's a
> > > new message and the old data has no use for the streaming application.
> > >
> > > So how can I achieve this? Would retention.bytes help
> > > here?
> > >
> > > Is there a way to set an expiry or something like that at the
> > > message level, so that some Kafka thread would purge those messages?
> > >
> > > Thanks Sachin
> > >
> > > On Wed, Nov 9, 2016 at 1:42 AM, Matthias J. Sax
> > > wrote:
> > >
> > > My two cents:
> > >
> > > Changelog topics are compacted topics, thus they do not have a
> > > retention time (there is an exception for windowed KTable changelog
> > > topics, which are compacted and do have a retention time though).
> > >
> > > However, I do not understand how changing the retention time should
> > > fix the issue. If your list of values grows and exceeds
> > > max.message.bytes, you will need to increase this parameter (or
> > > shrink your value).
> > >
> > > Besides this, Eno's answer is the way to go. In order to figure
> > > out internal topic names, you can use
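Matthias's point is that a growing list value must either stay under max.message.bytes or shrink, and Sachin's formula "new list = old list + new value added - old values removed" is the shrinking side. A minimal sketch of that aggregation step, assuming the set of removable values is already available to the aggregator (how it gets there, for example via a local store as Sai suggests, is the open question of this thread, so here it is simply passed in). The class name and the size estimate are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class PruningAggregator {
    // new list = old list + new value added - old values removed
    static List<String> aggregate(List<String> oldList, String added, Set<String> removed) {
        List<String> next = new ArrayList<>(oldList.size() + 1);
        for (String value : oldList) {
            if (!removed.contains(value)) {
                next.add(value);      // keep only entries that are still needed
            }
        }
        next.add(added);              // append the newly arrived value
        return next;
    }

    // Rough size check: UTF-8 bytes of the comma-joined list. The real record
    // size depends on the serde and record overhead, so this is only an estimate.
    static boolean fitsIn(List<String> values, int maxMessageBytes) {
        byte[] bytes = String.join(",", values).getBytes(StandardCharsets.UTF_8);
        return bytes.length <= maxMessageBytes;
    }
}
```

For example, aggregating `[a, b, c]` with the new value `d` while `a` and `c` are known to be consumed gives `[b, d]`, so the changelog record stays bounded by the number of still-pending values rather than by the full history.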
Re: Reopen KAFKA-4344 ?
Hi Srinivas

I raised that issue. The way I got around it was to run Kafka Streams in a plain POJO way, rather than having some of its dependencies be Spring-managed bean instances. If you create the riakService and counterService instances inside the processor class, instead of passing the Spring-managed instances to the processor's constructor, your Kafka Streams initialization should be fine and it should create the right number of tasks, with the right number of processors, for all the partitions.

I was fine with the POJO-based approach, because Kafka Streams has quite a few APIs to query the state (once it is started correctly, of course). I am running stateful processors and wanted to query the state data all the time, so I just used a Spring Boot controller in the web container to proxy the get APIs of the Kafka Streams state store (ReadOnlyKeyValueStore).

Alternatively, you can try making these two services prototype-scoped components (if your use case is fine with that).

Hope this helps.

Regards
Sai

On Mon, Nov 7, 2016 at 9:08 AM, Matthias J. Sax wrote:
> KAFKA-4344 was not a bug. The issue was a wrong initialization order
> of Kafka Streams by the user.
>
> Please double check your initialization order (and maybe read the old
> email thread and JIRA comments -- they might have some relevant
> information to help you fix the issue).
>
> If the problem is still there, can you please reduce your code to a
> minimal example that reproduces the problem?
>
> Thanks!
>
> -Matthias
>
> On 11/5/16 3:28 PM, srinivas koniki wrote:
> >
> > Hi, I'm still seeing the same issue with Spring Boot. The code is
> > below; sorry, it is in Groovy and not fully baked. It just has a
> > single processor. It worked well with a single partition, but when I
> > increased the partitions, I started seeing the error from
> > KAFKA-4344.
> >
> > import com.codahale.metrics.MetricRegistry
> > import org.apache.kafka.clients.consumer.ConsumerConfig
> > import org.apache.kafka.clients.producer.KafkaProducer
> > import org.apache.kafka.clients.producer.ProducerRecord
> > import org.apache.kafka.common.serialization.Serdes
> > import org.apache.kafka.streams.KafkaStreams
> > import org.apache.kafka.streams.StreamsConfig
> > import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster
> > import org.apache.kafka.streams.processor.AbstractProcessor
> > import org.apache.kafka.streams.processor.ProcessorSupplier
> > import org.apache.kafka.streams.processor.TopologyBuilder
> > import org.aspectj.lang.ProceedingJoinPoint
> > import org.aspectj.lang.annotation.AfterReturning
> > import org.aspectj.lang.annotation.Around
> > import org.aspectj.lang.annotation.Aspect
> > import org.aspectj.lang.annotation.Pointcut
> > import org.springframework.beans.factory.annotation.Autowired
> > import org.springframework.beans.factory.annotation.Value
> > import org.springframework.boot.actuate.metrics.CounterService
> > import org.springframework.boot.actuate.metrics.GaugeService
> > import org.springframework.boot.autoconfigure.SpringBootApplication
> > import org.springframework.boot.test.context.SpringBootTest
> > import org.springframework.context.Lifecycle
> > import org.springframework.context.annotation.Bean
> > import org.springframework.context.annotation.Configuration
> > import org.springframework.context.annotation.Import
> > import org.springframework.context.support.PropertySourcesPlaceholderConfigurer
> > import org.springframework.stereotype.Component
> > import org.springframework.test.context.ContextConfiguration
> > import org.springframework.util.StopWatch
> > import spock.lang.Shared
> > import spock.lang.Specification
> >
> > import java.util.concurrent.Future
> > import java.util.stream.IntStream
> >
> > /**
> >  * Created by srinivas.koniki on 11/5/16.
> >  */
> > @ContextConfiguration(classes=[TestConfig, MetricsAspect, RiakService])
> > @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
> > class MetricsSpec extends Specification {
> >
> >     static String kafkaTopic = 'testTopic'
> >
> >     @Shared TestConfig testConfigRef
> >
> >     @Autowired TestConfig testConfig
> >
> >     @Autowired MetricRegistry metricRegistry
> >
> >     @Autowired KafkaProducer kafkaProducer
> >
> >     @Shared static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1)
> >
> >     def setupSpec() {
> >         println("Heavy init for all the tests...")
> >         CLUSTER.start()
> >         System.setProperty('broker.url', CLUSTER.bootstrapServers())
> >         System.setProperty('zk.url', CLUSTER.zKConnectString())
> >         System.setProperty('kafka.topic', kafkaTopic)
> >         CLUSTER.createTopic(kafkaTopic, 3, 1)
> >     }
> >
> >     def cleanupSpec() {
> >         testConfigRef.stop()
> >         CLUSTER.stop()
> >     }
> >
> >     def "Test send and receive"() {
> >         expect:
> >         testConfig != null
> >         metricRegistry != null
> >         println '' + metricRegistry.getGauges()
> >
> >         when:
> >         testConfigRef = testConfig
> >         testConfig.start()
> >         List futureList = new
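Sai's fix boils down to one rule: the processor supplier should hand out a fresh processor, with its own dependencies, every time it is asked, instead of returning one shared (for example Spring singleton) instance. As far as I understand the 0.10.x TopologyBuilder, it asks the supplier once per task, so a shared instance ends up serving every partition's task. A minimal plain-Java sketch of the difference, using `java.util.function.Supplier` as a stand-in for `ProcessorSupplier`; all class names are hypothetical.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a service such as counterService in this thread.
final class TaskCounter {
    private long count;

    void increment() {
        count++;
    }

    long count() {
        return count;
    }
}

// Hypothetical processor that builds its own dependency instead of
// receiving a shared, container-managed instance through its constructor.
final class CountingProcessor {
    private final TaskCounter counter = new TaskCounter();

    void process(String key, String value) {
        counter.increment();
    }

    long processed() {
        return counter.count();
    }
}

final class ProcessorSuppliers {
    // Intended behaviour: a fresh processor (and fresh dependencies) per call.
    static Supplier<CountingProcessor> fresh() {
        return CountingProcessor::new;
    }

    // What a singleton bean amounts to: every call returns the same object.
    static Supplier<CountingProcessor> shared(CountingProcessor singleton) {
        return () -> singleton;
    }
}
```

With `fresh()`, two tasks get two processors whose counts stay independent; with `shared(...)`, both tasks would mutate the same object, which is the kind of cross-task interference that showed up once more than one partition was used.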
Re: Kafka Streams fails permanently when used with an unstable network
Hi Eno

Thanks for the JIRA info. The change looks worth trying; I will let you know after I try it out.

Regards
Sai

On Wed, Nov 2, 2016 at 1:33 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> Hi Sai,
>
> For your second note on rebalancing taking a long time, we have just
> improved the situation in trunk after fixing this JIRA:
> https://issues.apache.org/jira/browse/KAFKA-3559. Feel free to give it a
> go if rebalancing time continues to be a problem.
>
> Thanks
> Eno
Re: Kafka Streams fails permanently when used with an unstable network
Hey Guys

I have noticed similar issues when the network goes down while Kafka Streams apps are starting, especially when the store has been initialized but task initialization is not yet complete. When the network comes back, the rebalance fails with the above error and I have to restart. I run many partitions, so many tasks get initialized.

Otherwise, once a Kafka Streams app has started successfully, it has always recovered from network issues as far as I have seen so far, and the stores remain available.

This means some of these initialization exceptions could be categorized as recoverable and should always be retried.

I think task 0_0 in your case was not initialized properly in the first place; then a rebalance happened because of the network connectivity problem, and that resulted in the above exception.

On a separate note, rebalancing takes a long time since I have some intermediary topics, and I think it might be worse if the network is slow. I was thinking of something like making the store available for querying quickly, without waiting for the full initialization of all tasks.

Regards
Sai

On Mon, Oct 31, 2016 at 3:51 AM, Damian Guy wrote:
> Hi Frank,
>
> This usually means that another StreamThread has the lock for the state
> directory. So it would seem that one of the StreamThreads hasn't shut down
> cleanly. If it happens again, can you please take a thread dump so we can
> see what is happening?
>
> Thanks,
> Damian
>
> On Sun, 30 Oct 2016 at 10:52 Frank Lyaruu wrote:
>
> > I have a remote Kafka cluster, to which I connect using a VPN and a
> > not-so-great WiFi network.
> > That means that sometimes the Kafka client briefly loses
> > connectivity.
> > When it regains a connection after a while, I see:
> >
> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> > completed since the group has already rebalanced and assigned the
> > partitions to another member.
> > This means that the time between subsequent
> > calls to poll() was longer than the configured max.poll.interval.ms, which
> > typically implies that the poll loop is spending too much time message
> > processing. You can address this either by increasing the session timeout
> > or by reducing the maximum size of batches returned in poll() with
> > max.poll.records.
> >
> > ...
> >
> > Which makes sense I suppose, but this shouldn't be fatal.
> >
> > But then I see:
> > [StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Failed to create an active task %s:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Error while creating the state manager
> >     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
> >     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> > Caused by: java.io.IOException: task [0_0] Failed to lock the state directory: /Users/frank/git/dexels.repository/com.dexels.kafka.streams/kafka-streams/develop3-person/0_0
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
> >     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
> >     ... 13 more
> >
> > And my stream application is dead.
> >
> > So I'm guessing that either the store wasn't closed properly or some things
> > happen out of order.
> >
> > Any ideas?
> >
> > I'm using the trunk of Kafka 0.10.2.0-SNAPSHOT, Java 1.8.0_66 on MacOS
> > 10.11.6
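Sai's suggestion above, treating some initialization failures as recoverable and retrying them, is a classic retry with exponential backoff. A minimal, generic sketch, assuming the caller can classify failures (here an IOException, such as the state-directory lock failure, is simply assumed to be recoverable). This is not how Kafka Streams 0.10.x handles task creation; it only illustrates the idea, for example around application code that recreates and restarts its streams instance. The class name is hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

final class Retry {
    /**
     * Runs action up to maxAttempts times. Between attempts it sleeps, doubling
     * the delay each time, but only while the failure is classified as recoverable.
     */
    static <T> T withBackoff(Callable<T> action,
                             Predicate<Exception> recoverable,
                             int maxAttempts,
                             long initialDelayMs) throws Exception {
        long delayMs = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts || !recoverable.test(e)) {
                    throw e;          // give up: not recoverable, or out of attempts
                }
                Thread.sleep(delayMs);
                delayMs *= 2;         // exponential backoff
            }
        }
    }
}
```

Bounding the attempts matters here: per Damian's explanation, the lock is held by a StreamThread that did not shut down cleanly, and if that thread never releases it, retrying forever would only hide the real problem.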
Re: issue with custom processor flush to rocksdb store
Yes, this is similar: it was all about Kafka Streams not being started correctly in my Spring app, and NOT a bug in Kafka Streams. In the JIRA comments I have described what I was doing wrong. These types of exceptions largely indicate that Kafka Streams was not started correctly.

Thanks for your valuable time on this.

Regards
Sai

On Wed, Oct 26, 2016 at 2:34 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Is it a similar report to https://issues.apache.org/jira/browse/KAFKA-4344?
>
> --
> -- Guozhang
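One mechanism consistent with both the offset() and the timestamp() stack traces in these threads: each task calls init() on its processor, and the processor keeps that context. If a single instance is shared by several tasks, the last init() wins, so a record processed by task 0_0 is read through another task's context, which has no record in flight. The classes below are simplified, hypothetical stand-ins (not the real ProcessorContext), meant only to make that failure mode concrete; presumably a store that logs changes through the wrong context fails the same way during flush.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a per-task context (not the real Kafka class):
// record metadata is only available while this task is processing a record.
final class FakeTaskContext {
    final String taskId;
    private Long currentTimestamp;   // null when no record is in flight

    FakeTaskContext(String taskId) {
        this.taskId = taskId;
    }

    void begin(long timestamp) {
        currentTimestamp = timestamp;
    }

    void end() {
        currentTimestamp = null;
    }

    long timestamp() {
        if (currentTimestamp == null) {
            throw new IllegalStateException("task [" + taskId
                + "] timestamp() should only be called while a record is processed");
        }
        return currentTimestamp;
    }
}

// A processor that remembers the context passed to init(), as real processors do.
final class StampingProcessor {
    private FakeTaskContext context;
    final List<Long> seen = new ArrayList<>();

    void init(FakeTaskContext ctx) {
        this.context = ctx;   // a shared instance keeps only the LAST task's context
    }

    void process(String key) {
        seen.add(context.timestamp());
    }
}
```

With one processor per task, task 0_0 reads its own record's timestamp. With one shared processor initialized by tasks 0_0 and then 0_9, processing a record on 0_0 throws the IllegalStateException, because the processor now consults 0_9's idle context.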
issue with custom processor flush to rocksdb store
Hi

This is with Kafka Streams 0.10.1.0 (the broker is running remotely and the streams app is running locally on my laptop).

I have a Kafka Streams pipeline like this:

source topic (with 10 partitions) stream -> filter out null values -> map to make it keyed by id -> custom processor writing to mystore (persistent)

I am getting the exception below when the flush happens. If I restart the app, the data I sent is actually present in the RocksDB store. I see that the message of the keyed stream went to partition 0, and I guess the flush there happened correctly, since the task that failed to flush (below) is the one for partition 9. I am not sure why it complains about timestamp() here.

Can somebody explain what this means?

I am not sure whether it has something to do with the timestamp extractor property below that I am setting, or with some other time such as the producer create time:

    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, ConsumerRecordTimestampExtractor.class);

Regards
Sai

2016-10-25 14:31:29.822000 org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1 ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9 state:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_9] Failed to flush state store Products
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) [kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) [kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) [kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) [kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:192) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:112) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:375) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:175) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) ~[kafka-streams-0.10.1.0.jar!/:?]
    ... 6 more
Re: Exception when accessing partition, offset and timestamp in processor class
I just created the JIRA: https://issues.apache.org/jira/browse/KAFKA-4344

Regards
Sai

On Tue, Oct 25, 2016 at 11:59 AM, saiprasad mishra <saiprasadmis...@gmail.com> wrote:
> My JIRA id is saimishra
>
> Regards
> Sai
Re: Exception when accessing partition, offset and timestamp in processor class
My JIRA id is saimishra

Regards
Sai

On Tue, Oct 25, 2016 at 10:55 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> What is your JIRA ID? We can add you to the contributor list to give
> you permission.
>
> -Matthias
>
> On 10/25/16 10:48 AM, saiprasad mishra wrote:
> > Hi Matthias, thanks for the reply. I think I don't have permission
> > for this. If you can grant me permission I can create one (my
> > handle is saimishra), or you can go ahead and create one.
> >
> > I may need permission to create JIRAs, as I might report more issues
> > after discussing them with you here.
> >
> > Regards Sai
> >
> > On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax
> > <matth...@confluent.io> wrote:
> >
> > Hi,
> >
> > sorry for the late reply. Seems like a bug to me; within
> > Processor#process() accessing the context should work. Can you open
> > a JIRA for it?
> >
> > -Matthias
> >
> > On 10/23/16 10:28 PM, saiprasad mishra wrote:
> >>>> Sorry for the email again
> >>>>
> >>>> I was expecting it to always work when accessed from the
> >>>> process() method, as this corresponds to the processing of each
> >>>> Kafka message/record. I understand the IllegalStateException by the
> >>>> time punctuate() is called, as it's already batched by time
> >>>> interval.
> >>>>
> >>>> Regards Sai
> >>>>
> >>>> On Sun, Oct 23, 2016 at 9:18 PM, saiprasad mishra
> >>>> <saiprasadmis...@gmail.com> wrote:
> >>>>
> >>>>> Hi
> >>>>>
> >>>>> This is with my streaming app on Kafka 0.10.1.0.
> >>>>>
> >>>>> My flow looks something like below:
> >>>>>
> >>>>> source topic stream -> filter out null values -> map to make
> >>>>> it keyed by id -> custom processor to mystore -> to another
> >>>>> topic -> ktable
> >>>>>
> >>>>> I am hitting the type of exception below in a custom
> >>>>> processor class if I try to access offset() or partition()
> >>>>> or timestamp() from the ProcessorContext in the process()
> >>>>> method. I was hoping it would return the partition and
> >>>>> offset for the enclosing topic (in this case the source topic)
> >>>>> it is consuming from, or -1, based on the API docs.
> >>>>>
> >>>>> It looks like they are only accessible in certain cases. Are they
> >>>>> getting lost in the transformation phases?
> >>>>>
> >>>>> The same issue happens if I try to access them in the
> >>>>> punctuate() method, but somewhere I saw that it might not
> >>>>> work in punctuate(). Any reason for this, or any link
> >>>>> describing it, would be helpful.
> >>>>>
> >>>>> java.lang.IllegalStateException: This should not happen as offset()
> >>>>> should only be called while a record is processed
> >>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>>>>     at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
> >>>>>     at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
> >>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
> >>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1
Re: Exception when accessing partition, offset and timestamp in processor class
Hi Matthias,

Thanks for the reply. I don't think I have permission for this. If you can grant me permission, I can create one (my handle is saimishra), or you can go ahead and create it.

I may need permission to create JIRAs, as I might report more issues after discussing them with you here.

Regards
Sai

On Tue, Oct 25, 2016 at 10:26 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> Hi,
>
> sorry for the late reply. This seems like a bug to me; within
> Processor#process(), accessing the context should work. Can you open a
> JIRA for it?
>
> -Matthias
Re: Exception when accessing partition, offset and timestamp in processor class
Sorry for the extra email.

I was expecting it to always work when accessed from the process() method, since process() is called for each individual Kafka message/record. I understand getting an IllegalStateException in punctuate(), because by the time punctuate() is called, processing has already been batched by time interval.

Regards
Sai
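The distinction Sai draws between process() (driven by one record) and punctuate() (driven by time) can be illustrated with a small model. This is a hypothetical sketch, not Kafka's ProcessorContextImpl: the class RecordContextModel, its RecordMeta holder, and the processRecord()/punctuate() runner methods are invented for illustration. It only shows the contract stated in the exception text, namely that record metadata is set for the duration of one record and is absent otherwise.

```java
// Hypothetical, simplified model of a per-record processor context.
// NOT Kafka's ProcessorContextImpl: it only illustrates the contract
// "record metadata is available only while a record is being processed".
final class RecordContextModel {

    // Metadata of one input record (names chosen for this sketch).
    static final class RecordMeta {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;

        RecordMeta(String topic, int partition, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Set only for the duration of processRecord(); null otherwise.
    private RecordMeta current;

    private RecordMeta requireCurrent(String method) {
        if (current == null) {
            throw new IllegalStateException(
                    method + "() should only be called while a record is processed");
        }
        return current;
    }

    int partition() { return requireCurrent("partition").partition; }
    long offset() { return requireCurrent("offset").offset; }
    long timestamp() { return requireCurrent("timestamp").timestamp; }

    // Record-driven path: the "runtime" sets the context, runs the body, then clears it.
    void processRecord(RecordMeta meta, Runnable processBody) {
        current = meta;
        try {
            processBody.run();
        } finally {
            current = null;
        }
    }

    // Time-driven path: not tied to any single record, so no context is set.
    void punctuate(Runnable punctuateBody) {
        punctuateBody.run();
    }

    public static void main(String[] args) {
        RecordContextModel ctx = new RecordContextModel();
        long[] seen = new long[1];
        ctx.processRecord(new RecordMeta("source-topic", 0, 42L, 1_000L),
                () -> seen[0] = ctx.offset());
        System.out.println("offset inside process(): " + seen[0]); // 42
        try {
            ctx.punctuate(() -> ctx.offset());
        } catch (IllegalStateException e) {
            System.out.println("inside punctuate(): " + e.getMessage());
        }
    }
}
```

Under this model, an IllegalStateException inside process() means the context had no current record at that moment, which is why Matthias treats the process() case as a likely bug while the punctuate() case is expected.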
Exception when accessing partition, offset and timestamp in processor class
Hi,

This is with my streaming app on Kafka 0.10.1.0.

My flow looks something like this:

source topic stream -> filter for null values -> map to make it keyed by id -> custom processor to mystore -> to another topic -> KTable

I am hitting the exception below in a custom processor class if I try to access offset(), partition() or timestamp() from the ProcessorContext in the process() method. Based on the API docs, I was hoping it would return the partition and offset of the enclosing topic it is consuming from (in this case the source topic), or -1.

It looks like they are only accessible in certain cases. Is the context getting lost in the transformation phases?

The same issue happens if I try to access them in the punctuate() method, but I saw somewhere that this might not work in punctuate(). Any reason for this, or any link describing it, would be helpful.

java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
    at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
    at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]

Regards
Sai
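For the punctuate() part of the question, one common workaround is to capture whatever record metadata is needed while it is available in process(), and to read only the cached copy from punctuate(). The sketch below assumes that approach. LastOffsetTracker, onRecord() and snapshot() are hypothetical names, not Kafka APIs. In a real processor, onRecord() would be called from process() with values from context.topic(), context.partition() and context.offset().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a Kafka API): remember record metadata while it is
// available in process(), so a later time-driven punctuate() can use the cached
// values instead of asking the context for a "current" record.
final class LastOffsetTracker {

    // Highest offset seen per "topic-partition" key (key format chosen for this sketch).
    private final Map<String, Long> lastOffsets = new HashMap<>();

    // Intended to be called from process(), e.g. with context.topic(),
    // context.partition() and context.offset().
    void onRecord(String topic, int partition, long offset) {
        lastOffsets.merge(topic + "-" + partition, offset, Long::max);
    }

    // Intended to be called from punctuate(): reads only the cached values.
    Map<String, Long> snapshot() {
        return new HashMap<>(lastOffsets);
    }

    public static void main(String[] args) {
        LastOffsetTracker tracker = new LastOffsetTracker();
        tracker.onRecord("source-topic", 0, 10L);
        tracker.onRecord("source-topic", 0, 11L);
        tracker.onRecord("source-topic", 1, 5L);
        System.out.println(tracker.snapshot().get("source-topic-0")); // 11
    }
}
```

Keeping the maximum rather than the latest value is a defensive choice in this sketch. Within one partition, offsets normally arrive in increasing order, so the two coincide.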
Re: kafka streams metadata request fails on 0.10.0.1 broker/topic from 0.10.1.0 client
Thanks, Michael.

Hopefully the upgrade story evolves as 0.10.1+ matures.

Just my 2 cents: decoupling Kafka Streams from core Kafka changes would help, so that the broker can be upgraded independently and streaming apps can adopt newer streaming features at their own pace.

Regards
Sai

On Wednesday, October 19, 2016, Michael Noll <mich...@confluent.io> wrote:
> Apps built with Kafka Streams 0.10.1 only work against Kafka clusters
> running 0.10.1+. This explains your error message above.
>
> Unfortunately, Kafka's current upgrade story means you need to upgrade your
> cluster in this situation. Moving forward, we're planning to improve the
> upgrade/compatibility story of Kafka so that you could, for example, run a
> newer version of Kafka Streams (or any other Kafka client) against an older
> version of Kafka.
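The rule Michael states (a Kafka Streams 0.10.1 app needs brokers on 0.10.1 or newer) can be encoded as a simple pre-flight check. This is a hypothetical sketch: StreamsBrokerCompat and its methods are invented names. The version parser is a simplifying assumption that handles only dot-separated numeric components, not suffixes such as "-SNAPSHOT" or "rc3". It also covers only the 0.10.1 rule from this thread, not a general compatibility matrix.

```java
// Hypothetical helper encoding the single rule stated in this thread:
// Kafka Streams 0.10.1.x requires brokers on 0.10.1 or newer.
// Assumption: versions are dot-separated numbers with no suffixes.
final class StreamsBrokerCompat {

    // Numeric, component-wise comparison; missing components count as 0.
    static int compareVersions(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        int n = Math.max(pa.length, pb.length);
        for (int i = 0; i < n; i++) {
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    // True if a Streams 0.10.1.x app can be expected to work against this broker.
    static boolean brokerSupportsStreams0101(String brokerVersion) {
        return compareVersions(brokerVersion, "0.10.1") >= 0;
    }

    public static void main(String[] args) {
        System.out.println(brokerSupportsStreams0101("0.10.0.1")); // false
        System.out.println(brokerSupportsStreams0101("0.10.1.0")); // true
    }
}
```

Comparing components numerically, rather than comparing the strings, avoids ordering mistakes such as "0.9" sorting after "0.10".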
kafka streams metadata request fails on 0.10.0.1 broker/topic from 0.10.1.0 client
Hi All,

I was testing with the 0.10.1.0 RC3 build for my new streams app.

I am seeing issues starting my Kafka Streams app (0.10.1.0) against an older broker (0.10.0.1). I don't know if it is supposed to work as is. I will upgrade the broker to the same version and see whether the issue goes away.

Client-side issue:

java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83) ~[kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) ~[kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154) ~[kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135) ~[kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343) [kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291) [kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) [kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) [kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209) [kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148) [kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:136) [kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:197) [kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248) [kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) [kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) [kafka-clients-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) [kafka-streams-0.10.1.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]

On the broker side, the following message appears:

kafka.network.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 2
    at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87)
    at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
    at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
    at kafka.network.Processor.run(SocketServer.scala:413)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2
    at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)
    at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:44)
    at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:60)
    at org.apache.kafka.common.requests.MetadataRequest.parse(MetadataRequest.java:96)
    at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:48)
    at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:92)

Regards
Sai
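The broker log shows the mechanism behind the failure. Every request carries an API key (3 is the Metadata API) and an API version, and the broker must know a schema for that exact version before it can parse the request. Below is a simplified, hypothetical model of that check. ApiVersionCheck and its methods are invented names, and the version table is an assumption limited to the Metadata API: v1 is the highest that 0.10.0.x understands, and v2 was added in 0.10.1.x. When parsing fails, the broker drops the request, which is consistent with the client-side EOFException.

```java
// Simplified, hypothetical model of broker-side request-version validation.
// Assumption: the table below covers only the Metadata API (key 3), with
// v1 being the highest version in 0.10.0.x and v2 added in 0.10.1.x.
final class ApiVersionCheck {

    static final int METADATA_API_KEY = 3;

    // Highest Metadata request version each broker release understands (assumed).
    static int maxMetadataVersion(String brokerVersion) {
        switch (brokerVersion) {
            case "0.10.0.1": return 1;
            case "0.10.1.0": return 2;
            default:
                throw new IllegalArgumentException(
                        "broker version not modeled in this sketch: " + brokerVersion);
        }
    }

    // Mirrors the "Caused by" line in the broker log: no schema exists for an unknown version.
    static void validate(String brokerVersion, int apiKey, int apiVersion) {
        if (apiKey == METADATA_API_KEY && apiVersion > maxMetadataVersion(brokerVersion)) {
            throw new IllegalArgumentException(
                    "Invalid version for API key " + apiKey + ": " + apiVersion);
        }
    }

    public static void main(String[] args) {
        validate("0.10.1.0", METADATA_API_KEY, 2); // accepted by a 0.10.1.0 broker
        try {
            // What the 0.10.1.0 client sent to the 0.10.0.1 broker.
            validate("0.10.0.1", METADATA_API_KEY, 2);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Invalid version for API key 3: 2
        }
    }
}
```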
Re: Kafka consumer does not work from eclipse when kafka running inside VM
I have seen the ZooKeeper consumers directory get corrupted from running the consumer again and again with some settings changed, which causes this issue. I would remove the consumer group's directory from the Kafka server's ZooKeeper, as below, and then exit the ZooKeeper shell:

rmr /consumers/hello

Then run your consumer, and it should always start cleanly as expected.

Regards
Sai

On Mon, Jun 29, 2015 at 10:10 AM, JIEFU GONG <jg...@berkeley.edu> wrote:

Yes, try this command, replacing the bracketed parts with the correct values for your project:

bin/kafka-console-consumer.sh --zookeeper [zookeeper info] --topic [your-topic-here] --from-beginning

--
Jiefu Gong
University of California, Berkeley | Class of 2017
B.A. Computer Science

On Mon, Jun 29, 2015 at 9:56 AM, shivam tiwari <bigbang...@gmail.com> wrote:

It turns out the code mentioned does not read the topic from the beginning; it actually waits for the producer to produce something new. After starting the producer and sending messages, I was able to retrieve all the messages. Is there a way to get the messages on a topic from the beginning?

--
Regards
Shivam Tiwari

On 26 June 2015 at 14:32, Gwen Shapira <gshap...@cloudera.com> wrote:

ZooKeeper actually doesn't show any errors; it shows a warning, which is pretty normal. What do your consumer and Kafka broker show? Are there any errors in the consumer, or is it just hanging?

You may want to consult our FAQ: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?

Gwen

On Fri, Jun 26, 2015 at 11:00 AM, shivam tiwari <bigbang...@gmail.com> wrote:

Kafka 0.8.2.2.3 and ZooKeeper are both running inside a VM. I was able to run both the producer and the consumer within the VM successfully using kafka-console-producer.sh and kafka-console-consumer.sh, respectively. I was even able to consume Kafka messages from the host machine using kafka-console-consumer.sh.

But when I tried to run the consumer using Java from Eclipse, ZooKeeper logged the following:

2015-06-26 03:06:26,323 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /192.168.1.12:59549 (no session established for client)
2015-06-26 03:07:26,225 - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /192.168.1.12:59617
2015-06-26 03:07:26,226 - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)

Below is my Kafka consumer code:

package com.truckevent.producer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        String group = "hello";

        // Old (ZooKeeper-based) high-level consumer configuration; all values are strings.
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.1.12:2181");
        props.put("group.id", group);
        props.put("zookeeper.session.timeout.ms", "2");
        props.put("zookeeper.sync.time.ms", "2030");
        props.put("auto.commit.interval.ms", "1");
        // Only takes effect when the group has no valid committed offset in ZooKeeper.
        props.put("auto.offset.reset", "smallest");

        ConsumerConfig cf = new ConsumerConfig(props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf);

        // Request one stream for the topic.
        String topic = "event";
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        KafkaStream<byte[], byte[]> stream = streams.get(0);

        // hasNext() blocks while waiting for new messages.
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        int i = 1;
        while (it.hasNext()) {
            System.out.println(i + ": " + new String(it.next().message()));
            ++i;
        }
        consumer.shutdown();
    }
}

I am not sure why I am not able to consume messages from the Java code. Kafka is running on port 6667 and ZooKeeper on 2181.

--
Regards
Shivam Tiwari
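Sai's fix (deleting /consumers/hello) and Shivam's observation fit together. With the old ZooKeeper-based consumer, a committed offset stored for the group takes precedence, and auto.offset.reset ("smallest" or "largest") only applies when the group has no committed offset or the stored offset is out of range. Below is a simplified, hypothetical model of that decision. StartOffsetModel and startOffset() are invented names, and the offsets in main() are illustrative values, not data from the thread.

```java
import java.util.OptionalLong;

// Simplified, hypothetical model (not Kafka code) of where the old
// ZooKeeper-based high-level consumer starts reading for a partition.
final class StartOffsetModel {

    static long startOffset(OptionalLong committed, String autoOffsetReset,
                            long logStartOffset, long logEndOffset) {
        // A valid committed offset for the group always wins.
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStartOffset && c <= logEndOffset) {
                return c;
            }
        }
        // Otherwise (no offset, or out of range) the reset policy decides.
        switch (autoOffsetReset) {
            case "smallest": return logStartOffset;
            case "largest": return logEndOffset;
            default:
                throw new IllegalArgumentException(
                        "unsupported auto.offset.reset: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Illustrative: group "hello" already committed offset 120 of a log spanning [0, 120].
        System.out.println(startOffset(OptionalLong.of(120), "smallest", 0, 120)); // 120
        // After "rmr /consumers/hello" there is no committed offset any more.
        System.out.println(startOffset(OptionalLong.empty(), "smallest", 0, 120)); // 0
    }
}
```

In this model, "smallest" alone does not rewind a group that has already committed offsets. Removing the group's ZooKeeper node, or switching to a new group.id, is what makes the reset policy apply again.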